kafka 相关命令 偏移重置

相关命令:

主题相关命令:

  • kafka 启动:

    kafka-server-start.sh  ./config/server.properties 1>/dev/null 2>&1 &
  • kafka 停止:

    kill pid
  • 创建topic replication 取决于可用分区数

    kafka-topics.sh  --create --zookeeper 10.202.13.196:2181 --replication-factor 1 --partitions 1 --topic cjw-test
  • 删除topic

    kafka-topics.sh  --delete --zookeeper 10.202.13.196:2181 --topic cjw-test
  • 获取创建分区相关参数

    kafka-topics.sh  --help
  • 列出分区信息

    kafka-topics.sh --list --zookeeper 10.202.13.196:2181
  • 列出节点详细信息

    kafka-topics.sh --describe cjw-test --zookeeper 10.202.13.196:2181
    ##或者
    kafka-topics.sh --describe  --bootstrap-server 10.202.13.28:9092 --topic  cjw-test
  • 写数据

    kafka-console-producer.sh  --broker-list 10.202.13.27:9092 --topic cjw-test
  • 推送测试数据

    kafka-producer-perf-test.sh  --topic  cjw-test --num-records  50000 --throughput -1 --record-size 100 --producer-props bootstrap.servers=10.202.13.27:9092 acks=-1
  • 消费数据

    kafka-console-consumer.sh  --bootstrap-server 10.202.13.27:9092 --topic cjw-test --from-beginning
    ##设置消费组
    kafka-console-consumer.sh  --bootstrap-server 10.202.13.27:9092 --topic cjw-test --from-beginning  --consumer-property  group.id=cjw
  • 查看消费者的 消费进度

    ## group需要 启动时设置相关的 consumer-properties group.id
    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group cjw --describe
  • 重置消费进度

    ## 详情通过help 命令查看
    ## all topic  重置到最早消费进度
    kafka-consumer-groups.sh  --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --all-topics --to-earliest --execute
    ##指定主题
    kafka-consumer-groups.sh  --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --topic cjw-test --to-earliest --execute
    ##指定偏移量
    kafka-consumer-groups.sh  --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --topic cjw-test --to-offset 2 --execute
    ##指定主题、分区、消费偏移
    kafka-consumer-groups.sh  --bootstrap-server 10.202.13.27:9092 --group cjw --reset-offsets --topic cjw-test:1 --to-offset 2 --execute

zookeeper相关

  • 查看kafka分区:

    ls /brokers/topics/cjw-test/partitions
  • 查看消费组信息

    ls /consumers

kafka导入 导出数据

  • 创建连接器

    ## 第一个配置为基本配置 kafka 代理、数据的序列化格式
    ## 第二个配置源文件  和相关topic名称 
    ## 第三个配置文件 为目标文件相关sink
    connect-standalone.sh /usr/local/kafka_2.12-2.3.0/config/connect-standalone.properties  /usr/local/kafka_2.12-2.3.0/config/connect-file-source.properties /usr/local/kafka_2.12-2.3.0/config/connect-file-sink.properties  1>/dev/null 2>&1 &
  • 消费相关数据

    ##  connect-test  可以设置为自己的topic
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

相关推荐