springboot+kafka
1、首先 Spring Boot 对 Kafka 的支持很好：在配置文件中配置好参数后就可以直接使用。整体非常简单,不用担心。
2,我用的依赖是
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置文件
spring:
  kafka:
    bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
    producer:
      retries: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
    consumer:
      bootstrap-servers: 10.32.16.11:9092,10.32.16.96:9092,10.32.32.30:9092,10.32.16.70:9092
      enable-auto-commit: true
      auto-offset-reset: latest
      auto-commit-interval: 1000
      group-id: gzj
然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
消费者,监听
@KafkaListener(topics = {"gzj"})
public void receive(String content){
System.err.println("Receive:" + content);
}

消费者还有另一种实现方法:
package com.gzj.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* Description
* <p>
* </p>
* DATE 2018/10/23.
*
* @author guozhenjiang.
*/
@Component
public class KafkaConsumerTask implements Runnable, InitializingBean {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);

    /** Background thread running the poll loop; started once the bean is fully initialized. */
    private Thread thread;

    /**
     * Consumer bean built (and already subscribed) by KafkaConnectConfig.
     * KafkaConsumer is NOT thread-safe, so it must only ever be used from {@link #thread}.
     */
    @Resource(name = "kafkaConsumer")
    private KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Poll loop: fetches batches from Kafka and logs each record until the
     * thread is interrupted. A failed poll is logged and the loop continues,
     * so one bad batch does not silently kill consumption.
     */
    @Override
    public void run() {
        logger.info("Kafka consumer task started");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // poll() never returns null; an empty batch is just an empty ConsumerRecords.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Received topic={} key={} value={}",
                            record.topic(), record.key(), record.value());
                }
            } catch (Exception e) {
                // Never swallow exceptions: the original empty catch made failures invisible.
                logger.error("Error while polling Kafka", e);
            }
        }
        logger.info("Kafka consumer task stopped");
    }

    /**
     * Starts the polling thread after Spring has injected all dependencies.
     * The thread is named so it is identifiable in thread dumps.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.thread = new Thread(this, "kafka-consumer-task");
        this.thread.start();
    }
}

package com.gzj.demo.config;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Properties;
/**
* Description
* <p>
* </p>
* DATE 2018/10/23.
*
* @author guozhenjiang.
*/
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {

    /** Comma-separated broker list, bound from spring.kafka.bootstrap-servers. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /** Consumer group id, bound from spring.kafka.consumer.group-id. */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * Builds a KafkaConsumer already subscribed to the "gzj" topic.
     * Note: KafkaConsumer is not thread-safe — the single bean instance must
     * only be used from one thread (see KafkaConsumerTask).
     *
     * @return a configured, subscribed consumer
     */
    @Bean(name = "kafkaConsumer")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Bug fix: use the configured group id instead of the hard-coded "ggg",
        // so this consumer honours spring.kafka.consumer.group-id like the
        // @KafkaListener-based consumer does.
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", enableAutoCommit);
        props.setProperty("auto.offset.reset", autoOffsetReset);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("gzj"));
        return consumer;
    }

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public String getEnableAutoCommit() {
        return enableAutoCommit;
    }

    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }
}

后一种方式暂未发现明显优点,两种方式都可以监听 Kafka、充当消费者。
3、现在我有两个消费者。之前一直好奇多个消费者如何实现重复消费或协同消费,听说是通过配置 group-id 实现的。亲自试验了一下确实如此:同一个 group-id 内的消费者是协同消费(分摊消息),不同 group-id 之间则是重复消费(各自收到全部消息)。
也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录

相关推荐
Kafka 2020-09-18
sweetgirl0 2020-07-27
wangying 2020-11-13
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
guicaizhou 2020-09-15
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01
weikaixxxxxx 2020-08-01
PoppyEvan 2020-08-01
guicaizhou 2020-08-01