springboot+kafka
1,首先springboot对kafka的支持也很好,同样是在配置文件中配置好参数,然后就可以直接使用。先说一下,很简单,不要怕。
2,我用的依赖是
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置文件
# Kafka settings for Spring Boot. This fragment is expected to sit under the
# top-level `spring:` key, because the Java config in this article reads
# `spring.kafka.*` properties.
kafka:
  # Broker list shared by producer and consumer.
  bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
  producer:
    retries: 1
    batch-size: 16384
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Repeats the top-level broker list (kept as in the original).
    bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
  consumer:
    # Repeats the top-level broker list (kept as in the original).
    bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
    enable-auto-commit: true
    auto-offset-reset: latest
    auto-commit-interval: 1000
    group-id: gzj
然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
消费者,监听
/**
 * Listener for the "gzj" topic: prints every received message to standard error.
 *
 * @param content the message content passed in by the Kafka listener
 */
@KafkaListener(topics = "gzj")
public void receive(String content) {
    final String line = "Receive:" + content;
    System.err.println(line);
}
消费者还有另一种方法,
package com.gzj.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * Description * <p> * </p> * DATE 2018/10/23. * * @author guozhenjiang. */ @Component public class KafkaConsumerTask implements Runnable,InitializingBean { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class); private Thread thread; @Resource(name="kafkaConsumer") private KafkaConsumer<String,String> kafkaConsumer; @Override public void run() { logger.info("消费数据任务启动"); while(true){ try{ ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000); if(records!=null){ for(ConsumerRecord<String ,String > record:records){ logger.error(record.key()); logger.error(record.topic()); logger.error(record.value()); } } }catch(Exception e){ // logger.error("我也不知道哪儿错了"); }finally { // logger.error("不放弃"); } } } @Override public void afterPropertiesSet() throws Exception { this.thread=new Thread(this); this.thread.start(); } }
package com.gzj.demo.config;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Properties;

/**
 * Spring configuration that builds a raw {@link KafkaConsumer} bean named
 * {@code kafkaConsumer}, already subscribed to the "gzj" topic.
 *
 * <p>Connection settings are injected from the {@code spring.kafka.*} properties.
 *
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * Creates the consumer with String key/value deserializers and subscribes it
     * to the "gzj" topic.
     *
     * @return a subscribed consumer, exposed as the bean {@code kafkaConsumer}
     */
    @Bean(name = "kafkaConsumer")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // NOTE(review): group.id is hard-coded to "ggg" while the configured
        // spring.kafka.consumer.group-id is injected into groupId but not used
        // here; confirm whether the separate group is intentional.
        props.setProperty("group.id", "ggg");
        props.setProperty("enable.auto.commit", enableAutoCommit);
        props.setProperty("auto.offset.reset", autoOffsetReset);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("gzj"));
        return consumer;
    }

    /** @return the value of {@code spring.kafka.consumer.group-id} */
    public String getGroupId() {
        return groupId;
    }

    /** @param groupId the consumer group id to store */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** @return the configured broker list */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /** @param bootstrapServers the broker list to store */
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /** @return the configured {@code enable-auto-commit} value, as a string */
    public String getEnableAutoCommit() {
        return enableAutoCommit;
    }

    /** @param enableAutoCommit the {@code enable-auto-commit} value to store */
    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    /** @return the configured {@code auto-offset-reset} value */
    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    /** @param autoOffsetReset the {@code auto-offset-reset} value to store */
    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }
}
后一种暂未发现有什么优点。都可以实现监听kafka,充当消费者
3,现在我有两个消费者,之前一直好奇如果有多个消费者,如何让他们重复消费或协同消费。听说是通过配置groupid来控制,亲自试验了一下,确实如此:同一个groupid里的消费者是协同消费的,不同groupid的消费者是重复消费的。
也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录
相关推荐
Kafka 2020-09-18
sweetgirl0 2020-07-27
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01