KAFKA
Kafka核心组件
Topic:消息根据Topic进行归类,可以理解为一个队里。
Producer:消息生产者,就是向kafka broker发消息的客户端。
Consumer:消息消费者,向kafka broker取消息的客户端。
broker:每个kafka实例(server),一台kafka服务器就是一个broker,一个集群由多个broker组成,一个broker可以容纳多个topic。
Zookeeper:依赖集群保存meta信息。
测试实例:
- 查看topic 列表(--zookeeper 指定任意一个zk节点即可,也可以全部列出,用于获取集群信息)
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --describe - 创建topic(--replication-factor表示复制到多少个节点,--partitions表示分区数,一般设置为2或与节点数相等,不能大于总节点数)
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --create --topic topic1 --replication-factor 2 --partitions 2 - 发送消息(--topic 指定topic ,--6667端口按个人配置的listeners监听端口)
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-producer.sh --broker-list 172.69.1.221:6667 --topic topic1
>message1
>message2 - 消费消息
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --topic topic1
或者
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-console-consumer.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --topic topic1 --from-beginning - replica检查
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-replica-verification.sh --broker-list master:6667 - 查询已创建的topic
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --zookeeper slave227:2181,slave223:2181,slave228:2181 --list
- Kafka 删除topic(如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion)
/usr/hdp/2.6.5.0-292/kafka/bin/kafka-topics.sh --delete --zookeeper 【zookeeper server】 --topic 【topic name】 - zookeeper client 删除topic(通过zookeeper-client 删除掉broker下的topic)
/usr/hdp/2.6.5.0-292/zookeeper/bin/zkCli.sh -server slave227:2181,slave223:2181,slave228:2181
找到topic所在的目录:ls /brokers/topics
找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。
另外被标记为marked for deletion的topic你可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失 - 最后删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
- 清楚topic数据
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:
清理超过指定时间清理:
log.retention.hours=16
超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824
为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。
主要看server.properties配置
log.cleanup.interval.mins=10
log.dirs=/kafka-logs
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
总结:
彻底删除topic:
1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
2、如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉broker下的topic即可。
相关推荐
ljcsdn 2020-07-27
woaishanguosha 2020-07-18
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
MojitoBlogs 2020-06-09
猫咪的一生 2020-06-03
guicaizhou 2020-05-05
猫咪的一生 2020-05-03
jiangkai00 2020-04-15
方新德 2020-04-08
jiangkai00 2020-03-20
那年夏天0 2020-02-22
sweetgirl0 2020-02-21
sweetgirl0 2020-02-09
MrZhangAdd 2020-01-13
jiangkai00 2020-01-12
GoatSucker 2020-01-11
yangyutong00 2019-12-30