RocketMq在SparkStreaming中的应用
其实Rocketmq的给第三方的插件已经全了,如果大家有兴趣的话请移步https://github.com/apache/rocketmq-externals。本文主要是结合笔者已有的rmq在spark中的应用经验对rocketmq做简单介绍以及经验总结,当然免不了会将rocketmq和如今特别火爆的kafka做一些对比(Ps:为了方便打字rmq后面会是rocketmq的缩写)。
首先对rocktmq做一些流行的消息队列对比。
提到mq不得不提消息队列,对应于数据结构里面的“先进先出”的队列。而rocketmq就是应用于大数据时代拥有高吞吐低延迟特性的分布式消息拥有发布订阅功能的队列系统。这样的分布式消息系统主要提供应用解耦、流量消峰、消息分发等功能。本片不会对安装集群做过多的介绍,安装单机版本rmq的教程移步官方文档http://rocketmq.apache.org/docs/quick-start/。rocktmq是阿里研发主要作用于双十一这样的高峰期实时流数据处理,起初是基于activemq,但是随着对吞吐量的要求逐步提高,阿里的开发者们逐渐把眼光向kafka转移,但是kafka并不具备低延迟和高可靠性。因此阿里决定研究这样一个兼并传统的订阅消息系统的发布订阅场景与高并发零误差低延时的传输系统。
下面这个表是官网在2016年提供的activemq、kafka以及rocketmq的对比图。或许对比有点落后,或许开发者比较的眼光比较偏向于rockemq但是仅作为参考(比如数据的有序性,kafka因为需要要有序性和高并发获得一个平衡只能保证一个partition下的消息通过offset来保持消费有序(当一个主题只有一个Partition的时候就能保持全局消息有序性),rocketmq是通过主题与消息队列的一对一对应的来确保全局有序性的,实际上这两种都是可以保证全局有序性,前提都是失去消息的多线程消费)。
Messaging Product | Client SDK | Protocol and Specification | Ordered Message | Scheduled Message | Batched Message | BroadCast Message | Message Filter | Server Triggered Redelivery | Message Storage | Message Retroactive | Message Priority | High Availability and Failover | Message Track | Configuration | Management and Operation Tools |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ActiveMQ | Java, .NET, C++ etc. | Push model, support OpenWire, STOMP, AMQP, MQTT, JMS | Exclusive Consumer or Exclusive Queues can ensure ordering | Supported | Not Supported | Supported | Supported | Not Supported | Supports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB | Supported | Supported | Supported, depending on storage,if using kahadb it requires a ZooKeeper server | Not Supported | The default configuration is low level, user need to optimize the configuration parameters | Supported |
Kafka | Java, Scala etc. | Pull model, support TCP | Ensure ordering of messages within a partition | Not Supported | Supported, with async producer | Not Supported | Supported, you can use Kafka Streams to filter messages | Not Supported | High performance file storage | Supported offset indicate | Not Supported | Supported, requires a ZooKeeper server | Not Supported | Kafka uses key-value pairs format for configuration. These values can be supplied either from a file or programmatically. | Supported, use terminal command to expose core metrics |
RocketMQ | Java, C++, Go | Pull model, support TCP, JMS, OpenMessaging | Ensure strict ordering of messages,and can scale out gracefully | Supported | Supported, with sync mode to avoid message loss | Supported | Supported, property filter expressions based on SQL92 | Supported | High performance and low latency file storage | Supported timestamp and offset two indicates | Not Supported | Supported, Master-Slave model, without another kit | Supported | Work out of box,user only need to pay attention to a few configurations | Supported, rich web and terminal command to expose core metrics |
上表的对比并不是最新的,对比于2016年。如今,拥有众多粉丝的kafka在上千家公司得到应用,社区的活跃性让kafka做了从架构等方面的优化。这里需要提及两点,目前在kafka官网文档没有看到改进说明。一、kafka作为中间件而言,消费模式只有集群消费,广播消费只存在于同一个主题下不同消费组之间,同一个消费组内的不同消费组进程必须且只能消费某个消息主题下的不同partition,这也造成当消费主题过多时,多个消费者在消费状态下会有过多磁盘IO读取文件操作,造成kafka的延时性远远高于rocketmq;但是作为高并发,一个主题分成多个partition会使得kafka的高吞吐能力远远高于其他中间件。二、消息的重新消费。rmq支持通过指定某个时间点或者offset甚至选择特定消费决策(latest或者earliest)来重置offset的两种方式来重新获取消息,而当前了解是kafka只支持后者一种方式。研究rmq如何实现高并发低延迟的机制请移步http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/。
组成rmq的各个角色介绍。
Producer:生产者。类似于邮件系统中发消息的角色。
ProducerGroup:相同角色的生产者分为一组(考虑到生产者的高效率为了避免不必要的消息初始化,一个组内只允许一个生产者实例)。
Consumer:消费者。类似于邮件系统中收消息的角色。
ConsumerGroup:类似于生产者组,相同角色的消费组分为一个组(在集群模式下,同一个消费组内的消费者均衡的分摊队列中的消息,不同消费组内不同消费者可以同时接受相同的消息,这就实现了加载平衡和高容错的目标)。
Topic:主题。是生产者和消费这之间传输之前确定好的消息类别。生产者发消息之前需要创建Topic,然后消费者想要获取这个Topic下的消息需要订阅这个主题。一个消费者组可以订阅多个主题,只要这个组内的所有消费者订阅的主题保持一致性。
Message:消息。就是发送信息的载体,里面包含需要发送的具体信息以及必须要带Topic(可以理解这里的Topic就是邮件的地址,生产者作为发信人需要写对的收件人地址,消费者需要登陆对应收件人的邮箱才能收到生产者发送到这个邮箱地址上的邮件)。
MessageQueue:消息队列。类似于kafka中的partition,只不过这里的分区是逻辑分区不是partition这样的物理分区,因此如果某个topic下的数据量特别多,可以通过分为不同的消息队列来获得高并发量,生产者可以高并发的发送消息,消费者可以高并发的读取消息,此外需要说明的每个队列管理一个offset,这里的offset准确的定义是某个topic下的指定队列里的位置,通过offset可以定位具体的消息,用来指示消费者从offset开始处理。
Broker:接受来自Produer的消息,存储消息,提供管道给Consumer获取消息。也会存储元数据信息,包括消费组、消费进程的offset以及主题甚至队列的相关信息(HA架构中Broker可以是M/S模式消除单点故障,甚至是多M/S模式可以提供存储量和吞吐量)。
NameServer:管理Broker的路由信息。Producer和Cosumer需要拿Topics去NameServer中找到对应的Broker的清单(多NameServer可以消除单点故障)。
MessageModel:集群消费和广播消费。集群消费就是同一个主题下的所有消费者均衡的分摊消息队列中的消息从而做到负载均衡,广播消费是所有消费者都消费这个队列的全量消息。
讲完了rocktmq,我们再简单介绍sparkstreaming。
Spark Streaming是提供高吞吐,拥有容错能力的实时数据量处理的基于Spark Core的扩展。输入数据源可以是Kafka、Flume、HDFS以及TCP套接字,并且拥有许多高级算子比如map、reduce、join和window。输出可以是HDFS、数据库或者实时仪表盘。甚至可以在这些数据量上执行机器学习和图论相关的算法。其实,与其说streaming是实时处理,更确切的描述应该是micro-batch的伪实时流数据处理引擎。
在实时性要求不高的场景,是可以秒级的护理该单位时间内的所有数据。具体的接口文档见https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/spark-streaming-rocketmq.md,这里只介绍编写入口函数RocketMqUtils.createMQPullStream时
需要重点关注的几个参数。
forceSpecial:Boolean。默认情况下是false,每个消息队列如果拥有checkpoint就不管我们是否指定offset消费者都会从checkpoint开始消费数据,但是如果设置为true,那么rmq就会从指定的可以获取的offset开始消费,这个时候下面的ConsumerStrategy参数就会生效。
ConsumerStrategy:ConsumerStrategy。分为earliest、lastest、specificOffset(queueToOffset: ju.Map[MessageQueue, Long])以及specificTime(queueToTime: ju.Map[MessageQueue, String])这四种类型。如果是第一种则是从队列的最小位移开始消费,这时候可能会重复消费之前以及消费过的消息;第二种是从最大位移开始消费也就是会错过消费进程启动前的生产者发的消息;第三种是直接设置指定队列的offset,如果这个offset小于最小位移就直接从该队列的最小位移开始消费,否则直接从指定offset开始消费;第四种就是获取某个时间点转换为时间戳的的offset。对于没有指定offset的队列默认从最小位移开始消费。
autoCommit:Boolean。是否自动提交offset给rmq服务器。true的情况是一旦接受到就自动提交offset;false的情况是异步提交,消息处理并callback后才会提交offset。
failOnDataLoss:Boolean。当查询的数据丢失(比如topic被删除或者offset超出范围)是否报异常退出程序还是仅仅日志警告输出。
这里就对遇到的坑位做一些总结:
1、找不到Topic。要么是打包少了fastjson这个依赖,要么是nameserver地址写错了或者topic写错了。
2、数据丢失。第一种丢失是配置设错了,forceSpecial为true的情况下