消费消息+手动提交+同步异步
依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
代码
package com.perfect.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.Properties; public class KafkaComsumerTest { @Test public void cunsumertest(){ Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); //关闭自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //latest,earliest props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.GROUP_ID_CONFIG,"bigdata1"); KafkaConsumer<String,String> c = new KafkaConsumer<String, String>(props); c.subscribe(Collections.singletonList("test2")); while(true){ for(int i=0;i<10000;i++){ ConsumerRecords records = c.poll(100); records.forEach(System.out::println); } //同步提交 c.commitSync(); //异步提交 //c.commitAsync(); } // c.close(); } }
相关推荐
Kafka 2020-09-18
Wepe0 2020-10-30
杜倩 2020-10-29
windle 2020-10-29
minerd 2020-10-28
mengzuchao 2020-10-22
Junzizhiai 2020-10-10
bxqybxqy 2020-09-30
风之沙城 2020-09-24
kingszelda 2020-09-22
大唐帝国前营 2020-08-18
yixu0 2020-08-17
TangCuYu 2020-08-15
xiaoboliu00 2020-08-15
songshijiazuaa 2020-08-15
xclxcl 2020-08-03
zmzmmf 2020-08-03
newfarhui 2020-08-03
likesyour 2020-08-01