Kafka v2.3.1 操作命令详细介绍

Kafka支持Linux和Windows环境,Linux 用于生产环境,Windows平台可用于学习和开发环境。本文运行环境使用Windows进行演示,文章介绍内容中也包含有Linux 命令。Windows命令和Linux 命令基本一致,差别是命令行后缀.bat(Windows平台)和.sh(Linux平台)。

另外,Linux平台下,Kafka 命令在kafka_2.12-2.3.1\bin 目录;Windows平台下,Kafka 命令在\kafka_2.12-2.3.1\bin\windows 目录下。

一、Kafka 常用的几个命令

  • kafka-server-start.sh
  • kafka-topics.sh
  • kafka-console-producer.sh
  • kafka-console-consumer.sh
  • kafka-server-stop.sh

在这几个命令中,第一个用于启动Kafka server,最后一个用于停止Kafka server;

kafka-console-producer.sh和kafka-console-consumer.sh 分别是kafka 在console控制台的生产者和消费者命令;

kafka-topic.sh 命令用于管理主题。

二、kafka-server-start.sh

用法:bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

Kafka v2.3.1 操作命令详细介绍

这个命令后面可以有多个参数,第一个是可选参数,该参数可以让当前命令以后台服务方式执行,第二个必须是 Kafka 的配置文件。后面还可以有多个--override开头的参数,其中的property可以是Broker Configs中提供的所有参数。这些额外的参数会覆盖配置文件中的设置。

命令行演示:

bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1

windows平台:

bin\windows\kafka-server-start.bat .\config\server.properties

成功启动之后,可以使用 jps 命令获取 kafka 进程PID 值。

Kafka v2.3.1 操作命令详细介绍

三、kafka-topics.sh

kafka-topics.sh 用来管理主题topic,如创建topic、查看topic、修改topic和删除topic等等。

查看该命令行支持的参数:

bin\windows\kafka-topics.bat --help

Kafka v2.3.1 操作命令详细介绍

如下2个参数需要了解一下:--bootstrap-server 配置连接的kafka server。

Kafka v2.3.1 操作命令详细介绍

--Zookeeper 标识该参数过时了,配置连接的Zookeeper server。

Kafka v2.3.1 操作命令详细介绍


Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。一定要注意两者参数值所指向的集群地址是不同的。

(1)创建Topic

bin/kafka-topics.sh --create --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3 --replication-factor 2

创建一个名为 topicName 的 Topic,其中指定分区个数为3,副本个数为2。

创建topic过程的问题,replication-factor个数不能超过 broker 的个数,否则有如下错误信息:

Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

Kafka v2.3.1 操作命令详细介绍

(2)查看 Topic 列表

bin/kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092

查询出来的结果仅有 Topic 的名称信息。

Kafka v2.3.1 操作命令详细介绍

(3)查看指定 Topic 明细

bin/kafka-topics.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

Kafka v2.3.1 操作命令详细介绍

输出结果说明:

  • PartitionCount:partition 个数。
  • ReplicationFactor:副本个数。
  • Partition:partition 编号,从 0 开始递增。
  • Leader:当前 partition 起作用的 breaker.id。
  • Replicas:当前副本数据所在的 breaker.id,是一个列表,包括Leader。
  • ISR(In-Sync Replicas):指的是同步副本队列,当前 kakfa 集群中可用的 breaker.id 列表。

(4)删除 Topic

bin/kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName

若 delete.topic.enable=true,直接彻底删除该 Topic。

若 delete.topic.enable=false,如果当前 Topic 没有使用过即没有传输过信息:可以彻底删除。如果当前 Topic 有使用过即有过传输过信息:并没有真正删除 Topic,只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。

注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(较新的版本中无显式配置,默认为 true)。

(5)修改 Topic

增加分区数

bin/kafka-topics.sh --alter --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName --partitions 3

修改分区数时,仅能增加分区个数。若是用其减少 partition 个数,则会报如下错误信息:

org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test_topic currently has 3 partitions, 2 would not be an increase.

Kafka v2.3.1 操作命令详细介绍

四、kafka-console-producer.sh

这个命令用来发送消息,该命令支持的参数使用 --help 输出。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


运行producer并在控制台中输入一些消息,这些消息将被发送到服务端。

进入Kafka安装目录,在cmd命令窗口,输入如下命令:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic helloTopic

其中 --broker-list 和 --topic 是两个必须提供的参数。

--broker-list参数指定了所使用的broker。

--topic参数指定了发送Topic的名称。

Kafka v2.3.1 操作命令详细介绍


从文件读取:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt


五、kafka-console-consumer.sh

这个命令用来消费消息,并输出到标准控制台。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

其中:--bootstrap-server 必须指定,通常--topic也要指定查看的主题。如果想要从头消费消息,还可以指定--from-beginning参数。

Kafka v2.3.1 操作命令详细介绍

还可以通过下面的命令指定分区:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0

六、kafka-server-stop.sh

kafka-server-stop.sh 关闭 kafka进程,比较简单。

或者通过jps 命令获取 Kafka 进程的PID值,然后使用 kill 命令关闭 kafka 进程。

Kafka v2.3.1 操作命令详细介绍

上面是kafka集群的一些基本操作命令,使用kafka系统提供的脚本来管理topic、发送消息和消费消息等等。