Kafka Cluster 2019(4)Spring Boot Kafka Reactive
KafkaCluster2019(4)SpringBootKafkaReactive
Foundsomearticlesandonesamplesherehttps://github.com/vanseverk/paymentprocessor-kafka-intro/tree/master/paymentprocessor-gateway
toavoidexception,disableSerializationFeature.FAIL_ON_EMPTY_BEANS
AdddependencysupportstoPOM,pom.xml
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
KafkaConfigurationclasstoloadtheconfigurationsfromYAML
packagecom.sillycat.webfluxlatency.config;
importjava.util.Collections;
importjava.util.HashMap;
importjava.util.Map;
importorg.apache.kafka.clients.consumer.ConsumerConfig;
importorg.apache.kafka.clients.producer.ProducerConfig;
importorg.apache.kafka.common.serialization.IntegerDeserializer;
importorg.apache.kafka.common.serialization.IntegerSerializer;
importorg.apache.kafka.common.serialization.StringDeserializer;
importorg.apache.kafka.common.serialization.StringSerializer;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.boot.context.properties.EnableConfigurationProperties;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importlombok.Getter;
importlombok.Setter;
importlombok.extern.slf4j.Slf4j;
importreactor.kafka.receiver.KafkaReceiver;
importreactor.kafka.receiver.ReceiverOptions;
importreactor.kafka.sender.KafkaSender;
importreactor.kafka.sender.SenderOptions;
@Slf4j
@Setter
@Getter
@Configuration
@EnableConfigurationProperties
publicclassKafkaConfig{
@Value("${spring.kafka.bootstrap-servers}")
privateStringbootstrapServers;
@Bean
publicKafkaSender<Integer,String>kafkaProducer(){
finalMap<String,Object>producerProps=newHashMap<>();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
finalSenderOptions<Integer,String>producerOptions=SenderOptions.create(producerProps);
returnKafkaSender.create(producerOptions);
}
@Bean
publicKafkaReceiver<Object,Object>kafkaReceiver(){
finalMap<String,Object>consumerProps=newHashMap<>();
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,IntegerDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG,"webflux-1");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"webflux");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
ReceiverOptions<Object,Object>consumerOptions=ReceiverOptions.create(consumerProps)
.subscription(Collections.singleton("hello-webflux"))
.addAssignListener(partitions->log.debug("onPartitionsAssigned{}",partitions))
.addRevokeListener(partitions->log.debug("onPartitionsRevoked{}",partitions));
returnKafkaReceiver.create(consumerOptions);
}
}
Theimplementationclasswhichsendoutmessagesandconsumemessages
packagecom.sillycat.webfluxlatency.service;
importjavax.annotation.PostConstruct;
importorg.apache.kafka.clients.producer.ProducerRecord;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.stereotype.Service;
importlombok.extern.slf4j.Slf4j;
importreactor.core.publisher.Flux;
importreactor.core.publisher.Mono;
importreactor.kafka.receiver.KafkaReceiver;
importreactor.kafka.receiver.ReceiverRecord;
importreactor.kafka.sender.KafkaSender;
importreactor.kafka.sender.SenderRecord;
importreactor.kafka.sender.SenderResult;
@Slf4j
@Service
publicclassKafkaServiceImplimplementsKafkaService{
@Autowired
KafkaSender<Integer,String>kafkaSender;
@Autowired
KafkaReceiver<Object,Object>kafkaReceiver;
@PostConstruct
publicvoidinit(){
((Flux<ReceiverRecord<Object,Object>>)kafkaReceiver.receive()).doOnNext(r->{
processEvent(r.value().toString());
r.receiverOffset().acknowledge();
}).subscribe();
}
privatevoidprocessEvent(Stringmessage){
log.info("receivedmessage:"+message);
}
publicMono<SenderResult<Integer>>send(finalStringpayload){
log.info("sendoutmessage:"+payload);
SenderRecord<Integer,String,Integer>message=SenderRecord
.create(newProducerRecord<>("hello-webflux",payload),1);
returnkafkaSender.send(Mono.just(message)).next();
}
}
WebfluxControllerwhichsendbackMONO
packagecom.sillycat.webfluxlatency.web;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.web.bind.annotation.PathVariable;
importorg.springframework.web.bind.annotation.PostMapping;
importorg.springframework.web.bind.annotation.RequestBody;
importorg.springframework.web.bind.annotation.RestController;
importcom.sillycat.webfluxlatency.service.KafkaService;
importio.swagger.annotations.Api;
importlombok.extern.slf4j.Slf4j;
importreactor.core.publisher.Mono;
importreactor.kafka.sender.SenderResult;
@Slf4j
@Api(value="/kafka/")
@RestController
publicclassKafkaController{
@Autowired
KafkaServicekafkaService;
@PostMapping("/kafka/{topicName}")
publicMono<SenderResult<Integer>>sendToTopic(@PathVariableStringtopicName,@RequestBodyStringmessage){
log.info("topicName"+topicName+"message"+message);
returnkafkaService.send(message);
}
}
ApplicationYAMLconfigurationFile
spring:
jackson:
serialization:
FAIL_ON_EMPTY_BEANS:false
kafka:
consumer:
group-id:mvccluster
bootstrap-servers:ubuntu-master:9092,ubuntu-dev2:9092,ubuntu-dev4:9092
ThenIcaneasilysendoutmessagesandconsumemessages.
References:
https://github.com/reactor/reactor-kafka
https://www.reactiveprogramming.be/an-introduction-to-reactor-kafka/
https://codar.club/blogs/reactor-kafka-via-spring-boot-webflux.html