【消息队列-Kafka】01-Kafka入门使用
一、在 pom.xml 中引入 Kafka 客户端(kafka-clients)依赖
<!-- Maven dependency on the org.apache.kafka:kafka-clients artifact, pinned to version 2.4.0. -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency>
二、生产者使用实例
1.生产者配置
public class KafkaProducerClient { private Producer<String, String> producer; private KafkaProducerClient() { } /** * 获取kafka消费端实例 */ public KafkaProducerClient(Properties props) { // kafka生产者配置详解:https://www.jianshu.com/p/9a31538ea4b3 props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 3); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer(props); } /** * 消息同步阻塞发送 */ public String sendSyncMessage(String topic, String message) throws Exception { RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(topic, message)).get(); return recordMetadata.offset() + "|" + recordMetadata.partition(); } /** * 消息异步非阻塞发送 */ public void sendAsyncMessage(String topic, String message) { producer.send(new ProducerRecord<String, String>(topic, message)); } }
2.单元测试
public class KafkaProducerClientTest { private static final String TOPIC="topic-zcx"; @Test public void sendMessage() throws Exception { Properties props=new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "47.97.167.84:19090"); //配置retry时数据的幂等性,避免数据重复提交 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true"); //配置数据压缩 //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); KafkaProducerClient kafkaProducerClient=new KafkaProducerClient(props); String offset=kafkaProducerClient.sendSyncMessage(TOPIC,"hello kafka"); System.out.printf("offset:"+offset); } }
三、消费者使用实例
1.消费者配置
public class KafkaConsumerClient { private KafkaConsumer consumer; private KafkaConsumerClient() { } /** * 获取kafka消费端实例 */ public KafkaConsumerClient(Properties properties) { // kafka消费端配置详解:https://blog.csdn.net/Dongguabai/article/details/86524023?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-1 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer(properties); } /** * 监听消费指定topic数据 */ public void receive(String topic, IConsumerListener callable) { consumer.subscribe(Arrays.asList(topic)); while (true) { callable.doReceive(consumer); } } public interface IConsumerListener { void doReceive(KafkaConsumer consumer); } }
2.单元测试
/**
 * Manual smoke test for {@code KafkaConsumerClient}: subscribes to the test topic and prints
 * every record received. Requires the brokers configured below to be reachable. Note that
 * {@code receive} loops forever, so this test does not finish on its own.
 */
public class KafkaConsumerClientTest {

    private static final String TOPIC = "topic-zcx";

    @Test
    public void receive() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "47.97.167.84:19090,47.97.167.84:19091,47.97.167.84:19092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-test");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        KafkaConsumerClient kafkaConsumerClient = new KafkaConsumerClient(properties);
        kafkaConsumerClient.receive(TOPIC, consumer -> {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %s,value = %s\n",
                        record.partition() + "|" + record.offset(), record.value());
            }
            // Auto-commit is disabled above, so offsets must be committed explicitly once the
            // batch has been processed; otherwise the group's position is never persisted.
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        });
    }
}
相关推荐
sweetgirl0 2020-06-28
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26