RocketMQ
RocketMQ整理
概念
broker
MQ服务节点topic
主题message queue
消息队列,类似于kafka
中的partition
producer
生产者,先去连接
name server
,查询到对应的broker
信息,再去连接broker
consumer
消费者,先去连接
name server
,查询到对应的broker
信息,再去连接broker
name server
命名服务器,用于存储
broker
信息
启动命令
先启动name server
./bin/mqnamesrv
然后再启动broker
./bin/mqbroker -n {name.server.url} ./bin/mqbroker -c {config} #-n 指定nameserver的ip和端口 #-c 以指定配置文件为参数启动 #端口默认为 9876
这时候启动应该会报错
原因:
RocketMQ
的默认配置是生产环境级别的,JVM的内存达到8G,如果机器配置不够就会报错
解决:
修改
runbroker.sh
脚本,将JVM参数 -Xms,-Xmx,设置为机器可以承受的大小修改
runserver.sh
脚本,将JVM参数 -Xms,-Xmx,设置为机器可以承受的大小
Console端测试
RocketMQ提供了quick start
的example
配置环境变量
NAMESRV_ADDR
为name server
的url执行
./bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
来启动一个生产者
,自动发送测试消息执行
./bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
来启动一个消费者
来接收消息
JAVA客户端测试
引入RocketMQ
依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency>
Java示例代码
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; ? public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("test-group"); // Specify name server addresses. producer.setNamesrvAddr("172.16.55.185:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest11" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
执行main
方法会报错,提示timeout
原因:
因为RocketMQ
默认启动broker
的时候,分配的ip只有服务器内网访问权限,外网无法访问
解决:
通过配置文件指定broker
的启动ip
为服务器外网能够访问的ip
然后重启broker
,使用-c
参数指定配置文件路径
UI管理工具(了解)
rocketmq-console
,可以同过github
下载
具体使用方式上网搜索
Topic创建
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM"); producer.setNamesrvAddr("172.16.55.185:9876"); producer.start(); /** * key:broker名称 * newTopic:topic名称 * queueNum:队列数(分区)默认4个 */ producer.createTopic("broker_name", "topic", 8);
消息发送
同步模式
String msgStr = "用户A发送消息给用户B"; Message msg = new Message("haoke_im_topic","SEND_MSG", msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息 SendResult sendResult = producer.send(msg); System.out.println("消息状态:" + sendResult.getSendStatus()); System.out.println("消息id:" + sendResult.getMsgId()); System.out.println("消息queue:" + sendResult.getMessageQueue()); System.out.println("消息offset:" + sendResult.getQueueOffset());
异步模式
String msgStr = "用户A发送消息给用户B"; Message msg = new Message("broker_name","SEND_MSG",msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 异步发送消息 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("消息状态:" + sendResult.getSendStatus()); System.out.println("消息id:" + sendResult.getMsgId()); System.out.println("消息queue:" + sendResult.getMessageQueue()); System.out.println("消息offset:" + sendResult.getQueueOffset()); } @Override public void onException(Throwable e) { System.out.println("发送失败!" + e); } });
Message数据结构
字段名 | 默认值 | 说明 |
---|---|---|
topic | null | 必填,线下环境不需要申请,线上环境需要申请后才能使用 |
Body | null | 必填,二进制形式,序列化由应用决定,producer 与 consumer 要协商好序列化形式 |
Tags | null | 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个 tag ,所以也可以类比为 Notify 的 MessageType 概念 |
Keys | null | 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等 |
Flag | 0 | 选填,完全由应用来设置,RocketMQ 不做干预 |
DelayTimeLevel | 0 | 选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费 |
WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答 |
消息接收
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("172.16.55.185:9876"); // 订阅topic,* 表示接收此Topic下的所有消息,相当于一个匹配符 //如果为 "tag1"就表示只订阅tag为"tag1"的消息 还可以组合订阅 "tag1 || tag2 || tag3" consumer.subscribe("topic_name", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(new String(msg.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } System.out.println("收到消息->" + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
消息过滤器
RocketMQ
支持根据用户自定义属性进行过滤
过滤表达式类似于SQL的where,如:a > 5 AND b = ‘abc‘
RocketMQ默认没有开启消息过滤器
所以需要先开启才能使用
开启方式:
在配置文件中新增enablePropertyFilter = true
生产者发送带有条件的消息
String msgStr = "大家好我叫小美,今年18岁"; Message msg = new Message("topic_name","tag1", msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.putUserProperty("age", "18"); msg.putUserProperty("sex", "女"); // 发送消息 SendResult sendResult = producer.send(msg);
消费者使用过滤器
接收
// 订阅topic,接收此Topic下的所有消息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("172.16.55.185:9876"); // 订阅topic,* 表示接收此Topic下的所有消息,相当于一个匹配符 consumer.subscribe("topic_name", MessageSelector.bySql("age>=18 AND sex=‘女‘")); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println(new String(msg.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } System.out.println("收到消息->" + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
消息顺序
在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的
比如在电商系统中,订 单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了
生产者代码实现
DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("172.16.55.185:9876"); producer.start(); for (int i = 0; i < 100; i++) { String msgStr = "order --> " + i; int orderId = i % 10; // 模拟生成订单id Message message = new Message("topic","tag", msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET)); //orderId实际传入到lambda表达式(MessageQueueSelector)第三个参数 SendResult sendResult = producer.send(message, (mqs, msg, arg) -> { //orderId范围0-9之间 Integer id = (Integer) arg; //topic默认queue个数为4 所以index范围0-4之间 int index = id % mqs.size(); //凡是orderId对4取余为0的选用第1个queue //凡是orderId对4取余为1的选用第2个queue //凡是orderId对4取余为2的选用第3个queue //凡是orderId对4取余为3的选用第4个queue //这样一来就保证了满足某一条件的消息能够总是发送到特定的queue中 return mqs.get(index); }, orderId); System.out.println(sendResult); } producer.shutdown();
消费者代码
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group"); consumer.setNamesrvAddr("172.16.55.185:9876"); consumer.subscribe("topic", "tag"); //使用MessageListenerOrderly这种顺序监听器,可以满足按顺序消费同一队列中的消息,不同的线程处理不同的队列 //比如thread-pool-1专门消费queue-0中的消息 //比如thread-pool-2专门消费queue-1中的消息 //比如thread-pool-3专门消费queue-2中的消息 //比如thread-pool-4专门消费queue-3中的消息 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start();
分布式事务
随着项目越来越复杂,越来越服务化,就会导致系统间的事务问题,这个就是分布式事务问题。 分布式事务分类有这几种:
基于单个JVM,数据库分库分表了(跨多个数据库)。
基于多JVM,服务拆分了(不跨数据库)。
基于多JVM,服务拆分了 并且数据库分库分表了。
解决分布式事务问题的方案有很多,使用消息实现只是其中的一种
实现原理
半消息
生产者发送一条
半消息
MQ不会将
半消息
发送给消费者等生产者发送一条
commit
消息,MQ才会将半消息
发送给消费者生产者发送一条
rollback
消息,MQ将半消息
给回滚消息回查
当
commit
或者rollback
消息在网络中丢失MQ会回调生产者提供的
check
方法,来确认消息的状态是commit
或者rollback
生产者代码
TransactionMQProducer producer = new TransactionMQProducer("producer_group"); producer.setNamesrvAddr("172.16.55.185:9876"); // 设置自定义事务监听器 producer.setTransactionListener(new TransactionListenerImpl()); producer.start(); // 发送消息 Message message = new Message("pay_topic", "用户A给用户B转账500元".getBytes("UTF-8")); producer.sendMessageInTransaction(message, null); Thread.sleep(999999); producer.shutdown();
TransactionListenerImpl.java
public class TransactionListenerImpl implements TransactionListener { private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>(); /** * 执行具体的业务逻辑 * * @param msg 发送的消息对象 * @param arg * @return */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { System.out.println("用户A账户减500元."); Thread.sleep(500); //模拟调用服务 // System.out.println(1/0); System.out.println("用户B账户加500元."); Thread.sleep(800); STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE); // 二次提交确认 return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { e.printStackTrace(); } STATE_MAP.put(msg.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE); // 回滚 return LocalTransactionState.ROLLBACK_MESSAGE; } /** * 消息回查 * * @param msg * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return STATE_MAP.get(msg.getTransactionId()); } }
消费者push和pull模式
push
模式客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端
pull
模式客户端不断的轮询请求服务端,来获取新的消息
但在具体实现时,push
和pull
模式都是采用消费端主动拉取的方式,即consumer
轮询从broker
拉取消息
push
方式里,consumer
把轮询过程封装了,并注册MessageListener
监听器,取到消息后,唤醒 MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的pull
方式里,取消息的过程需要用户自己写,首先通过打算消费的topic
拿到MessageQueue
的集合,遍历MessageQueue
集合,然后针对每个MessageQueue
批量取消息,一次取完后,记录该队列下一次要取的开 始offset,直到取完了,再换另一个MessageQueue
消息模式
集群模式
同一个
Group
下的消费者每个消费者消费一部分
topic
数据同一个
Group
下的消费者合起来消费的数据就是topic
的所有数据广播模式
同一个
Group
下的消费者每一个消费的数据就是
topic
的所有数据
// 集群模式 consumer.setMessageModel(MessageModel.CLUSTERING); // 广播模式 consumer.setMessageModel(MessageModel.BROADCASTING);
重复消息
造成消息重复的根本原因是:网络不可达
只要通过网络交换数据,就无法避免这个问题
所以解决这个问题的办 法就是绕过这个问题
那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理
消费端处理消息的业务逻辑保持
幂等性
保证每条消息都有
唯一编号
且保证消息处理成功与去重表的日志同时出现
持久化
RocketMQ中的消息数据存储,采用了零拷贝技术(使用 mmap
+ write
方式)
文件系统采用 Linux Ext4
文件系 统进行存储
在RocketMQ
中,消息数据是保存在磁盘文件中,为了保证写入的性能,RocketMQ
尽可能保证顺序写入
,顺序写 入的效率比随机写入的效率高很多。 RocketMQ
消息的存储是由ConsumeQueue
和CommitLog
配合完成的
CommitLog
是真正存储数据的文件
ConsumeQueue
是索引文件
,存储数据指向到物理文件的配置。
Consume Queue
相当于kafka
中的partition
,是一个逻辑队列,存储了这个Queue在CommiLog中的起始 offset
,log
大小和MessageTag
的hashCode
每次读取消息队列先读取consumerQueue
,然后再通过consumerQueue
去CommitLog
中拿到消息主体
同步异步写入
RocketMQ
为了提高性能,会尽可能地保证磁盘的顺序写入
消息在通过 producer
写入 RocketMQ
的时候,有两 种写磁盘方式
分别是同步写入
与异步写入
同步
在返回写成功状态时,消息已经被写入磁盘
消息写入内存的缓冲区后,立刻通知写入线程写入
等待写入完成,写入线程 执行完成后唤醒等待的线程,返回消息写成功的状态
异步
在返回写成功状态时,消息可能只是被写入了内存的 缓冲区
写操作的返回快,吞吐量大
当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入
配置指定写入方式
#-- 异步 flushDiskType = ASYNC_FLUSH #-- 同步 flushDiskType = SYNC_FLUSH