Kafka Cluster 2019 (4) Spring Boot Kafka Reactive

Kafka Cluster 2019 (4) Spring Boot Kafka Reactive

I found some articles and one sample here: https://github.com/vanseverk/paymentprocessor-kafka-intro/tree/master/paymentprocessor-gateway

To avoid a Jackson serialization exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS.

Add the dependency to the POM, pom.xml:

<!-- Reactor Kafka client; the Java classes in this article import reactor.kafka.sender.* and reactor.kafka.receiver.* from it. -->
<!-- NOTE(review): no <version> is declared here; presumably it is resolved by the parent POM / BOM dependency management - confirm. -->
<dependency>

<groupId>io.projectreactor.kafka</groupId>

<artifactId>reactor-kafka</artifactId>

</dependency>

The KafkaConfig class, which loads the configuration from YAML:

packagecom.sillycat.webfluxlatency.config;

importjava.util.Collections;

importjava.util.HashMap;

importjava.util.Map;

importorg.apache.kafka.clients.consumer.ConsumerConfig;

importorg.apache.kafka.clients.producer.ProducerConfig;

importorg.apache.kafka.common.serialization.IntegerDeserializer;

importorg.apache.kafka.common.serialization.IntegerSerializer;

importorg.apache.kafka.common.serialization.StringDeserializer;

importorg.apache.kafka.common.serialization.StringSerializer;

importorg.springframework.beans.factory.annotation.Value;

importorg.springframework.boot.context.properties.EnableConfigurationProperties;

importorg.springframework.context.annotation.Bean;

importorg.springframework.context.annotation.Configuration;

importlombok.Getter;

importlombok.Setter;

importlombok.extern.slf4j.Slf4j;

importreactor.kafka.receiver.KafkaReceiver;

importreactor.kafka.receiver.ReceiverOptions;

importreactor.kafka.sender.KafkaSender;

importreactor.kafka.sender.SenderOptions;

@Slf4j

@Setter

@Getter

@Configuration

@EnableConfigurationProperties

publicclassKafkaConfig{

@Value("${spring.kafka.bootstrap-servers}")

privateStringbootstrapServers;

@Bean

publicKafkaSender<Integer,String>kafkaProducer(){

finalMap<String,Object>producerProps=newHashMap<>();

producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class);

producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);

finalSenderOptions<Integer,String>producerOptions=SenderOptions.create(producerProps);

returnKafkaSender.create(producerOptions);

}

@Bean

publicKafkaReceiver<Object,Object>kafkaReceiver(){

finalMap<String,Object>consumerProps=newHashMap<>();

consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,IntegerDeserializer.class);

consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,"webflux-1");

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"webflux");

consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);

ReceiverOptions<Object,Object>consumerOptions=ReceiverOptions.create(consumerProps)

.subscription(Collections.singleton("hello-webflux"))

.addAssignListener(partitions->log.debug("onPartitionsAssigned{}",partitions))

.addRevokeListener(partitions->log.debug("onPartitionsRevoked{}",partitions));

returnKafkaReceiver.create(consumerOptions);

}

}

The implementation class, which sends out messages and consumes messages:

packagecom.sillycat.webfluxlatency.service;

importjavax.annotation.PostConstruct;

importorg.apache.kafka.clients.producer.ProducerRecord;

importorg.springframework.beans.factory.annotation.Autowired;

importorg.springframework.stereotype.Service;

importlombok.extern.slf4j.Slf4j;

importreactor.core.publisher.Flux;

importreactor.core.publisher.Mono;

importreactor.kafka.receiver.KafkaReceiver;

importreactor.kafka.receiver.ReceiverRecord;

importreactor.kafka.sender.KafkaSender;

importreactor.kafka.sender.SenderRecord;

importreactor.kafka.sender.SenderResult;

@Slf4j

@Service

publicclassKafkaServiceImplimplementsKafkaService{

@Autowired

KafkaSender<Integer,String>kafkaSender;

@Autowired

KafkaReceiver<Object,Object>kafkaReceiver;

@PostConstruct

publicvoidinit(){

((Flux<ReceiverRecord<Object,Object>>)kafkaReceiver.receive()).doOnNext(r->{

processEvent(r.value().toString());

r.receiverOffset().acknowledge();

}).subscribe();

}

privatevoidprocessEvent(Stringmessage){

log.info("receivedmessage:"+message);

}

publicMono<SenderResult<Integer>>send(finalStringpayload){

log.info("sendoutmessage:"+payload);

SenderRecord<Integer,String,Integer>message=SenderRecord

.create(newProducerRecord<>("hello-webflux",payload),1);

returnkafkaSender.send(Mono.just(message)).next();

}

}

The WebFlux controller, which sends back a Mono:

packagecom.sillycat.webfluxlatency.web;

importorg.springframework.beans.factory.annotation.Autowired;

importorg.springframework.web.bind.annotation.PathVariable;

importorg.springframework.web.bind.annotation.PostMapping;

importorg.springframework.web.bind.annotation.RequestBody;

importorg.springframework.web.bind.annotation.RestController;

importcom.sillycat.webfluxlatency.service.KafkaService;

importio.swagger.annotations.Api;

importlombok.extern.slf4j.Slf4j;

importreactor.core.publisher.Mono;

importreactor.kafka.sender.SenderResult;

@Slf4j

@Api(value="/kafka/")

@RestController

publicclassKafkaController{

@Autowired

KafkaServicekafkaService;

@PostMapping("/kafka/{topicName}")

publicMono<SenderResult<Integer>>sendToTopic(@PathVariableStringtopicName,@RequestBodyStringmessage){

log.info("topicName"+topicName+"message"+message);

returnkafkaService.send(message);

}

}

Application YAML configuration file:

spring:
  jackson:
    serialization:
      # Do not fail when Jackson serializes an object that exposes no properties.
      FAIL_ON_EMPTY_BEANS: false
  kafka:
    consumer:
      # NOTE(review): KafkaConfig sets its consumer group in code and does not read this key - confirm it is still needed.
      group-id: mvccluster
    # Read by KafkaConfig through ${spring.kafka.bootstrap-servers}.
    bootstrap-servers: ubuntu-master:9092,ubuntu-dev2:9092,ubuntu-dev4:9092

Then I can easily send out messages and consume messages.

References:

https://github.com/reactor/reactor-kafka

https://www.reactiveprogramming.be/an-introduction-to-reactor-kafka/

https://codar.club/blogs/reactor-kafka-via-spring-boot-webflux.html

相关推荐