Kafka v2.3.1 操作命令详细介绍
Kafka支持Linux和Windows环境,Linux 用于生产环境,Windows平台可用于学习和开发环境。本文运行环境使用Windows进行演示,文章介绍内容中也包含有Linux 命令。Windows命令和Linux 命令基本一致,差别是命令行后缀.bat(Windows平台)和.sh(Linux平台)。
另外,Linux平台下,Kafka 命令在kafka_2.12-2.3.1\bin 目录;Windows平台下,Kafka 命令在\kafka_2.12-2.3.1\bin\windows 目录下。
一、Kafka 常用的几个命令
- kafka-server-start.sh
- kafka-topics.sh
- kafka-console-producer.sh
- kafka-console-consumer.sh
- kafka-server-stop.sh
在这几个命令中,第一个用于启动Kafka server,最后一个用于停止Kafka server;
kafka-console-producer.sh和kafka-console-consumer.sh 分别是kafka 在console控制台的生产者和消费者命令;
kafka-topic.sh 命令用于管理主题。
二、kafka-server-start.sh
用法:bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
这个命令后面可以有多个参数,第一个是可选参数,该参数可以让当前命令以后台服务方式执行,第二个必须是 Kafka 的配置文件。后面还可以有多个--override开头的参数,其中的property可以是Broker Configs中提供的所有参数。这些额外的参数会覆盖配置文件中的设置。
命令行演示:
bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1
windows平台:
bin\windows\kafka-server-start.bat .\config\server.properties
成功启动之后,可以使用 jps 命令获取 kafka 进程PID 值。
三、kafka-topics.sh
kafka-topics.sh 用来管理主题topic,如创建topic、查看topic、修改topic和删除topic等等。
查看该命令行支持的参数:
bin\windows\kafka-topics.bat --help
如下2个参数需要了解一下:--bootstrap-server 配置连接的kafka server。
--Zookeeper 标识该参数过时了,配置连接的Zookeeper server。
Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。一定要注意两者参数值所指向的集群地址是不同的。
(1)创建Topic
bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3 --replication-factor 2
创建一个名为 topicName 的 Topic,其中指定分区个数为3,副本个数为2。
创建topic过程的问题,replication-factor个数不能超过 broker 的个数,否则有如下错误信息:
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(2)查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
查询出来的结果仅有 Topic 的名称信息。
(3)查看指定 Topic 明细
bin/kafka-topics.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
输出结果说明:
- PartitionCount:partition 个数。
- ReplicationFactor:副本个数。
- Partition:partition 编号,从 0 开始递增。
- Leader:当前 partition 起作用的 breaker.id。
- Replicas:当前副本数据所在的 breaker.id,是一个列表,包括Leader。
- ISR(In-Sync Replicas):指的是同步副本队列,当前 kakfa 集群中可用的 breaker.id 列表。
(4)删除 Topic
bin/kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
若 delete.topic.enable=true,直接彻底删除该 Topic。
若 delete.topic.enable=false,如果当前 Topic 没有使用过即没有传输过信息:可以彻底删除。如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic,只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。
注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(较新的版本中无显式配置,默认为 true)。
(5)修改 Topic
增加分区数
bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3
修改分区数时,仅能增加分区个数。若是用其减少 partition 个数,则会报如下错误信息:
org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test_topic currently has 3 partitions, 2 would not be an increase.
四、kafka-console-producer.sh
这个命令用来发送消息,该命令支持的参数使用 --help 输出。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
运行producer并在控制台中输入一些消息,这些消息将被发送到服务端。
进入Kafka安装目录,在cmd命令窗口,输入如下命令:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic helloTopic
其中 --broker-list 和 --topic 是两个必须提供的参数。
--broker-list参数指定了所使用的broker。
--topic参数指定了发送Topic的名称。
从文件读取:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt
五、kafka-console-consumer.sh
这个命令用来消费消息,并输出到标准控制台。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
其中:--bootstrap-server 必须指定,通常--topic也要指定查看的主题。如果想要从头消费消息,还可以指定--from-beginning参数。
还可以通过下面的命令指定分区:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0
六、kafka-server-stop.sh
kafka-server-stop.sh 关闭 kafka进程,比较简单。
或者通过jps 命令获取 Kafka 进程的PID值,然后使用 kill 命令关闭 kafka 进程。
上面是kafka集群的一些基本操作命令,使用kafka系统提供的脚本来管理topic、发送消息和消费消息等等。