Kafka 消费者API
消费者api,自动提交offset
public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); //连接的集群 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //开启自动提交(消费偏移量) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //自动提交的延迟 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //KV的反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc"); //消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); //订阅主题 kafkaConsumer.subscribe(Collections.singletonList("first")); while (true){ //获取数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); //解析数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()+"-"+consumerRecord.value()); } } } }
手动提交offset,同步提交
public class ConsumerOffsetSync { public static void main(String[] args) { Properties props = new Properties(); //连接的集群 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //关闭自动提交(消费偏移量) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //KV的反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc1"); //重置offset。 //earliest:从头开始消费,触发的条件1,换组;条件2:保留的offset指向的数据已经不存在 //latest:默认值,消费最新的数据。 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //消费者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); //订阅主题 kafkaConsumer.subscribe(Collections.singletonList("first")); while (true){ //获取数据 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); //解析数据 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()+"-"+consumerRecord.value()); } //同步提交,当前线程会阻塞直到 offset 提交成功 kafkaConsumer.commitSync(); } } }
手动提交offset,异步提交
//异步提交 kafkaConsumer.commitAsync((offsets, exception) -> { if (exception != null) { System.err.println("Commit failed for" + offsets); } });
相关推荐
sweetgirl0 2020-06-28
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
jacktangj 2020-10-14
ChaITSimpleLove 2020-10-06
Andrea0 2020-09-18
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26