2018年第28周-Kafka环境搭建和其Java例子

安装kafka

下载

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

配置文件

我部署在服务器

s1.jevoncode.com
s2.jevoncode.com
s3.jevoncode.com

都在目录/mydata1/kafka_2.11-1.1.0
修改config/server.properties文件中的broker.id和zookeeper.connect,其中broker.id在集群中必须唯一

broker.id=1
zookeeper.connect=s1.jevoncode.com:2181,s2.jevoncode.com:2181,s3.jevoncode.com:2181
#如只能让内网访问,则需配置这个
listeners=PLAINTEXT://192.168.56.4:9092

启动kafka

先在每个服务器启动zookeeper

zkServer.sh start

再在每个服务器上执行以下命令启动kafka

nohup ./bin/kafka-server-start.sh config/server.properties &

Kafka的java例子

项目配置

项目使用spring-boot和spring-kafka,pom文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jc.demo</groupId>
    <artifactId>jc-demo-kafka</artifactId>
    <version>0.0.1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>oneapp-archetype-test</name>
    <url>http://www.jevoncode.com</url>

    <properties>
        <!-- Encoding of every file in the project -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Compile time encoding -->
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>

        <!-- Compile time JDK version. The Spring Boot 2.0.x parent used below requires Java 8 or later. -->
        <java.version>1.8</java.version>

        <!-- Test -->
        <junit.version>4.12</junit.version>


        <!-- Logging -->
        <slf4j.version>1.7.21</slf4j.version>
        <logback.version>1.1.7</logback.version>


    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Kafka client, pinned to the same version as the installed broker (1.1.0) -->
        <!-- http://kafka.apache.org/documentation.html#producerapi-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- Logging API -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>

    </dependencies>

</project>

配置文件

application.properties

#kafka地址
jc.kaHost=s1.jevoncode.com:9092,s2.jevoncode.com:9092,s3.jevoncode.com:9092

Topic配置

package com.jc.demo.springboot.config;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the Kafka topic used by the demo.
 * <p>
 * Per the original author's note: if the topic does not exist on the cluster
 * it is created automatically; if it already exists it is left unchanged.
 */
@Configuration
@EnableKafka
public class TopicConfig {

    /** Name of the demo topic, shared by the producer and the consumer. */
    public static final String TOPIC_JC_KAFKA_DEMO = "jc-demo-kafka";

    /** Number of partitions; the author sets it equal to the number of brokers (3). */
    private static final int PARTITION_COUNT = 3;

    /**
     * Replication factor of the topic.
     * <p>
     * With a single broker topic creation fails with
     * "replication factor: 2 larger than available brokers: 1";
     * replication is only usable on a real cluster.
     */
    private static final short REPLICATION_FACTOR = (short) 2;

    /** Kafka bootstrap servers, injected from the {@code jc.kaHost} property. */
    @Value("${jc.kaHost}")
    String kaHost;

    /**
     * Builds the admin bean configured with the bootstrap servers.
     *
     * @return a {@link KafkaAdmin} pointing at {@link #kaHost}
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);
        return new KafkaAdmin(adminProps);
    }

    /**
     * Describes the demo topic: name, partition count and replication factor.
     * Further topics can be declared with additional {@link NewTopic} beans.
     *
     * @return the topic definition for {@link #TOPIC_JC_KAFKA_DEMO}
     */
    @Bean
    public NewTopic foo() {
        return new NewTopic(TOPIC_JC_KAFKA_DEMO, PARTITION_COUNT, REPLICATION_FACTOR);
    }
}

启动类

package com.jc.demo.springboot;


import com.jc.demo.springboot.config.ApplicationConfig;
import com.jc.demo.springboot.service.DoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;

/**
 * Application entry point of the Kafka demo.
 * <p>
 * Implements {@link CommandLineRunner} so that {@link #run(String...)} can
 * kick off the producing service.
 */
@SpringBootApplication
@Import({ApplicationConfig.class})
@PropertySource("classpath:application.properties")
public class DemoApplication implements CommandLineRunner {

    /** Service that sends the demo messages. */
    @Autowired
    private DoService doService;

    /**
     * {@link CommandLineRunner} callback; delegates to {@link DoService#HelloWorld()}.
     *
     * @param args command line arguments (unused)
     * @throws Exception declared by the {@link CommandLineRunner} contract
     */
    @Override
    public void run(String... args) throws Exception {
        doService.HelloWorld();
    }

    /**
     * Boots the Spring application.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

生产者

配置类KafkaProducerConfig

package com.jc.demo.springboot.config;

import com.jc.demo.springboot.service.MyListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side Kafka wiring: raw producer settings, the producer factory
 * and the {@link KafkaTemplate} used to send String key/value messages.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /** Kafka bootstrap servers, injected from the {@code jc.kaHost} property. */
    @Value("${jc.kaHost}")
    String kaHost;

    /**
     * Assembles the producer settings.
     *
     * @return a mutable map of producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> settings = new HashMap<>();

        // Where to connect and how to serialize keys and values.
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Delivery and batching parameters.
        settings.put(ProducerConfig.RETRIES_CONFIG, 0);
        settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        settings.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        settings.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

        return settings;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return factory producing String/String Kafka producers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Creates the template through which application code sends messages.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

服务接口和类(生产发送消息)

DoService接口

package com.jc.demo.springboot.service;

/**
 * Entry point of the demo's business logic.
 */
public interface DoService {
    /**
     * Runs the demo action; what it does is up to the implementation.
     */
    void HelloWorld();
}

DoService实现类,调用kafkaTemplate完成发送消息到kafka

package com.jc.demo.springboot.service;

import com.jc.demo.springboot.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.math.BigInteger;

/**
 * {@link DoService} implementation that publishes a message to the demo
 * topic once per second through {@link KafkaTemplate}.
 */
@Service
public class DoServiceImpl implements DoService {

