springboot+kafka

1,首先,Spring Boot 对 Kafka 的支持很好:同样是在配置文件中配置好参数,然后就可以直接使用。先说一下,很简单,不要怕。

2,我用的依赖是

<!-- Spring for Apache Kafka. No <version> is given here; presumably it is managed by the
     Spring Boot parent/BOM - confirm against your build. -->
<dependency>
 <groupId>org.springframework.kafka</groupId>
 <artifactId>spring-kafka</artifactId>
</dependency>

springboot+kafka

配置文件

# Kafka settings for Spring Boot. The Java code in this article reads them under the
# `spring.kafka` prefix, so the block is rooted at `spring:`.
spring:
  kafka:
    # Broker list shared by producer and consumer unless overridden below.
    bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
    producer:
      retries: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Same value as the shared list above; kept as an explicit producer-level override.
      bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
    consumer:
      # Same value as the shared list above; kept as an explicit consumer-level override.
      bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
      enable-auto-commit: true
      auto-offset-reset: latest
      auto-commit-interval: 1000
      group-id: gzj

然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可

// Producer-side template injected by Spring; used to send records to Kafka.
// NOTE(review): the configuration shown earlier sets value-serializer to StringSerializer, so
// values sent through this <String, Object> template presumably have to be Strings - confirm.
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

消费者,监听

/**
 * Listener callback for the "gzj" topic; writes each received payload to standard error.
 *
 * @param content the record payload, delivered as a String
 */
@KafkaListener(topics = "gzj")
public void receive(String content) {
    final String line = "Receive:" + content;
    System.err.println(line);
}

消费者还有另一种写法:自己创建 KafkaConsumer,并在单独的线程里循环拉取(poll)消息,代码如下:

package com.gzj.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Component
public class KafkaConsumerTask implements Runnable,InitializingBean {
 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);
 private Thread thread;
 @Resource(name="kafkaConsumer")
 private KafkaConsumer<String,String> kafkaConsumer;
 @Override
 public void run() {
 logger.info("消费数据任务启动");
 while(true){
 try{
 ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000);
 if(records!=null){
 for(ConsumerRecord<String ,String > record:records){
 logger.error(record.key());
 logger.error(record.topic());
 logger.error(record.value());
 }
 }
 }catch(Exception e){
 // logger.error("我也不知道哪儿错了");
 }finally {
 // logger.error("不放弃");
 }
 }
 }
 @Override
 public void afterPropertiesSet() throws Exception {
 this.thread=new Thread(this);
 this.thread.start();
 }
}
package com.gzj.demo.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Properties;
/**
 * Spring configuration that exposes a raw {@link KafkaConsumer} bean named {@code kafkaConsumer},
 * already subscribed to the {@code gzj} topic.
 *
 * <p>Broker list, auto-commit flag and offset-reset policy are read from the
 * {@code spring.kafka.*} properties. The consumer group id is fixed to {@code "ggg"}; the
 * injected {@code spring.kafka.consumer.group-id} value is exposed through
 * {@link #getGroupId()} but is not applied to this consumer.
 *
 * <p>DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Group id used by the consumer built here.
     * NOTE(review): presumably chosen on purpose so this consumer forms its own group, separate
     * from {@code spring.kafka.consumer.group-id} - confirm before switching to {@code groupId}.
     */
    private static final String CONSUMER_GROUP_ID = "ggg";

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "gzj";

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * Creates the consumer with String key/value deserializers and subscribes it to the topic.
     *
     * @return a subscribed consumer; the caller is responsible for polling it
     */
    @Bean(name = "kafkaConsumer")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        props.setProperty("group.id", CONSUMER_GROUP_ID);
        props.setProperty("enable.auto.commit", enableAutoCommit);
        props.setProperty("auto.offset.reset", autoOffsetReset);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList(TOPIC));
        } catch (RuntimeException e) {
            // Do not leak the client's resources if the subscription fails.
            consumer.close();
            throw e;
        }
        return consumer;
    }

    /** @return the value of {@code spring.kafka.consumer.group-id} */
    public String getGroupId() {
        return groupId;
    }

    /** @param groupId replacement for the configured group id value */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** @return the comma-separated broker list */
    public String getBootstrapServers() {
        return bootstrapServers;
    }

    /** @param bootstrapServers comma-separated broker list */
    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    /** @return the {@code enable.auto.commit} setting as a String */
    public String getEnableAutoCommit() {
        return enableAutoCommit;
    }

    /** @param enableAutoCommit the {@code enable.auto.commit} setting as a String */
    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    /** @return the {@code auto.offset.reset} policy */
    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    /** @param autoOffsetReset the {@code auto.offset.reset} policy */
    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }
}

后一种方式暂未发现有什么优点。两种方式都可以实现监听 Kafka、充当消费者。

3,现在我有两个消费者。之前一直好奇:如果有多个消费者,如何让它们重复消费,或者协同消费?听说是通过配置 group id 实现的,亲自试验了一下,确实如此:同一个 group id 里的消费者是协同消费(一条消息只会被其中一个消费者处理),不同 group id 的消费者则是重复消费(每个组都会收到全部消息)。

也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录

springboot+kafka

相关推荐