kafka/rocketmq

kafka

官方:发布订阅,流处理管道和存储
https://kafka.apache.org/docu...

组件

![clipboard.png](/img/bVbr1Aj)

broker:负责消息存储转发,包含topic(一个queue)=》partition(物理分布,一个topic包含一个或多个partition,可以分布在不同的broker上)
producer(与broker leader直连,负载均衡指定partition,可批次发,可设置要ack的副本数)
consumer/consumer group
其中partition的集群和单机物理结构如下:
![clipboard.png](/img/bVbr1yi)

![clipboard.png](/img/bVbr132)

index全部映射到内存,每个partition下自增id

元数据放在zk上.分partition,每个partition副本分散在broker上,单partition+单消费才能保证顺序(rocketmq一样)。每个partition一个索引,顺序写一个文件。流处理+批量处理(累计部分数据才发送),实时上有取舍。

https://kafka.apache.org/documentation/#design

高可用和可扩展

1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,成功的为Broker controller,失效后zk后发现重新注册节点,controller负责各broker内partition的选主(ISR中,记录replica进度,随便选)ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.如果ISR的大小超过某个最小值,则分区将仅接受写入,以防止丢失仅写入单个副本的消息(只关注ISR,而不是共识多个都写入,多数(两个故障需要5个副本,一个要三个)对于主数据的写代价大)【与ES类似都使用的Microsoft的PacificA】
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。
broker,partition,customer组内线程可扩展。

消费

只保证一个partition被一个customer消费有序
producter推,customer拉(拉需要存日志)
partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,若多个同时要配多个Consumer group。
kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息被consumer接收之后,负责维护消息的消费记录(JMS等都是broker维护),consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.也没有ACK
消息消费的可靠性,消费者控制,最多一次,先保存offset再处理;至少一次,先处理再保存offset;只一次:最少1次+消费者的输出中额外增加已处理消息最大编号

日志压缩

确保有每个分区数据日志中每个key有最后已知值,offset不能变。对同一partition的多个文件一起压缩合并。
position是文件的bytes偏移吧?压缩过程中要重建索引和位置?【个人理解是要重建的】
active不动(不影响写入),对cleaner point后面的做压缩,选择日志tail和header比例小的,合并压缩每组log不超过1G,index不超过10M。
![clipboard.png](/img/bVbr17J)
对于tail的压缩过程:【position不变???个人理解这是错误的,position是变得】
每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型如下图所示。第二次遍历检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理掉

![clipboard.png](/img/bVbr18b)

rocketmq

activemq 不能分片。kafka性能(上面知道基本上partition和consumer需要配置一样的,一个consumer group的线程数和partition数量一致,受partition限制,rocketmq多partition的扩展在于都用一个commitlog,而不是一个partition单独一份顺序log(太多就不是顺序的了),cq只存储位置,在commitlog中找数据。http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/)

组件

kafka/rocketmq

broker :主从,topic,queue,tag
nameserver:几乎无状态,可集群内部署,节点对等,不同步。数据是broker同步过来的
producer:连接ns,主从brokers(心跳),无状态
consumer/group :连接ns,主从brokers(心跳)

高可用和可扩展

kafka/rocketmq

  • 负载均衡
    Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Producer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
  • 主从
    机器级别,不依赖zk,元数据:在 Broker 启动的时候,其会将自己在本地存储的配置文件 (默认位于$HOME/store/config/topics.json 目录) 中的所有话题加载到内存中去,然后会将这些所有的话题全部同步到所有的 Name 服务器中。与此同时,Broker 也会启动一个定时任务,默认每隔 30 秒来执行一次话题全同步.主从写commitlog保证持久性和同步和其他一样,就不再说了。
  • Broker与Namesrv的心跳机制:
    单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息,如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Producer、Consumer有Broker宕机。
  • 消息存储持久化
    所有broker上的所有topic都顺序写入内存文件mapedfile(1G),mapedfilelist记录每个mapedfile在磁盘的偏移量,新消息写入最后一个文件。
  • 动态伸缩能力
    (非顺序消息,消息分散;有序消息只能放在一个queue中,切不支持迁移,只保证一个queue内顺序,但可以多消费线程保证顺序):Broker的伸缩性体现在两个维度:Topic, Broker。
    1)Topic维度:假如一个Topic的消息量特别大,但集群水位压力还是很低,就可以扩大该Topic的队列数,Topic的队列数跟发送、消费速度成正比。
    2)Broker维度:如果集群水位很高了,需要扩容,直接加机器部署Broker就可以。Broker起来后想Namesrv注册,Producer、Consumer通过Namesrv发现新Broker,立即跟该Broker直连,收发消息。

消费

  1. 消费者注册,消费者上有多有topic的broker地址和队列,消费者负载均衡选择;
    1)广播模式:每个costumer全量消费,消费偏移量保存在costumer中
    2)集群模式:constumer均匀消费部分,每个消息只有一个costumer消费,保存在broker上
  2. 新消息发送到q:brocker上commit log和消费组信息

    kafka/rocketmq
    每个commmit log消息发给topic的随机queue中(生产者的负载均衡,每个msg只发送到一个q中),每个queue有很多consumequeue,发给所有。广播模式,cq会在所有q上,集群模式cq会负载均衡到某个q上,消息根据这些配置数据落到q的所有cq上。
    kafka/rocketmq

  3. 消费
    3.1)普通的并发消费:queue的所有cq都直接发,所有cq发送后删除(q以TreeMap结构存储)。内部RocketMQ 的消息树是用 TreeMap 实现的,其内部基于消息偏移量维护了消息的有序性。每次消费请求都会从消息数中拿取偏移量最小的几条消息 (默认为 1 条)给用户,以此来达到有序消费的目的。
    3.2)有序消费:在3.1的基础上加两个锁,costumer client给消费的每个queue会加锁,保证同一时刻只有一个costumer client在消费queue(否则发给一个client删除了消息,此消息在另一个client和后面的client的消息无法保证顺序),默认20s加一次,queue检测60s没有就释放,每次成功后才取下一条,反正只有一个客户端消费。第二把锁是在client中,将堆积的消息按照顺序加锁的写入线程池task队列中。

    kafka/rocketmq

其他

bridgequeue

内存。redis实现。适合小型系统
kafka/rocketmq

ddmq 对大型延时系统的支持,引入chronos

kafka/rocketmq

这里的kafka去掉了。普通的直接用哪个rocketmq.延时消息和事务消息

  • 延时消息
    放入rocketmq一个内部的消费topic中,消费入chronos中(存RocksDB,seektimestamp, while从leveldb中取符合时间的再放入rocketmq中)
  • 事务消息
    A执行后要发送消息给B,因为ddmq一旦接收是保证被消费的,所以增加发送方事务回查。
    kafka/rocketmq

对比

kafka/rocketmq
分析:少topic时kafka性能好,rockemq需要读mq后去读一个大的cl。多topic是rockemq好,处理线程多。

相关推荐