    /** The demo topic has 3 partitions in total, so the hash is reduced modulo 3. */
    private static final int PARTITION_COUNT = 3;

    /** Interval between two sends, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 1000L;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends "jevoncode" followed by the current timestamp to the demo topic
     * every second, keyed by a fixed phone number and routed to the partition
     * derived from that number.
     * <p>
     * Runs until the calling thread is interrupted; on interruption the
     * interrupt flag is restored and the method returns.
     */
    @Override
    public void HelloWorld() {
        String phone = "18689206965";
        // The key never changes, so the target partition is computed once.
        int partition = partition(phone);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(SEND_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
            kafkaTemplate.send(TopicConfig.TOPIC_JC_KAFKA_DEMO, partition, phone, "jevoncode" + System.currentTimeMillis());
        }
    }


    /**
     * Computes the partition for a phone number.
     *
     * @param phone the phone number used as message key
     * @return a partition index in the range {@code [0, PARTITION_COUNT)}
     */
    private int partition(String phone) {
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(phone.hashCode(), PARTITION_COUNT);
    }

}

此时执行DemoApplication的main方法,就会每隔1秒向kafka的0分区发送一条以"jevoncode"开头、后接当前时间戳的字符串消息。
可以使用命令查看:

[jevoncode@s1 kafka_2.11-1.1.0]$ ./bin/kafka-console-consumer.sh --bootstrap-server s1.jevoncode.com:9092 --topic jc-demo-kafka --from-beginning
jevoncode1531654603522
jevoncode1531654689283
jevoncode1531654690331
jevoncode1531654691332
jevoncode1531654692332
....

消费者

此时application.properties配置文件需增加消费者配置

#kafka地址
jc.kaHost=s1.jevoncode.com:9092,s2.jevoncode.com:9092,s3.jevoncode.com:9092

#############以下是消费者端的配置###########################
#kafka消费者 groupId配置
jc.consumer.group.id=jc-consumer-group-1
#kafka消费者 分区配置,这样就可以指定每个消费者所消费的分区,提高吞吐量
jc.consumer.partitions=0,1,2
#一次从kafka拉的最大消息数
jc.max.poll.records=100

消费者配置类

package com.jc.demo.springboot.config;

import com.jc.demo.springboot.listener.KafkaListener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer-side Kafka wiring: raw consumer settings, the consumer factory,
 * the batch listener container factory and the listener bean itself.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    /** Kafka bootstrap servers, injected from the {@code jc.kaHost} property. */
    @Value("${jc.kaHost}")
    String kaHost;
    /**
     * Maximum number of records fetched per poll, injected from the
     * {@code jc.max.poll.records} property.
     */
    @Value("${jc.max.poll.records}")
    String maxPool;



    /* --------------consumer configuration-----------------**/

    /**
     * Assembles the consumer settings. Auto-commit is disabled because the
     * container factory below is configured for manual acknowledgment.
     * <p>
     * NOTE(review): the {@code @KafkaListener} in this project also sets an
     * {@code id} from {@code jc.consumer.group.id}; confirm which group id is
     * effective at runtime.
     *
     * @return a mutable map of consumer configuration entries
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kaHost);
        // The group id was previously put twice ("0", then "batch"); only the
        // last value ever took effect, so the dead entry has been dropped.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPool);
        return props;
    }


    /**
     * Creates the listener container factory: batch delivery, a logging
     * batch error handler and manual acknowledgment.
     *
     * @return the configured container factory
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
        return factory;
    }

    /**
     * Creates the consumer factory from {@link #consumerConfigs()}.
     *
     * @return factory producing String/String Kafka consumers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Registers the message listener as a bean.
     *
     * @return the listener instance
     */
    @Bean
    public KafkaListener kafkaListener() {
        return new KafkaListener();
    }

}

kafka消费监听类(接收和处理消息)

package com.jc.demo.springboot.listener;

import com.jc.demo.springboot.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.List;

/**
 * Batch listener for the demo topic: logs every received record and then
 * acknowledges the whole batch.
 */
public class KafkaListener {
    private static final Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    /**
     * Receives one batch of records from the configured topic partitions.
     * The three lists are read at the same index, i.e. entry {@code i} of
     * each list is treated as belonging to the same record.
     *
     * @param messages   payloads of the records in the batch
     * @param partitions partition of each record
     * @param offsets    offset of each record
     * @param ack        handle used to acknowledge the batch once it has been processed
     */
    @org.springframework.kafka.annotation.KafkaListener(id = "${jc.consumer.group.id}",topicPartitions =// topic and partitions to consume
                    { @TopicPartition(topic = TopicConfig.TOPIC_JC_KAFKA_DEMO, partitions ="#{'${jc.consumer.partitions}'.split(',')}")})
    public void receive(@Payload List<String> messages,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                         @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack) {

        for (int i = 0; i < messages.size(); i++) {
            // Parameterized logging: the message is only formatted when DEBUG is enabled.
            logger.debug("receive messages message='{}' with partition-offset='{}-{}'",
                    messages.get(i), partitions.get(i), offsets.get(i));
        }
        // Acknowledge only after every record of the batch has been handled.
        ack.acknowledge();
        logger.debug("all batch messages {} consumed",messages.size());
    }


}

此时运行DemoApplication就可以看到如下日志:

07-15 19:39:25 [jc-consumer-group-1-0-C-1] DEBUG com.jc.demo.springboot.listener.KafkaListener - receive messages message='jevoncode1531654765409' with partition-offset='0-17'
07-15 19:39:25 [jc-consumer-group-1-0-C-1] DEBUG com.jc.demo.springboot.listener.KafkaListener - all batch messages 1 consumed

相关推荐