认识kafka
kafka是一个高吞吐量,低延迟分布式的消息队列系统.kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
一. kafka模型
kafka提供了一个生产者,缓冲区,消费者的模型
1.1 模型分析
broker:kafka集群有多个broker服务器组成,用于存储数据(消息)
topic:不同的数据(消息)被分为不同的topic
producer:消息生产者,往broker中某个topic里面生产数据
consumer:消息的消费者,从broker中某个topic中获取数据
kafka将所有消息组织成多个topic的形式存储,而每个topic又可以拆分成多个partition,每个partition又由一个一个消息组成.每个消息都被标识了一个递增序列代表其进来的先后顺序,并按照顺序存储在partition中
producer选择一个topic,生产消息,消息会通过分配策略将消息追加到该topic下的某个partition分区的末尾.
consumer选择一个topic,通过ID指定从哪个位置开始消费消息.
消费完成之后保留ID,下次可以从这个位置尅是继续消费,也可以从其他任意位置开始消费.该ID,在kafka中称之为offset(偏移量),它能够唯一的标识该分区中的每个记录.kafka集群保留所有producer生产的消息记录,不管该消息是否被消费过,都会保留在kafka里.也就是说kafka里的所有消息都可以多次消费
kafka中有默认删除消息的时间,一般是7天,时间到了,kafka就会从磁盘层面将数据删除,当然我们也可以通过配置,将数据 保存时间进行调整
每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置.位移位置是由消费者控制,即就是消费者可以通过修改偏移量读取任何位置的数据
每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题,为并发消费提供了线程安全的保证
每个topic中的消息被组织成多个partition,partition均匀分配到集群server中.生产,消费消息的时候,会被路由指定partition,减少单台服务器的压力,增加了程序的并行能力
每个topic保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server.当机器负载的时候,通过扩容可以将消息重新均匀分配
消息消费完成之后不会删除,可以通过重置offset重新消费.
灵活的持久化策略.可以通过指定时间段来保存消息,节省broker存储空间
消息以partition分区为单位分配到多个server,并以partition为单位进行备份.策略为1个leader多个followers,leader接受读写请求,followers被动复制leader.leader和 followers会在集群中打散,保证partition高可用
1.2 消费者组
每个consumer将自己标记consumer group名称,之后系统会将consumer group按名称分组,将消息复制并分发给所有分组,每个分组只有一个consumer能消费这条消息。
二, kafka集群搭建
Zookeeper集群共三台服务器,分别为:node01、node02、node03。
Kafka集群共三台服务器,分别为:node01、node02、node03。
1.1 安装kafka
上传压缩包
解压压缩包
tar -zxvf kafka _0.8.2.1.tgz
修改配置文件 vim config/server.properties
broker.id=0
zookeeper.connect=node01:2181,node02:2181,node03:2181
参数说明
broker.id: broker集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
拷贝到其他虚拟机上
scp -r kafka _0.8.2.1.tgz node02:‘pwd‘
scp -r kafka _0.8.2.1.tgz node03:‘pwd‘
启动
启动zookeeper
启动kafka
进入kafka目录执行
bin/kafka-server-start.sh config/server.properties
测试
创建topic
in/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic test
参数说明
--replication-factor:指定每个分区的复制因子个数,默认1个
--partitions:指定当前创建的kafka分区数量,默认为1个
--topic:指定新建topic的名称**)
查看topic列表
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
查看“test”topic描述
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic test01
创建生产者
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test01
创建消费者
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic test01
三, kafka数据的一致性和ISR机制
3.1、简介
Kafka中topic里的每个Partition分区有一个leader与多个follower,producer往某个Partition中写入数据是,只会往leader中写入数据,然后数据才会被复制进其他的Replica中。
那么数据的同步是由leader push(推送)过去还是有flower pull(拉取)过来?Kafka是由follower到leader上拉取数据的方式进行同步的。
所以Kafka上的副本机制是,写都往leader上写,读也只在leader上读,flower只是数据的一个备份,保证leader被挂掉后顶上来,并不往外提供服务。
3.2、关于消息同步
关于复制,在分布式架构中分为两种:
同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,性能不高
异步复制: 只要leader拿到数据立即commit,等follower慢慢去复制,性能高,立即返回,一致性差一些。
kafka不是完全同步,也不是完全异步,是一种ISR机制:
leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
两个相关参数:
replica.lag.time.max.ms=10000
# 如果leader发现follower超过10秒没有向它发起fech请求,那么leader就把它从ISR中移除。
replica.lag.max.messages=4000
# follower与leader相差4000条数据,就将副本从ISR中移除
注意:当follower同时满足这两个条件后,leader又会将它加入ISR中,所以ISR是处于一个动态调整的情况
ISR里的replicas有什么用?
当partion的leader挂掉,则会优先从ISR列表里的挑选一个follower选举成新的leader,同时将旧leader移除出ISR列表。
四, kafka数据丢失和重复消费问题
1. 数据丢失
1.1 producer端导致数据丢失
producer丢失原因:
producer在生产消息传递给kafka时,才开始时存储在服务器上的pagecache上,然后定期flush到磁盘上,如果断电会导致数据丢失.
kafka备份机制,假如leader刚接收完producer传来的消息,partition的replicas副本还未来得及同步,就会造成数据丢失
producer解决方案:
针对第一个原因,我们可以提高flush的频率来减少数据丢失量.不建议这样做.官方建议通过备份机制来解决数据丢失问题
相关参数
log.flush.interval.messages
当缓存中有多少条数据时,触发溢写
log.flush.interval.ms
每隔多久时间,触发溢写
针对于备份机制而导致的数据丢失,要想数据不丢失,就要将ack设置为all ,即所有的备份分区也同步到这条数据了,才发第二条数据,但是这样就降低了我们的性能。所以在实际工作中,往往得结合业务来平衡数据的一致性和系统的性能。
1.2 consumer端数据丢失
数据丢失原因:在使用kafka高级API时,消费者会自动每隔一段时间将offset保存到zookeeper上,如果刚好将偏移量提交到zookeeper上后,但这条数据还没有消费完,发生宕机,此时数据丢失
解决方案:关闭偏移量自动提交,改成手动提交,每次数据处理完后,再进行提交
2. 重复消费
产生原因:在消费者自动提交offset到zookeeper后,程序又消费了几条数据,但没到下次提交offset,这个时候,如果机器宕机了,重启之后,消费者会按照之前zookeeper保存的offset进行消费,这就会导致数据重复消费
解决方案:关闭自动提交,改成手动提交
五. kafka高吞吐的本质
写数据时
kafka是基于操作系统的页缓存来实现文件写入的.
kafka会将数据线写入到pagecache(也称之为os cache)中即就是仅仅写入内存中,至于什么时候把pagecache中的数据刷出到磁盘上,完全由操作系统自己决定
kafka写数据的时候,是以磁盘顺序写的方式来刷出数据的,即就是仅仅将数据追加到文件的末尾.
磁盘顺序写的性能会比随机写快上几百倍
基于顺序写和pagecache两点,kafka就实现了写入数据的超高性能
消费数据
消费实际上就是从kafka的磁盘文件里读取某条数据然后发送给下游的消费者
从磁盘里读取文件,并且返回给哭护短消费者,经历一下四个阶段
OS从硬盘把数据读到内核区的PageCache。
用户进程把数据从内核区Copy到用户区的内存里。
然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。
OS再把数据从Socket Buffer中Copy到网卡的Buffer上,最后发送给客户端消费者。
流程图如下
零拷贝技术
直接让操作系统的cache中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存。
流程图如下
通过零拷贝技术,就不需要把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,两次拷贝都省略了,所以叫做零拷贝。
对Socket缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从os cache中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
在从磁盘读数据的时候,会先看看os cache内存中是否有,如果有的话,其实读数据都是直接读内存的。
kafka集群经过良好的调优,大量的数据都是直接写入pagecache中的,读也是从pagecache中读取的.换句话说就是kafka相当于完全基于内存提供的数据进行操作了,故整体性能会极其的高
六, kafka消息持久化
Kafka topic的数据存储在磁盘的时候,默认存储在/tmp/kafka-logs目录下,这个目录可以自己设置。同时在该目录下,又会按topic的每个partition分区来存储,一个分区一个目录,一个partition目录下面又会有多个segment文件.
如上图可以看到,test7-0目录下(”test7” topic的0号分区)有.index文件和.log文件。
index文件为索引文件,命名规则为从0开始到,后续的由上一个文件的最大的offset偏移量来开头
log文件为数据文件,存放具体消息数据
kafka从磁盘上查找数据时,会先根据offset偏移量,对index文件名字进行扫描,通过用二分法的查找方式,可以快速定位到此offset所在的索引文件,然后通过索引文件里的索引,去对应的log文件种查找数据。
比如:我要查找offset=30的数据,从上图中可以知道有0,29,58开头的index文件,说明offset=30的索引数据落在000029.index文件中。
Broker全局参数:
message.max.bytes (默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。
log.segment.bytes (默认: 1GB) – segment数据文件的大小,当segment文件大于此值时,会创建新文件,要确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。
log.roll.hours (默认:7天) - 当segment文件7天时间里都没达到log.segment.bytes 大小,也会产生一个新文件
replica.fetch.max.bytes (默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
Consumer端参数:
fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
由于segment默认1GB才能产生新文件,老师这里创建一个新topic,将segment的大小改成1KB,为了演示效果,企业里面这个参数可以不去动。
注意:log.segment.bytes 这是一个全局参数,即所有的topic都是这个配置值,老师这里只是要改变一个topic的参数值,所以用segment.bytes参数,这个参数是topic级别的参数。