Installing and deploying Kafka 2.4.0 (Scala 2.12 build) on Linux and connecting a Java client
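1. Download kafka_2.12-2.4.0.tgz and extract it to /home/kafka_2.12-2.4.0
2. Configure Kafka
2.1 Create the Kafka log directory: /home/kafka_2.12-2.4.0/logs
2.2 Create the ZooKeeper data directory: /tmp/zookeeper
2.3 Edit /home/kafka_2.12-2.4.0/config/server.properties as follows (the SSL certificates are covered further down):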
ssl.keystore.location=/home/ca/server/server.keystore.jks
ssl.keystore.password=mima123
ssl.key.password=mima123
ssl.truststore.location=/home/ca/trust/server.truststore.jks
ssl.truststore.password=mima123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
#security.inter.broker.protocol=SSL
inter.broker.listener.name=SSL
############################# Server Basics #############################
broker.id=0
listeners=SSL://<Alibaba Cloud internal IP>:9093
advertised.listeners=SSL://<Alibaba Cloud public IP>:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/home/kafka_2.12-2.4.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=<Alibaba Cloud internal IP>:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
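A misspelled or commented-out key in server.properties is easy to miss, because the broker does not necessarily reject it at startup. The following is a minimal sketch, using only the JDK, that reports which of the keys this setup relies on are missing or blank. The class name ServerConfigCheck and the list of keys are assumptions made for illustration; the list is not an exhaustive set of broker requirements.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class ServerConfigCheck {

    // Keys this article's SSL setup depends on (illustrative, not exhaustive).
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "listeners", "advertised.listeners", "inter.broker.listener.name",
            "ssl.keystore.location", "ssl.keystore.password",
            "ssl.truststore.location", "ssl.truststore.password",
            "log.dirs", "zookeeper.connect");

    // Returns the required keys that are absent or blank in the given properties text.
    // Lines starting with '#' are comments and therefore count as absent.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException("cannot parse properties text", e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from step 2.3; pass another path as the first argument.
        String path = args.length > 0 ? args[0]
                : "/home/kafka_2.12-2.4.0/config/server.properties";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        System.out.println("Missing keys: " + missingKeys(text));
    }
}
```

Run it after editing the file; an empty list means every key in REQUIRED_KEYS has a value.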
2.4 Edit /home/kafka_2.12-2.4.0/config/zookeeper.properties
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
2.5 Edit /etc/hosts and add the last line shown below; the IP is the Alibaba Cloud internal IP
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.18.54.18 iZwz9gq8vhwxtgpg21yonsZ iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18 kafka-single
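To confirm that the kafka-single entry is actually picked up by the JVM on this machine, a small resolution check can help. This is a sketch only; the class name HostCheck is hypothetical, and it relies on nothing beyond java.net.InetAddress.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

class HostCheck {

    // Returns the resolved IP address as text, or null when the name cannot be resolved.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // kafka-single is the host name added to /etc/hosts in step 2.5.
        String host = args.length > 0 ? args[0] : "kafka-single";
        String address = resolve(host);
        System.out.println(address == null
                ? host + " does not resolve"
                : host + " -> " + address);
    }
}
```

With the hosts file above in place, the expectation is that kafka-single resolves to the internal IP.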
3. Generate the SSL certificate files
3.1 Create four directories: /home/ca/root, /home/ca/trust, /home/ca/server, /home/ca/client
3.2 Issue the certificates
Step 1: Generate server.keystore.jks (the server-side keystore)
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single
Step 2: Generate the CA certificate (the CA signs the other certificates so that they can be trusted)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"
Step 3: Create the client truststore from the CA certificate
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Step 4: Create the server truststore from the CA certificate
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Step 5: Sign the server certificate
Sub-step 1: Export a certificate signing request, server.cert-file, from the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
Sub-step 2: Sign the server certificate request with the CA
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
Sub-step 3: Import the CA certificate into the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Sub-step 4: Import the signed server certificate into the server keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123
Issuing the client SSL certificate
Step 1: Generate client.keystore.jks (with -keyalg RSA, matching the server keystore)
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
Step 2: Export a certificate signing request, client.cert-file, from the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
Step 3: Sign the client certificate request with the CA
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
Step 4: Import the CA certificate into the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
Step 5: Import the signed certificate into the client keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123
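After the keytool steps above, it can be worth checking what actually ended up in each keystore before copying files to the client machine. The sketch below lists every alias with its entry type using the JDK's java.security.KeyStore API. The class name KeystoreInspect is hypothetical, and the default path and password are the ones used in this article. As an expectation rather than a guarantee: a key entry whose signed certificate was imported should show a chain length of 2 (signed certificate plus CA), while a still self-signed key shows 1.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.cert.Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class KeystoreInspect {

    // Loads a JKS keystore from the stream; a null stream yields an empty keystore.
    static KeyStore load(InputStream in, char[] password) {
        try {
            KeyStore store = KeyStore.getInstance("JKS");
            store.load(in, password);
            return store;
        } catch (Exception e) {
            throw new IllegalStateException("cannot load keystore", e);
        }
    }

    // One line per alias: entry kind and, for key entries, the certificate chain length.
    static List<String> describe(KeyStore store) {
        List<String> lines = new ArrayList<>();
        try {
            for (String alias : Collections.list(store.aliases())) {
                if (store.isKeyEntry(alias)) {
                    Certificate[] chain = store.getCertificateChain(alias);
                    int length = chain == null ? 0 : chain.length;
                    lines.add(alias + ": key entry, chain length " + length);
                } else {
                    lines.add(alias + ": trusted certificate");
                }
            }
        } catch (KeyStoreException e) {
            throw new IllegalStateException("cannot read keystore", e);
        }
        Collections.sort(lines);
        return lines;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "/home/ca/client/client.keystore.jks";
        char[] password = (args.length > 1 ? args[1] : "mima123").toCharArray();
        try (InputStream in = new FileInputStream(path)) {
            describe(load(in, password)).forEach(System.out::println);
        }
    }
}
```

The same program can be pointed at the server keystore or at either truststore by passing the path as the first argument.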
4. Start and stop the Kafka and ZooKeeper services
cd /home/kafka_2.12-2.4.0/bin
Start ZooKeeper:
./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &
Start Kafka:
./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &
List topics:
./kafka-topics.sh --list --zookeeper localhost:2181
Stop Kafka:
./kafka-server-stop.sh
Stop ZooKeeper:
./zookeeper-server-stop.sh
List Kafka topics (using the internal IP):
./kafka-topics.sh --list --zookeeper 172.18.54.18:2181
Show topic details:
./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1
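Console producer (the broker configured above listens only on SSL port 9093, so pass the SSL client settings with --producer.config; client-ssl.properties is a file name chosen here for illustration and must contain the same security.protocol and ssl.* settings as the Java client below, with the keystore paths under /home/ca):
./kafka-console-producer.sh --broker-list 172.18.54.18:9093 --topic topic1 --producer.config client-ssl.properties
Console consumer (same SSL settings, passed with --consumer.config):
./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9093 --topic topic1 --from-beginning --consumer.config client-ssl.properties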
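Before debugging SSL handshakes, it helps to rule out plain connectivity problems such as a closed security-group port. The following sketch only tests whether a TCP connection can be opened to the ZooKeeper port (2181) and the SSL listener port (9093) configured in step 2.3; it does not speak the Kafka protocol or verify certificates. The class name PortCheck and the 2-second timeout are assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true when a TCP connection to host:port succeeds within the timeout.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default host is the internal IP used in the commands above.
        String host = args.length > 0 ? args[0] : "172.18.54.18";
        int[] ports = {2181, 9093};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable: " + isOpen(host, port, 2000));
        }
    }
}
```

Running it from the client machine against the public IP shows whether port 9093 is reachable from outside before the Java client is started.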
5. Connecting from a Java client
5.1 Producer
package com.xrh.extend.kafka;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    public static String topic = "topic2"; // topic name

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Kafka address; separate multiple addresses with commas
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Alibaba Cloud public IP>:9093");
        // acks: acknowledgement mode; the default is 1 (see the configuration dump at the end of this class).
        // acks=0: the producer does not wait for any response from Kafka.
        // acks=1: the leader writes the message to its local log but does not wait for the other brokers.
        // acks=all: the leader waits until all in-sync replicas have the message. The message is not lost
        //           as long as at least one in-sync replica survives. This is the strongest durability guarantee.
        props.put("acks", "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // SSL settings; the .jks files are the client truststore and keystore generated earlier
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        // An empty value disables host name verification
        props.setProperty("ssl.endpoint.identification.algorithm", "");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        try {
            int i = 1;
            while (i < 20) {
                String msg = "Test Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key1", msg);
                // send() is asynchronous; the callback reports the actual result
                kafkaProducer.send(record, new MyProducerCallBack());
                System.out.println("Message submitted: " + msg);
                ++i;
                Thread.sleep(500);
            }
        } finally {
            kafkaProducer.close();
        }
    }

    private static class MyProducerCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (null != e) {
                e.printStackTrace();
                return;
            }
            System.out.println("timestamp, topic, partition, offset: " + recordMetadata.timestamp() + ", "
                    + recordMetadata.topic() + ", " + recordMetadata.partition() + ", " + recordMetadata.offset());
        }
    }

    // Reference dump of producer configuration values:
    // acks = 1
    // batch.size = 16384   // when several records go to the same partition, the producer batches them into fewer requests, which improves efficiency on both client and server
    // bootstrap.servers = [39.108.124.173:9092]
    // buffer.memory = 33554432
    // client.dns.lookup = default
    // client.id =
    // compression.type = none
    // connections.max.idle.ms = 540000
    // delivery.timeout.ms = 120000
    // enable.idempotence = false
    // interceptor.classes = []
    // key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    // linger.ms = 0
    // max.block.ms = 60000
    // max.in.flight.requests.per.connection = 5
    // max.request.size = 1048576
    // metadata.max.age.ms = 300000
    // metric.reporters = []
    // metrics.num.samples = 2
    // metrics.recording.level = INFO
    // metrics.sample.window.ms = 30000
    // partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    // receive.buffer.bytes = 32768
    // reconnect.backoff.max.ms = 1000
    // reconnect.backoff.ms = 50
    // request.timeout.ms = 30000
    // retries = 2147483647   // with a value greater than 0, the client resends a message when sending fails
    // retry.backoff.ms = 100
    // sasl.client.callback.handler.class = null
    // sasl.jaas.config = null
    // sasl.kerberos.kinit.cmd = /usr/bin/kinit
    // sasl.kerberos.min.time.before.relogin = 60000
    // sasl.kerberos.service.name = null
    // sasl.kerberos.ticket.renew.jitter = 0.05
    // sasl.kerberos.ticket.renew.window.factor = 0.8
    // sasl.login.callback.handler.class = null
    // sasl.login.class = null
    // sasl.login.refresh.buffer.seconds = 300
    // sasl.login.refresh.min.period.seconds = 60
    // sasl.login.refresh.window.factor = 0.8
    // sasl.login.refresh.window.jitter = 0.05
    // sasl.mechanism = GSSAPI
    // security.protocol = PLAINTEXT
    // security.providers = null
    // send.buffer.bytes = 131072
    // ssl.cipher.suites = null
    // ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    // ssl.endpoint.identification.algorithm = https
    // ssl.key.password = null
    // ssl.keymanager.algorithm = SunX509
    // ssl.keystore.location = null
    // ssl.keystore.password = null
    // ssl.keystore.type = JKS
    // ssl.protocol = TLS
    // ssl.provider = null
    // ssl.secure.random.implementation = null
    // ssl.trustmanager.algorithm = PKIX
    // ssl.truststore.location = null
    // ssl.truststore.password = null
    // ssl.truststore.type = JKS
    // transaction.timeout.ms = 60000
    // transactional.id = null
    // value.serializer = class org.apache.kafka.common.serialization.StringSerializer
}
5.2 Consumer
package com.xrh.extend.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Alibaba Cloud public IP>:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");

        // SSL settings; the .jks files are the client truststore and keystore generated earlier
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        // An empty value disables host name verification
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        // props.put("auto.offset.reset", "latest");

        // bootstrap.servers: Kafka address.
        // group.id: consumer group name. Different groups consume the same messages independently. For example,
        //     if group A has already consumed 1000 messages and you want to consume them again without producing
        //     them again, switch to a new group name (with auto.offset.reset=earliest, because a new group has
        //     no committed offsets).
        // enable.auto.commit: whether offsets are committed automatically; default true.
        // auto.commit.interval.ms: how often offsets are auto-committed when enable.auto.commit is true.
        // session.timeout.ms: how long the group coordinator waits without heartbeats before treating the consumer as failed.
        // max.poll.records: maximum number of records returned by a single poll().
        // auto.offset.reset: where to start when there is no committed offset; default latest.
        //     earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning.
        //     latest: if a partition has a committed offset, resume from it; otherwise consume only newly produced data.
        //     none: if every partition has a committed offset, resume from it; if any partition has none, throw an exception.
        // key.deserializer: key deserializer class; required, no default (StringDeserializer here).
        // value.deserializer: value deserializer class; required, no default (StringDeserializer here).

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList(Producer.topic)); // subscribe to the topic
        while (true) {
            // poll(long) is deprecated in Kafka 2.x; use the Duration overload
            ConsumerRecords<String, String> consumerDatas = consumer.poll(Duration.ofMillis(100));
            if (consumerDatas.count() > 0) {
                Iterator<ConsumerRecord<String, String>> consumerIter = consumerDatas.iterator();
                while (consumerIter.hasNext()) {
                    ConsumerRecord<String, String> consumerData = consumerIter.next();
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            consumerData.offset(), consumerData.key(), consumerData.value());
                }
            } else {
                System.out.println("KafkaConsumer1 is waiting for messages...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
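Producer and Consumer above repeat the same six SSL settings, so a typo in one of them is easy to introduce. One option, sketched here under the assumption that both classes share a helper, is to build the common part once. The class name SslClientProps and its build method are hypothetical; the property keys are the same string keys used in the two classes above.

```java
import java.util.Properties;

class SslClientProps {

    // Builds the connection and SSL settings shared by the producer and the consumer.
    static Properties build(String bootstrapServers, String truststorePath,
                            String keystorePath, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", password);
        props.put("ssl.keystore.location", keystorePath);
        props.put("ssl.keystore.password", password);
        // Empty value disables host name verification, as in the classes above.
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        // Example values only; the host is a placeholder, the paths follow the article.
        Properties props = build("kafka-single:9093",
                "D:\\ca\\client.truststore.jks", "D:\\ca\\client.keystore.jks", "mima123");
        // Print the keys only, so the password does not end up in a console log.
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```

In Producer or Consumer, the returned Properties object would then only need the serializer or deserializer settings, acks, and group.id added before the client is constructed.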