2018年第28周-Kafka环境搭建和其Java例子
安装kafka
下载
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
配置文件
我部署在服务器
s1.jevoncode.com s2.jevoncode.com s3.jevoncode.com
都在目录/mydata1/kafka_2.11-1.1.0
修改config/server.properties文件中的broker.id和zookeeper.connect(zk地址),其中broker.id在集群中必须唯一
broker.id=1
zookeeper.connect=s1.jevoncode.com:2181,s2.jevoncode.com:2181,s3.jevoncode.com:2181
#如只能让内网访问,则需配置这个
listeners=PLAINTEXT://192.168.56.4:9092
启动kafka
先在每个服务器启动zookeeper
zkServer.sh start
再在每个服务器启动以下命令
nohup ./bin/kafka-server-start.sh config/server.properties &
Kafka的java例子
项目配置
项目使用spring-boot和spring-kafka,pom文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jc.demo</groupId>
    <artifactId>jc-demo-kafka</artifactId>
    <version>0.0.1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>oneapp-archetype-test</name>
    <url>http://www.jevoncode.com</url>

    <properties>
        <!-- Encoding of every file in the project -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Compile-time encoding -->
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <!-- Compile-time JDK version: Spring Boot 2.0.x requires Java 8 or later, so 1.7 cannot be used here -->
        <java.version>1.8</java.version>
        <!-- Test -->
        <junit.version>4.12</junit.version>
        <!-- Logging -->
        <slf4j.version>1.7.21</slf4j.version>
        <logback.version>1.1.7</logback.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- http://kafka.apache.org/documentation.html#producerapi -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>
    </dependencies>
</project>
配置文件
application.properties
#kafka地址
jc.kaHost=s1.jevoncode.com:9092,s2.jevoncode.com:9092,s3.jevoncode.com:9092
Topic配置
package com.jc.demo.springboot.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the Kafka topic used by this demo.
 *
 * <p>Per the original author's note: if the topic does not exist in Kafka it is
 * created automatically; if it already exists, it is left unchanged.
 */
@Configuration
@EnableKafka
public class TopicConfig {

    /** Name of the demo topic; shared by the producer and the listener. */
    public static final String TOPIC_JC_KAFKA_DEMO = "jc-demo-kafka";

    /** Bootstrap server list, read from the {@code jc.kaHost} property. */
    @Value("${jc.kaHost}")
    String kaHost;

    /**
     * Builds the {@link KafkaAdmin} pointing at the configured bootstrap servers.
     *
     * @return admin configured with {@code bootstrap.servers = kaHost}
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);
        KafkaAdmin admin = new KafkaAdmin(adminProps);
        return admin;
    }

    /**
     * Describes the demo topic: name, partition count and replication factor.
     *
     * <p>Author's notes: with a single broker, topic creation fails with
     * "replication factor: 2 larger than available brokers: 1" — replication is
     * only usable in a cluster. The partition count is set to the number of
     * brokers, i.e. 3.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic foo() {
        int partitionCount = 3;
        short replicationFactor = 2;
        return new NewTopic(TOPIC_JC_KAFKA_DEMO, partitionCount, replicationFactor);
    }

    // Additional topics can be declared the same way, e.g.:
    //
    // @Bean
    // public NewTopic topic1() {
    //     return new NewTopic("jc-demo-kafka2", 10, (short) 2);
    // }
}
启动类
package com.jc.demo.springboot; import com.jc.demo.springboot.config.ApplicationConfig; import com.jc.demo.springboot.service.DoService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.PropertySource; @SpringBootApplication @Import({ApplicationConfig.class}) @PropertySource("classpath:application.properties") public class DemoApplication implements CommandLineRunner{ @Autowired private DoService doService; public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } @Override public void run(String... args) throws Exception { doService.HelloWorld(); } }
生产者
配置类KafkaProducerConfig
package com.jc.demo.springboot.config;

import com.jc.demo.springboot.service.MyListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side wiring: raw producer properties, the producer factory and the
 * {@link KafkaTemplate} used to send String key / String value messages.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Bootstrap server list, read from the {@code jc.kaHost} property. */
    @Value("${jc.kaHost}")
    String kaHost;

    // ---------------- producer configuration ----------------

    /**
     * Assembles the property map handed to the Kafka producer.
     *
     * @return a fresh, mutable map of producer settings
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new HashMap<>();

        // Connection
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);

        // Delivery and batching
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        // Both key and value are sent as plain strings
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return settings;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return factory producing String/String producers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    // ---------------- kafka template configuration ----------------

    /**
     * Creates the template used by services to publish messages.
     *
     * @return template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
服务接口和类(生产发送消息)
DoService接口
package com.jc.demo.springboot.service;

/**
 * Entry point of the demo's business logic, invoked once at application startup.
 */
public interface DoService {

