Kafka学习笔记
Kafka 学习笔记
Kafka使用一个叫Franz Kafka的文学家的名字用来命名的。
Kafka是一款开源的消息引擎系统。也是一个分布式流处理平台。
Kafka同时支持点对点模型以及发布/订阅模型。
为什么要使用Kakfa?四个字:削峰填谷!
Kafka 术语
- Record:消息,指Kafka处理对象
- Topic:主题,用来承载消息的容器
- Partition:分区,一个有序不变的消息队列,一个主题下可以有多个分区
- Offset:消息位移,表示分区中每条信息的位置,是一个单调递增不变的值
Replica,副本,数据冗余。
- 领导者副本:对外提供服务,与客户端进行交互
- 追随者副本:不能与外界进行交互,只是被动地追随领导者副本
- Producer:生产者,向主题发布新消息的应用程序
- Consumer:消费者,向主题订阅新消息的应用程序
- Consumer Offset:消费者位移,表示消费者消费进度
- Consumer Group:消费者组,多个消费者实例共同组成的一个组,同时消费多个分区来实现高吞吐。
- Rebalance:重平衡,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。它是Kafka消费者端实现高可用的重要手段。
Kafka 种类
- Apache Kafka: 也称社区版Kafka,迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅仅提供基础核心组件,缺失一些高级特性
- Confluent Kafka: 优势在于集成了很多高级特性且由Kafka原班人马打造,质量保证;缺陷在于国内相关资料不全,普及率较低,没有太多可参考的范例。
- CDH/HDP Kafka: 优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度慢
Kafka 版本号
一个题外话
Kafka新版本客户端代码开始完全由java语言编写,于是有些人开始“JAVA VS SCALA”的大讨论。并从语言特性上分析为什么社区摈弃Scala转而投向Java的怀抱。
其实事情没有那么复杂,仅仅是因为社区来了一批Java程序猿,而以前老的scala程序猿隐退了罢了。
版本演进
Kafka总共演进了7个大版本
- 0.7版本: 上古版本,一旦有人向你推荐这个版本,怼他。
- 0.8版本: 开始引入副本机制,另外老版本需要制定zookeeper地址而不是Broker地址。在0.8.2.0版本社区引入了新版本Producer API,即指定Broker地址的Producer。
- 0.9版本: 重量级的大版本更迭。增加了基础的安全认证/权限功能,引入了Kafka Connect,新版本Producer API稳定。
- 0.10.0.0: 里程碑的大版本。该版本又有两个小版本,0.10.1和0.10.2。引入Kafka streams,正式升级为分布式流处理平台。0.10.2.2 新Consumer API稳定。
- 0.11.0.0: 目前最主流的版本之一。引入两个重量级功能变更:一个是提供幂等性Producer API以及事务 API, 另一个是对Kafka消息格式做了重构。
- 1.0和2.0: 如果你是Kafka Stream用户,至少选择2.0.0版本吧。
最后还有个建议,不论你使用的是哪个版本,都请尽量保持服务端版本和客户端版本一致,否则你将损失很多Kafka为你提供的性能优化收益。
江湖经验:不要轻易成为新版本的小白鼠。
集群部署
磁盘容量举例:
假设公司有个业务需要每天向Kafka集群发送 1 亿条信息。每条消息保存两份来防止数据丢失。消息默认保存两周时间。并假设消息的平均大小是1KB。问你的Kafka集群需要为这个业务预留多少磁盘空间?
总大小:1亿 1KB 2备份 * 14 ~= 2800G
加上Kafka的一些索引数据,为它预留10%,那么总大小变为 2800 * (1 + 10%) ~= 3TB
Kafka支持数据压缩,压缩比0.75的话,那么应该预留的存储空间为2.25TB左右。
带宽举例
与其说是带宽资源的规划,其实真正要规划的是Kafka服务器的数量。
假设公司机房环境1Gbps,现有个业务,需要在1小时内处理1TB的业务数据。
一般单台服务器 规划使用70%的带宽资源的1/3 ~= 240Mbps。
1TB需要1小时处理,则每秒差不多需要处理2336Mbps的数据,除 240Mbps,则差不多需要10台机器。如果消息还需要额外复制的话,那么还要对应乘上备份数。
集群配置参数
配置名称 | 示例 | 建议值 |
---|---|---|
log.dirs | /home/kafka1,/home/kafka2 | kafka写日志多路径,不仅能提升写性能,在1.1版本中还能支持故障转移功能。 |
zookeeper.connect | zk1:2181,zk2:2181,zk3:2181/kafka1 | |
listens | listeners=PLAINTEXT://dn1.ambari:6667 | |
auto.create.topics.enable | true | false,不建议可以自动创建主题 |
unclean.leader.election.enable | false | false,如果设置为true有丢数据风险 |
auto.leader.rebalance.enable | false | false,不定期进行leader副本的选举 |
log.retention.hours | 168 | 默认保持7天数据 |
log.retention.bytes | -1 | 保存多少数据都可以 |
message.max.bytes | 1000000 | 默认值建议调大。该值代表Broker能处理的最大消息大小 |
生产者分区策略
轮询策略
随机策略
按消息保存键策略
自定义策略
生产者压缩
压缩配置
compression.type
压缩算法
总结一下压缩和解压缩,Producer端压缩,Broker端保持,Consumer端解压缩。
无消息丢失最佳实践
- 不要使用producer.send(msg),而要使用producer.send(msg,callback)
- 设置acks=all,表明所有副本Broker都要接受消息,该消息才算是“已提交”
- 设置retries>0,表明Producer自动重试,当网络顺断时,防止消息丢失。
- 设置unclean.leader.election.enable=false
- 设置replication.factor >=3,增加副本数,保证数据冗余
- 设置min.insync.replicas > 1,控制的是消息至少要被写入多少个副本才算是 已提交。
- 确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。推荐设置replication.factor = min.insync.replicas + 1
- 确保消息消费完再提交。设置enable.aoto.commit=false
Kafka 拦截器
分为生产者拦截器和消费者拦截器。
典型的应用场景可以应用于客户端监控、端到端系统性能测试、消息审计等多种功能在内的场景。
Kafka是如何管理TCP连接的
java生产者是如何管理TCP连接的
- KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有的Broker的TCP连接。
- KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接
- 如果Producer端发送信息到某台Broker时,发现没有与该Broker的TCP连接,那么也会创建连接
- 如果设置
connections.max.idle.ms > 0
,则步骤一中的TCP连接会被自动关闭;如果设置该参数-1,那么步骤一中创建的连接无法被关闭,会成为僵尸进程。
Java消费者是如何管理TCP连接的
创建的3个时机
- 发起FindCoordinator请求时
- 连接协调者时
- 消费数据时
消费者程序会创建3类TCP连接
- 确定协调者和获取集群元数据
- 连接协调者,令其执行组成员管理操作
- 执行实际的消息获取
幂等生产者和事务生产者
消息交付可靠性保障,常见的承诺有以下三种
- 最多一次:消息可能会丢失,但绝不会重复发送
- 至少一次:消息不会丢失,但有可能被重复发送
- 精确一次:消息不会丢失,也不会被重复发送
Kafka默认是最少一次
要保证精确一次,就需要幂等和事务。不过性能会想对较差。
幂等生产者
幂等性有很多好处。其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们不会破坏我们的系统状态。
在0.11.0.0版本引入了幂等生产者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
。
使用幂等生产者要注意
- 它只能保证单分区的幂等,多分区无法实现
- 只能实现单会话上的幂等,重启之后幂等消失
事务生产者
设置事务型Producer
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
- 设置producer端参数
transctional.id
。最好为其设置一个有意义的名字
此外代码也要做一些调整变化。
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); }
重平衡
怎么避免Rebalance
Rebalance发生的时机有三个
- 组成员数据量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
后面两个通常是运维的主动操作,无法避免。主要还是针对组成员数量减少的情况。增加一般也是人为主动的。
那么避免因为参数或逻辑不合理而导致的成员退出,与之相关的主要参数
- session.timeout.ms,推荐设置6s
- heartbeat.interval.ms,推荐设置2s
- max.poll.interval.ms,推荐设置比你的业务逻辑处理要长
- GC参数,避免频繁的FULL GC
重平衡通知
重平衡过程是通过 消费者端的心跳线程来通知到其他消费者实例的。
0.10.1.0版本之前,发送心跳请求是在消费者主线程完成的,也就是kafkaConsumer.poll方法的那个线程。这样做有诸多弊端,因为消息处理也是在这个线程中完成的。因此当业务逻辑处理消耗了较长时间,心跳请求就无法及时发送到协调者那边了。导致协调者 错误地认为该消费者已经死了。
0.10.1.0版本开始,社区引入了一个单独的线程来专门执行心跳发送。
消费者组状态机
定义了5种状态
各个状态的流转
一个消费者组最开始是Empty状态,当重平衡过程开启后,它会被置为PreparingRebalance状态等待成员加入,之后变更到CompletingRebalance状态等待分配方案,最后流转到Stable状态完成重平衡。
当有新成员或已有成员退出时,消费者组的状态从Stable直接跳到PreparingRebalance状态,此时,所有现存成员就必须重新申请加入组。
当所有成员都退出组后,消费者组状态变更为Empty。
Kafka自动定期删除过期位移的条件就是,组要处于Empty状态。
重平衡流程
消费者端重平衡流程
JoinGroup请求
SyncGroup请求
Broker端重平衡场景分析
- 新成员入组
- 组成员主动离组
- 组成员崩溃离组
- 重平衡时协调者对组内成员提交位移的处理
位移提交
CommitFailedException怎么处理?
- 缩短消息处理的时间,该方法优先处理
- 增加Consumer端允许下游系统消费一批数据的最大时长。设置参数
max.poll.interval.ms
,新版本默认是5分钟。 - 减少下游系统一次性消费的消息总数。
max.poll.records
- 下游系统使用多线程来加速消费
多消费者实例
鉴于KafkaConsumer不是线程安全的事实,制定两套多线程方案。
- 每个线程维护专属的KafkaConsumer实例,负责完整的消息获取、消息处理流程
核心代码 ``` public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); // 执行消息处理逻辑 } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } ```
- 消费者程序使用单或多线程获取消息,创建多个消费者线程执行消息处理逻辑
核心代码 ``` private final KafkaConsumer<String, String> consumer; private ExecutorService executors; ... private int workerNum = ...; executors = new ThreadPoolExecutor( workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); ... while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (final ConsumerRecord record : records) { executors.submit(new Worker(record)); } } ```
两种方案各有特点。
监控消费进度的3种方法
- 使用Kafka自带命令行工具kafka-consumer-groups脚本
- 使用Kafka Consumer API
- 使用Kafka自带的JMX监控指标
Kafka副本详解
副本机制的好处:
- 提供数据冗余
- 提供高伸缩性
- 改善数据局部性
但Kafka只有第一种好处,原因是这样的设计,Kafka有两点好处
- 方便实现 Read-your-writes
指当你用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息
- 方便实现单调读(Monotonic Reads)
在多次消费信息时,不会看到该消息一会存在一会不存在的情况。
判断Follower副本与Leader副本是否同步的标准,Broker参数replia.lag.time.max.ms
的参数值。Kafka有一个in-sync Replicas(ISR)集合的概念。
Kafka控制器
控制器组件(Controller),是Kafka的核心组件,它的主要作用是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。
控制器是怎么被选出来的
每台Broker都能充当控制器,在Broker启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举规则,第一个成功创建/controller节点的Broker会被指定为控制器。
控制器能做什么?
- 主题管理
- 分区重分配
- Prefered领导者选举
- 集群成员管理
- 数据服务,控制器上保存最全的集群元数据信息
控制器保存了什么数据?
这些数据其实也在Zookeeper中存储了一份。
控制器的故障转移
总结
小窍门分享:当你觉得控制器出现问题时,比如主题无法删除了,重分区hang住了,你可以不用重启broker或者控制器,快速简便的方法,直接去Zookeeper手动删除/controller节点。
这样做的好处是,既可以引发控制器的重选举,又可以避免重启Broker导致的消息中断。
Kafka请求处理
请求方案
Kafka方案类似于Reactor模式
那么Kafka类似的方案是这样的。网络线程池默认参数num.network.threads=3
好了,客户端发来的请求会被Aceptor线程分发到任意一个网络线程中,由他们进行处理。你可能会认为,网络线程池是顺序处理不就好了?实际上,Kafka在这个环节上又做了一层异步线程池的处理。
IO线程池执行真正的处理。如果是PRODUCER生产请求,则将消息写入到底层的磁盘日志中;如果是FETCH请求,则从磁盘或页缓存中读取消息。当IO请求处理完请求后,会将生成的响应放入网络线程池的响应队列中,并由对应的网络线程负责将Response反还给客户端。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。
IO线程池默认参数num.io.threads=8
图中还有一个Purgatory的组件,这是Kafka中著名的“炼狱”组件。
它是用来缓存延时请求的,所谓延时请求,就是那些一时未满足条件的不可立刻处理的请求。
Kafka高水位和Leader Epoch
高水位
高水位作用:
- 定义消息可见性,即用来标识分区下哪些消息是可以被消费者消费的
- 帮助kafka完成副本同步
高水位更新机制
副本更新机制
Leader Epoch
这块感觉有点晦涩难懂!需要有时间再研究!
该机制是用来防止数据丢失和不一致的问题。因为仅仅依靠高水位更新,是会有时间错配的问题的。
没有使用Leader Epoch前,丢失数据的一个场景。
假设生产者成功向Kafka发送了两条信息,Leader和副本B都写成功了。
Leader的高水位更新完成,但副本B高水位还未更新。
副本B所在的Broker宕机了,当它重启后,此时LEO值会变更为高水位值1。也就是说位移值为1的那条信息被副本B从磁盘中删除了。此时副本B的底层磁盘文件中只保存有一条信息,即位移值为0的那条消息。
执行完日志截断操作之后,副本B开始向Leader拉取消息,执行正常的消息同步。那么就在这个节骨眼,Leader又挂掉了。那么Kafka就别无选择,让副本B成为了Leader。那么当副本A回来之后,就会将高水位调整为与B相同的值,也是1。
这样操作之后,位移值为1的那条消息就彻底从这两个副本中被永远地抹去了。
场景和之前类似,但是引用了Leader Epoch机制后,副本B重启回来后,会向A发送一个特殊的请求去获取Leader的LEO值。
发现Leader LEO的值不比它自己的小,且缓存中也没有起始位移>2的Epoch条目,因此B不需要执行任何的日志截断操作。这就是对高水位机制的一个明显改进,副本是否执行日志截断不再依赖高水位进行判断。
A宕机了,B成为了Leader,同样地发送一个特殊的请求,发现也不需要执行日志截断操作。当后面生产者往B写入消息后,B生成了新的Epoch条目。
主题管理
kafka 2.2版本示例
创建主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
查询主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
查询某主题的详细信息
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
修改主题分区
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分区数 >
这里要注意的是,分区数必须大于原有分区数,否则会报错InvalidPartitionsException.
修改主题级别参数
假设我们要设置主题的级别参数为max.message.bytes
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
修改主题限速
先设置Broker端参数
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
注意--entity-name指的是Broker ID.如果该主题副本分别上0,1,2,3上,那么你还要为Broker 1,2,3执行这条命令。
设置好这个参数之后,还需要为该主题设置要限速的副本。示例中希望对所有的副本进行限速,使用通配符*。
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
删除主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
通常这个步骤,你需要耐心等待一段时间,Kafka只是标记成已删除状态,需要一段时间在后台默默删除。
当你碰到主题无法删除的情况,你可以采用这样的方法:
- 手动删除Zookeeper节点/admin/delete_topics下待删除的主题名的znode
- 手动删除该主题的磁盘上的分区目录
- 在Zookeeper中执行rmr /controller,触发Controller重选举。刷新Controller缓存。
事实上第三步要小心,其实执行前面两步就够了。
查看消费组提交的位移数据
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
查看消费组的状态信息
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
__consumer_offsets占用太多磁盘空间
要显示的使用jstack命令查看下kafka-log-cleaner-thread前缀的线程的状态。通常情况下,是由于该线程挂掉导致的。你只能重启相应的Broker了。
动态参数
1.1.0版本之前,都是通过配置server.properties。
当一些配置要更改的时候,我们需要修改这个配置文件,然后重启Broker,以此来达到修改配置生效。
但是生产环境的机子怎么能说重启就重启呢。针对这个痛点,社区1.1.0版本正式引入了动态参数。
动态参数种类
kafka broker config官网增加了DYNAMIC UPDATE MODE列。而该列有三类值。
- read-only: 被标记为read-only的参数跟之前一样,只有重启Broker,才能令修改生效。
- pre-broker: 被标记为pre-broker的参数属于动态参数,只会在对应的Broker上生效。
- cluster-wide: 属于动态参数,会在整个集群范围内生效。当然你也可以为具体的Broker修改cluster-wide值。
使用场景
- 动态调整Broker端各种线程池大小,实时应对突发量(最为常用)
- 动态调整Broker端连接信息或安全信息
- 动态更新SSL KeyStore有效期
- 动态调整Broker端Compact操作性能
- 实时变更JMX指标收集器
如何实现动态参数的?
其中config/brokers是真正保存动态Broker参数的地方。
- <default> : cluster-wide范围的动态参数
- broker.id : pre-broker范围的动态参数。该类参数子节点可能存在多个
参数优先级:pre-broker > cluster-wide > read-only > 默认值
如何配置?
设置cluster-wide值
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
删除cluster-wide值
`# 删除 cluster-wide 范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable`
设置pre-broker值
bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
删除pre-broker值
`# 删除 per-broker 范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable`
常用的动态参数
- log.retention.ms
- num.io.threads和num.network.threads
- SSL相关参数
- num.relica.fetchers
如何重设消费者位移
重设位移策略
重设位移方法
- java consumer api,主要就是seek方法
- 通过kafka-consumer-groups脚本实现
Kafka运维利器:KafkaAdminClient
引入原因
- 命令脚本不好集成到应用程序中
- 很多命令脚本都是直接连接zookeeper的,这会绕过kafka的安全设置
- 社区希望在client端就能运维管理集群,而不需要内部的server端代码
基于这些原因,社区于0.11版本正式推出了Java客户端版本的AdminClient。
它的功能
- 主题管理:包括主题的创建、删除和查询
- 权限管理:包括具体权限的配置与删除
- 配置参数管理:包括kafka各种资源的参数设置、详情查询。Kafka资源主要有Broker、主题、用户、Client-id等
- 副本日志管理:包括副本底层日志路径的变更和详情查询
- 分区管理:即创建额外的主题分区
- 消息删除:即删除指定位移之前的分区信息
- Delegation Token管理
- 消费者组管理:包括消费者组的查询、位移查询和删除
- Prefered领导者选举:推选指定主题分区的Prefered Broker为领导者