Kafka基础(一)
1. 概述
Kafka是一个分布式、支持分区的、多副本的,基于zookeeper协调的分布式消息系统。
2. Kafka的特性
(1)高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题topic可以分为多个分区partition,消费者组consumer group对分区进行消费操作。
(2)可扩展性:Kafka集群支持热扩展
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
(5)高并发:支持数千个客户端同时读写。
3. Kafka中的术语解释
3.1 概述
上图中一个topic配置了3个partition。Partition1有两个offset:0和1,Partition2有4个offset,Partition3有1个offset。副本的id和副本所在的机器id恰好相同。
如果一个topic的副本数为3,那么kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
3.2 broker
kafka集群包含一个或多个服务器,服务器节点称为broker。
broker存储topic的数据:
(1)如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
(2)如果某topic有N个partition,集群有N+M个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
(3)如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,应避免这种情况的发生,这种情况容易导致kafka集群数据不均衡。
3.3 topic
每条发布到kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)类似于数据库的表名。
3.4 partition
topic中的数据分割为一个或多个的partition。,每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有过个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
3.5 producer
生产者即数据的发布者,该角色将消息发布到kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
3.6 Consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
3.7 Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name),若不指定group name则属于默认的group。
3.8 Leader
每个partition有多个副本,其中有且有一个作为Leader,Leader是当前负责数据读写的partition。
3.9 Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader实效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,Leader会把这个Follower从ISR列表中删除,重新创建一个Follower。
4. Kafka的设计思想
4.1 Kafka Broker Leader的选举
Kafka Broker(节点)集群受zookeeper管理。
(1)Kafka Controller在zookeeper注册Watch:所有的Kafka Broker节点会一起去zookeeper上注册一个临时节点,但是只有一个Kafka Broker会注册成功,注册成功的Kafka Broker会成为Kafka Broker Controller,其他的Broker成为Kafka Broker Follower。这个Controller会监听其他的Kafka Broker的所有信息,如果Kafka Broker Controller宕机了,在zookeeper上的临时节点就会消失,此时所有的Kafka Broker又会一起去zookeeper上注册临时节点,产生新的Kafka Broker Controller。
例如:一旦有一个Kafka Broker宕机了,这个Kafka Broker Controller会读取该宕机Kafka Broker上所有的partition在zookeeper上的状态,并选取ISR列表中的一个副本replication作为partition leader。如果ISR列表中的副本全挂,选一个幸存的副本作为leader;如果该partition的所有副本都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个副本活过来,选取它为Leader;或者选择第一个活过来的副本(不一定是ISR中的)作为leader。这个Broker宕机的事情,Kafka Controller也会通知zookeeper,zookeeper就会通知其他的Kafka Broker。
5. Kafka的架构
5.1 Topics和Partition
(1)Topic原理
Topic在逻辑上可以被认为是一个queue,每条消费都必须指定他的Topic,可以简单理解为必须指明吧这条消息放进哪个queue里。
为了使得kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时可会导致更高的不可用性。
kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高。
(2)旧消息的处理策略
Kafka集群会保留所有的消息,无论被消费与否。当然,由于磁盘限制,不可能永久保留所有数据。因此kafka提供两种策略删除旧数据:
① 基于时间
例如:可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据
② 基于Partition文件大小。
例如:在Partition文件超过1GB时删除旧数据
# The minimum age of a log file to be eligible for deletion log.retention.hours=168 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according to the retention policies log.retention.check.interval.ms=300000 # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false
Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。可根据具体的需求选择对应的删除策略。
Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然, Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个 Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
5.2 Producer消息路由
Producer发送消息到broker时,会根据Partition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为一个Topic的性能瓶颈。而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐量。
可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Partition。Partition机制可以通过指定Producer的partition.class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。
5.3 消费者组Consumer group
各个consumer线程可以组成一个组,我们叫做消费者组。
(1)同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
(2)Kafka为了保证吞吐量,只允许同一个消费者组下的一个consumer线程去访问一个分区。如果想要提高效率,可以增加partition的数量来横向扩展,同时在增加新的consumer线程去消费。
(3)当启动一个消费者组去消费一个主题topic时,无论主题里面有多少个分区,无论我们消费者组里面配置了多少个consumer线程,这个消费者组一定会把这个topic下的partition都消费了。当消费者组里面的consumer线程数量小于这个topic下的分区数量的话,就会出现一个consumer线程消费多个partition的情况。所以,最优的设计是:消费者组下的consumer线程数量等于partition数量,这样效率是最高的。因此我们在设定消费者组的时候,只需要指明里面有几个consumer线程数量即可,无需指定对应的消费分区partition的序号,consumer会自动进行rebalance。
如某个Group下有20个Consumer实例,它订阅了一个具有100个分区的Topic.正常情况下,Kafka平均会为每个Consumer分配5个分区。这个分配的过程就叫做Rebalance。
Rabalance的触发条件有三个:
① 组成员数发生变更。
② 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscribe就表明Group订阅所有以字母t开头、字母c结尾的主题。在Consumer Group的运行过程中,你创建了一个满足这样条件的主题,该Group就会发生Rebalance.
③ 订阅主题的分区数发生变更,Kafka当前只能允许增加一个主题的分区数,当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance
(4)这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。
6. Kafka的存储策略
Kafka通过topic分主题存放数据,而topic中又有很多的分区,分区还可以有多个副本。在分区内部,还存在一个个的segment(在分区对应的文件夹下产生的文件)。一个分区会被划分成大小相等的若干segment,既保证了每个分区中不会产生体积过大的文件,又可以基于这些segment文件进行历史数据的删除,从而提高效率。 一个segment由一个**.log文件(存放数据)和一个.index**(索引文件,保存对应的**.log**文件的索引信息)文件组成,这两个文件的命名规则为:partition第一个segment从0开始,后续每个segment文件为上一个segment文件最后一条消息的offset序号值,数值大小为64位,20位数字字符长度,没有数字用0填充。
所以真正开始读取指定分区中某个offset对应的数据时 ,先根据offset和当前分区 的所有segment的名称做比较 ,确定出数据在哪个segment中 ,查找该segment的索引文件 ,确定当前offset在数据文件中的开始位置 ,从该位置开始读取数据文件, 再根据数据格式判断结果 ,最终获取到完整数据。
7. Kafka的可靠性保证
数据提交的条件:有2种方法:①只要leader写入成功,就认为写入成功。②要求所有的follower都必须同步完成,才认为写入成功。这2种方法都比较极端,前者容易丢失数据,后者存在单点故障问题。
7.1 Kafka采用了AR,ISR ,OSR列表机制
在Kafka的分区中,维护了一个AR列表,包括当前分区的所有副本的编号。AR=ISR+OSR
① ISR同步列表,写入数据时,要求所有的ISR中的副本都同步数据完成后,才能被提交,才可以被消费者访问。
② OSR是非同步列表,OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower只是尽力的去同步leader,存在数据版本可能落后的情况。
最开始所有的副本都在ISR中,在Kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR 存入OSR ,如果后续速度恢复可以回到ISR中。这种方案是一种介于leader独裁和所有follower民主方式之间的方案,同时兼顾了可用性和性能,相对于zookeeper的过半同意、过半存活机制,提供了更好的可用性。
7.2 Kafka中的可靠性保障 - LEO 、HW
(1)介绍
① LEO - LogEndOffset:分区的最新的数据的offset,只要有数据写入分区,LEO就指向最新的数据,无论这个数据是否已经在ISR中同步完成。
② HW - HighWatermark:消费者能够看到的最大的offset,这个offset及小于这个offset的数据时可以被消费者访问的,而大于这个offset的数据,要么不存在,要么没有同步完成,外界无法访问。HW永远小于等于LEO.
(2)Kafka分区同步数据的流程
最开始,分区中HW等于LEO,外界可以看到所有的数据,之后生产者写入数据,先写入leader中,此时LEO指向最后一条数据,而此时ISR中其他follower还未完成数据同步,HW扔然指向最初位置,外界也只能访问HW位置及其之前的数据,之后follower同步数据,任何一条数据同步完,HW指向新的offset,外界就可以访问到该数据,直到最后,所有数据都同步完成,HW重新等于LEO,外界可以访问到所有数据。
7.3 Kafka分区同步数据时截断机制
当leader宕机,重新选举leader后,之前未同步完成的数据该如何处理呢?
如果leader宕机 选出了新的leader,所有副本都会先将数据截断到leader宕机之前的HW位,保证所有副本不会持有未同步完成的数据,这个机制就称之为截断机制。另外,即使此时旧的leader恢复,成为一个follower,也要先截断数据到宕机之前的HW位,再和新的leader同步数据。保证了数据的一致性。利用截断机制,保证了不会有未同步完成的数据干扰从新选举leader后的数据一致性。
7.4 Kafka消费数据时的可靠性保证
对于消费数据的过程,所谓的可靠性,指的是被访问到的数据一直能够被访问到,绝对不会存在之前还能访问的数据,突然之间就无法访问了。
利用LEO-HW机制,保证了在读取数据时,能够读取到的仅仅是HW之前也即ISR列表的副本完成了同步的数据,这样即使leader宕机,从ISR列表中选取一个follower成为新的leader,也会持有HW之前的数据,保证了读取数据的可靠性不会因为重新选举leader而造成影响。而在leader宕机的过程中,新leader选举后,会先进行截断操作,保证数据是从之前已经同步过的位置继续增加,未同步的数据全部都截断丢弃掉,保证了切换leader时,对于外界消费者没有影响。唯一存风险的情况就是,当ISR列表中只有leader,其他副本都在OSR中,此时如果leader宕机,ISR列表中不存在任何存活的副本,如果坚持要ISR列表中的副本才可以成为新的leader,则除非原来的leader恢复,否则无法选出新的leader,无法进行写入操作。或者可以允许OSR列表中的follower成为新的leader,但此时存在读取数据不一致的风险。
7.5 Kafka消息传输的一致性
通常在分布式系统中可靠性保障可以体现在三个层面:
① 可能丢数据,但不会多数据,数据至多一次 — 0
② 不会丢数据,但可能会多数据,数据至少一次 — 1(默认)
③ 不丢也不多,数据恰好一次 — -1
而Kafka中对于生产者来说,可以通过request.required.acks参数来控制发送数据过程中要求的可靠性级别。Kafka原生的机制最多可以实现到不丢数据但可能多数据,对于恰好一次的语义,Kafka建议根据业务的幂等性 或 业务本身特点 由开发人员自己来实现去重。
7.6 Kafka中leader的选举
当leader宕机时,Kafka会选择ISR中的一个follower成为新的leader,由于ISR中的所有follower都同步着leader中的数据,再基于截断机制,保证未同步的数据在新leader选举时被抛弃,所以保证了新leader产生后不会有数据一致的问题。但在极端情况下,如果ISR列表中只有一个leader存在,其他副本都进入了OSR,此时leader宕机,ISR列表中为空,则该如何选举新的leader?此时存在两种策略:
① 策略1:不允许OSR中的follower成为新的leader,则必须等待原来的leader恢复,才可以继续工作,如果原来的leader一直无法回复,则集群一直无法使用写功能。这种方式,追求一致性,但放弃了可用性。如果将unclean.leader.election.enable=false则Kafka集群将猜中这种策略。
② 策略2:允许OSR中的follower成为新的leader,则可以快速的恢复集群工作,但可能会造成数据丢失。这种方式,追求了可用性,但放弃了极端情况下的一致性。如果将unclean.leader.election.enable=true则Kafka集群将采用这种策略.
分析:策略1,一致性有保证,但是可用性低,只有最后挂了leader活过来Kafka才能恢复
策略2,可用性高,一致性没有保证,任何一个副本活过来就可以继续工作,但是有可能存在数据不一致的情况。
本质上是 一致性 和 可用性之间的矛盾,需要根据业务需求进行取舍.