    /**
     * Runs the demo. The implementation in this project publishes messages to
     * Kafka.
     *
     * <p>The method name does not follow Java naming conventions; it is kept
     * as-is so that existing callers keep compiling.
     */
    void HelloWorld();
}
DoService实现类,调用kafkaTemplate完成发送消息到kafka
package com.jc.demo.springboot.service; import com.jc.demo.springboot.config.TopicConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.math.BigInteger; @Service public class DoServiceImpl implements DoService { @Autowired KafkaTemplate kafkaTemplate; @Override public void HelloWorld() { String phone = "18689206965"; while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } kafkaTemplate.send(TopicConfig.TOPIC_JC_KAFKA_DEMO, partition(phone), phone, "jevoncode" + System.currentTimeMillis()); } } /** * 根据手机号计算分区 * @return */ private int partition(String phone) { int hash = phone.hashCode(); int partition = new BigInteger(Integer.toString(hash)).mod(new BigInteger("3")).intValue(); //由于总共有3个分区,所以得去3的模 System.out.println(partition); return partition; } }
此时执行DemoApplication的main方法,程序就会每秒向kafka发送一条形如"jevoncode"加当前时间戳的字符串消息,分区由手机号的哈希值对3取模决定(本例中为0分区)。
可以使用命令查看:
[jevoncode@s1 kafka_2.11-1.1.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server s1.jevoncode.com:9092 --topic jc-demo-kafka --from-beginning jevoncode1531654603522 jevoncode1531654689283 jevoncode1531654690331 jevoncode1531654691332 jevoncode1531654692332 ....
消费者
此时application.properties配置文件需增加消费者配置
#kafka地址
jc.kaHost=s1.jevoncode.com:9092,s2.jevoncode.com:9092,s3.jevoncode.com:9092
#############以下是消费者端的配置###########################
#kafka消费者 groupId配置
jc.consumer.group.id=jc-consumer-group-1
#kafka消费者 分区配置,这样就可以指定每个消费者所消费的分区,提高吞吐量
jc.consumer.partitions=0,1,2
#一次从kafka拉的最大消息数
jc.max.poll.records=100
消费者配置类
package com.jc.demo.springboot.config;

import com.jc.demo.springboot.listener.KafkaListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer-side wiring: consumer properties, the consumer factory, a batch
 * listener container factory with manual acknowledgment, and the listener bean.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Bootstrap server list, read from the {@code jc.kaHost} property. */
    @Value("${jc.kaHost}")
    String kaHost;

    /** Maximum number of records fetched per poll, read from {@code jc.max.poll.records}. */
    @Value("${jc.max.poll.records}")
    String maxPool;

    // ---------------- consumer configuration ----------------

    /**
     * Assembles the property map handed to the Kafka consumer.
     *
     * @return a fresh, mutable map of consumer settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);
        // The group id was previously put twice ("0", then "batch"); only the last
        // value ever took effect, so the dead first assignment has been removed.
        // NOTE(review): the listener in this project declares an id on its
        // @KafkaListener annotation; in spring-kafka 2.x that id presumably
        // overrides this group id — confirm against the spring-kafka reference.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        // Offsets are committed through the listener's Acknowledgment, not automatically.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPool);
        return props;
    }

    /**
     * Creates the container factory used by {@code @KafkaListener} methods:
     * batch delivery, logging error handler, manual acknowledgment.
     *
     * @return the configured listener container factory
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return factory producing String/String consumers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Registers the message listener as a bean.
     *
     * @return the listener instance
     */
    @Bean
    public KafkaListener kafkaListener() {
        return new KafkaListener();
    }
}
kafka消费监听类(接收和处理消息)
package com.jc.demo.springboot.listener;

import com.jc.demo.springboot.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.List;

/**
 * Batch listener for the demo topic: logs every received record and then
 * acknowledges the whole batch.
 */
public class KafkaListener {

    private static final Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    /**
     * Receives one batch of records from the configured topic partitions.
     *
     * <p>The three lists are read by the same index: element {@code i} of
     * {@code partitions} and {@code offsets} is logged alongside element
     * {@code i} of {@code messages}.
     *
     * @param messages   payloads of the records in the batch
     * @param partitions partition id of each record
     * @param offsets    offset of each record
     * @param ack        handle used to acknowledge the batch
     */
    @org.springframework.kafka.annotation.KafkaListener(
            id = "${jc.consumer.group.id}",
            // topic and partitions to consume
            topicPartitions = {
                    @TopicPartition(
                            topic = TopicConfig.TOPIC_JC_KAFKA_DEMO,
                            partitions = "#{'${jc.consumer.partitions}'.split(',')}")
            })
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                        Acknowledgment ack) {
        int idx = 0;
        while (idx < messages.size()) {
            String line = describe(messages.get(idx), partitions.get(idx), offsets.get(idx));
            logger.debug("receive messages {}", line);
            idx++;
        }

        ack.acknowledge();
        logger.debug("all batch messages {} consumed", messages.size());
    }

    /** Formats one record as {@code message='...' with partition-offset='p-o'}. */
    private static String describe(String payload, Integer partition, Long offset) {
        return "message='" + payload + "' with partition-offset='" + partition + "-" + offset + "'";
    }
}
此时运行DemoApplication,就可以看到如下日志:
07-15 19:39:25 [jc-consumer-group-1-0-C-1] DEBUG com.jc.demo.springboot.listener.KafkaListener - receive messages message='jevoncode1531654765409' with partition-offset='0-17' 07-15 19:39:25 [jc-consumer-group-1-0-C-1] DEBUG com.jc.demo.springboot.listener.KafkaListener - all batch messages 1 consumed
相关推荐
Kafka 2020-09-18
guicaizhou 2020-09-15
jiangkai00 2020-07-18
CobingLiu 2020-06-16
yangyutong00 2020-06-12
sweetgirl0 2020-06-09
yanghuashuiyue 2020-11-14
liuxingen 2020-11-13
wangying 2020-11-13
王谦 2020-11-03
huangwei00 2020-10-14
shenzhenzsw 2020-10-09
guicaizhou 2020-09-30
jiaomrswang 2020-09-23
jyj0 2020-09-21
hannuotayouxi 2020-08-20
amwayy 2020-08-03
yangyutong00 2020-08-01