Installing and deploying Kafka 2.4.0 (Scala 2.12 build) on Linux, with Java client integration

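1. Download kafka_2.12-2.4.0.tgz and extract it to /home/kafka_2.12-2.4.0

2. Configure Kafka

2.1 Create the Kafka log directory: /home/kafka_2.12-2.4.0/logs

2.2 Create the ZooKeeper data directory: /tmp/zookeeper

2.3 Edit /home/kafka_2.12-2.4.0/config/server.properties so that it contains the following (the SSL certificates are covered further below):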

ssl.keystore.location=/home/ca/server/server.keystore.jks
ssl.keystore.password=mima123
ssl.key.password=mima123
ssl.truststore.location=/home/ca/trust/server.truststore.jks
ssl.truststore.password=mima123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
#security.inter.broker.protocol=SSL
inter.broker.listener.name=SSL

############################# Server Basics #############################
broker.id=0
listeners=SSL://<Aliyun-private-IP>:9093
advertised.listeners=SSL://<Aliyun-public-IP>:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################

log.dirs=/home/kafka_2.12-2.4.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

#log.flush.interval.messages=10000
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=<Aliyun-private-IP>:2181
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable=true

2.4 Edit /home/kafka_2.12-2.4.0/config/zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

2.5 Edit /etc/hosts and add the kafka-single entry (the last line below); the IP is the Aliyun private IP

127.0.0.1    localhost    localhost.localdomain    localhost4    localhost4.localdomain4
::1    localhost    localhost.localdomain    localhost6    localhost6.localdomain6
172.18.54.18    iZwz9gq8vhwxtgpg21yonsZ    iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18    kafka-single

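3. Generate the SSL certificate files

3.1 Create four directories: /home/ca/root, /home/ca/trust, /home/ca/server, /home/ca/client

3.2 Issue the certificates

Step 1: Generate server.keystore.jks (the server-side keystore, holding the server key pair)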
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single

Step 2: Generate the CA certificate (the CA signs the server and client certificates so that they can be trusted)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"

Step 3: Import the CA certificate into a client truststore
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123

Step 4: Import the CA certificate into a server truststore
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123

Step 5: Sign the server certificate
Sub-step 1: Export a certificate signing request (CSR) for the server key to server.cert-file
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
Sub-step 2: Sign the server certificate request with the CA
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
Sub-step 3: Import the CA certificate into the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Sub-step 4: Import the signed server certificate into the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123

Issuing the client SSL certificate
Step 1: Generate client.keystore.jks
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
Step 2: Export a certificate signing request (CSR) for the client key to client.cert-file
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
Step 3: Sign the client certificate request with the CA
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
Step 4: Import the CA certificate into the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Step 5: Import the signed certificate into the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123
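Before starting the broker it can help to confirm that each keystore ended up with the expected entries: the CA certificate as a trusted certificate and the signed key pair as a private key entry. The following is a minimal sketch using only the JDK's java.security.KeyStore API; the class name KeystoreInspector and its method names are hypothetical and not part of Kafka or of the original setup.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: lists the entries of a JKS keystore or truststore.
public class KeystoreInspector {

    // Returns one sorted line per alias, stating whether it is a key pair or a trusted certificate.
    public static List<String> describe(KeyStore store) throws Exception {
        List<String> lines = new ArrayList<>();
        for (String alias : Collections.list(store.aliases())) {
            String kind = store.isKeyEntry(alias) ? "private key entry" : "trusted certificate";
            lines.add(alias + ": " + kind);
        }
        Collections.sort(lines);
        return lines;
    }

    // Loads a JKS file from disk with the given store password.
    public static KeyStore load(String path, char[] password) throws Exception {
        KeyStore store = KeyStore.getInstance("JKS");
        try (InputStream in = new FileInputStream(path)) {
            store.load(in, password);
        }
        return store;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: java KeystoreInspector <keystore.jks> <store-password>");
            return;
        }
        for (String line : describe(load(args[0], args[1].toCharArray()))) {
            System.out.println(line);
        }
    }
}
```

Run against /home/ca/server/server.keystore.jks with the store password used above, it should list two aliases if all steps succeeded: one trusted certificate (the CA) and one private key entry (the server key). `keytool -list -v -keystore <file>` gives the same information in more detail.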

4. Start and stop the Kafka and ZooKeeper services

cd /home/kafka_2.12-2.4.0/bin

Start ZooKeeper:

./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &

Start Kafka:

./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &

List topics:

./kafka-topics.sh --list --zookeeper localhost:2181

Stop Kafka:

./kafka-server-stop.sh

Stop ZooKeeper:

./zookeeper-server-stop.sh

List the Kafka topics (addressing ZooKeeper by the private IP):

./kafka-topics.sh --list --zookeeper 172.18.54.18:2181

Show the details of a topic:

./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1

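Console producer. The broker configured above listens only on SSL port 9093, so the console client has to connect to that port and be given the client SSL settings. The file /home/ca/client/client-ssl.properties used here is a hypothetical name for a properties file containing security.protocol=SSL, the client truststore and keystore locations and passwords, and an empty ssl.endpoint.identification.algorithm:

./kafka-console-producer.sh --broker-list 172.18.54.18:9093 --topic topic1 --producer.config /home/ca/client/client-ssl.properties

Console consumer (using the same hypothetical SSL properties file):

./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9093 --topic topic1 --from-beginning --consumer.config /home/ca/client/client-ssl.properties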

5. Java client integration

5.1 Producer

package com.xrh.extend.kafka;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    public static String topic = "topic2"; // the topic to publish to

    public static void main(String[] args) throws InterruptedException {
        
        Properties props = new Properties();
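        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Aliyun-public-IP>:9093"); // Kafka address; separate multiple addresses with commas
//        acks: the acknowledgement mode for sent records. The default in this client version is 1.
//        acks=0: the producer does not wait for any response from Kafka.
//        acks=1: the leader writes the record to its local log but does not wait for the other brokers to acknowledge it.
//        acks=all: the leader waits until all in-sync replicas have acknowledged the record. The record is not lost as long as at least one in-sync replica stays alive. This is the strongest durability guarantee.
        props.put("acks", "all");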
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        try {
            int i = 1;
            while (i < 20) {
                String msg = "Test Hello," + new Random().nextInt(100);

                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic , "key1", msg);
                // send() is asynchronous: the outcome is reported later to the callback
                kafkaProducer.send(record, new MyProducerCallBack());
                System.out.println("Message queued for sending: " + msg);
                ++ i;
                Thread.sleep(500);
            }
        } finally {
            kafkaProducer.close();
        }

    }
    
    private static class MyProducerCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // A non-null exception means the send failed and recordMetadata carries no useful data
            if (null != e) {
                e.printStackTrace();
                return;
            }
            System.out.println("timestamp, topic, partition, offset: " + recordMetadata.timestamp()
                + ", " + recordMetadata.topic() + ", " + recordMetadata.partition()
                + ", " + recordMetadata.offset());
        }
    }
    
//    acks = 1
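//    batch.size = 16384 // when several records are headed for the same partition, the producer tries to batch them into fewer requests, which helps performance on both the client and the broker.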
//    bootstrap.servers = [39.108.124.173:9092]
//    buffer.memory = 33554432
//    client.dns.lookup = default
//    client.id = 
//    compression.type = none
//    connections.max.idle.ms = 540000
//    delivery.timeout.ms = 120000
//    enable.idempotence = false
//    interceptor.classes = []
//    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
//    linger.ms = 0
//    max.block.ms = 60000
//    max.in.flight.requests.per.connection = 5
//    max.request.size = 1048576
//    metadata.max.age.ms = 300000
//    metric.reporters = []
//    metrics.num.samples = 2
//    metrics.recording.level = INFO
//    metrics.sample.window.ms = 30000
//    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
//    receive.buffer.bytes = 32768
//    reconnect.backoff.max.ms = 1000
//    reconnect.backoff.ms = 50
//    request.timeout.ms = 30000
//    retries = 2147483647   // a value greater than 0 makes the client resend a record whose send failed.
//    retry.backoff.ms = 100
//    sasl.client.callback.handler.class = null
//    sasl.jaas.config = null
//    sasl.kerberos.kinit.cmd = /usr/bin/kinit
//    sasl.kerberos.min.time.before.relogin = 60000
//    sasl.kerberos.service.name = null
//    sasl.kerberos.ticket.renew.jitter = 0.05
//    sasl.kerberos.ticket.renew.window.factor = 0.8
//    sasl.login.callback.handler.class = null
//    sasl.login.class = null
//    sasl.login.refresh.buffer.seconds = 300
//    sasl.login.refresh.min.period.seconds = 60
//    sasl.login.refresh.window.factor = 0.8
//    sasl.login.refresh.window.jitter = 0.05
//    sasl.mechanism = GSSAPI
//    security.protocol = PLAINTEXT
//    security.providers = null
//    send.buffer.bytes = 131072
//    ssl.cipher.suites = null
//    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
//    ssl.endpoint.identification.algorithm = https
//    ssl.key.password = null
//    ssl.keymanager.algorithm = SunX509
//    ssl.keystore.location = null
//    ssl.keystore.password = null
//    ssl.keystore.type = JKS
//    ssl.protocol = TLS
//    ssl.provider = null
//    ssl.secure.random.implementation = null
//    ssl.trustmanager.algorithm = PKIX
//    ssl.truststore.location = null
//    ssl.truststore.password = null
//    ssl.truststore.type = JKS
//    transaction.timeout.ms = 60000
//    transactional.id = null
//    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    
}

5.2 Consumer

package com.xrh.extend.kafka;

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

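import java.time.Duration;

public class Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Aliyun-public-IP>:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        props.setProperty("ssl.endpoint.identification.algorithm", "");
//        props.put("auto.offset.reset", "latest");
//        bootstrap.servers: the Kafka broker address(es).
//        group.id: the consumer group name. Different groups consume the same data independently. For example, if group A has already consumed 1000 records and you want to read those records again without producing them again, switching to a new group name is enough.
//        enable.auto.commit: whether offsets are committed automatically; the default is true.
//        auto.commit.interval.ms: how often offsets are auto-committed when enable.auto.commit is true.
//        session.timeout.ms: the timeout after which the group coordinator considers a consumer that sent no heartbeat to be dead.
//        max.poll.records: the maximum number of records returned by a single poll().
//        auto.offset.reset: what to do when the group has no committed offset; the default is latest.
//        earliest: if a partition has a committed offset, resume from it; otherwise start from the beginning of the partition.
//        latest: if a partition has a committed offset, resume from it; otherwise consume only records produced from now on.
//        none: if every partition has a committed offset, resume from it; if any partition has none, throw an exception.
//        key.deserializer: the key deserializer class; required, there is no default. This example uses org.apache.kafka.common.serialization.StringDeserializer.
//        value.deserializer: the value deserializer class; required, there is no default. This example uses org.apache.kafka.common.serialization.StringDeserializer.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList(Producer.topic)); // subscribe to the topic
        while(true){
            
            // poll(long) is deprecated since Kafka 2.0; the Duration overload replaces it
            ConsumerRecords<String, String> consumerDatas = consumer.poll(Duration.ofMillis(100));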
            if( consumerDatas.count() > 0 ){
                Iterator<ConsumerRecord<String, String>> consumerIter = consumerDatas.iterator();
                while(consumerIter.hasNext()){
                    ConsumerRecord<String, String>  consumerData = consumerIter.next();
                    System.out.printf("offset = %d, key = %s, value = %s%n", 
                            consumerData.offset(), consumerData.key(), consumerData.value());
                }
            }else{
                System.out.println("KafkaConsumer1 is waiting message...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

       }
    }
    
}
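The producer and the consumer above repeat the same SSL settings. One possible way to keep them in one place is sketched below, using only java.util.Properties so that it compiles without the Kafka client on the classpath. The class name SslClientProps, the method build, and the environment variable KAFKA_SSL_PASSWORD are hypothetical names introduced for illustration; the property keys are the ones already used in the two clients.

```java
import java.util.Properties;

// Hypothetical helper: builds the SSL-related client settings shared by producer and consumer.
public class SslClientProps {

    public static Properties build(String bootstrapServers, String truststorePath,
                                   String keystorePath, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", password);
        props.setProperty("ssl.keystore.location", keystorePath);
        props.setProperty("ssl.keystore.password", password);
        // An empty value disables hostname verification, as in the clients above
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        // Assumption: the password comes from an environment variable instead of source code
        String password = System.getenv("KAFKA_SSL_PASSWORD");
        if (password == null) {
            password = "changeit"; // placeholder for local experiments only
        }
        Properties props = build("kafka-single:9093",
                "D:\\ca\\client.truststore.jks", "D:\\ca\\client.keystore.jks", password);
        // Print the keys only, so that passwords never reach the console
        for (String key : props.stringPropertyNames()) {
            System.out.println(key);
        }
    }
}
```

A client would then add its own serializer or deserializer settings (and group.id for a consumer) to the returned Properties before passing them to the KafkaProducer or KafkaConsumer constructor.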
