RocketMq producer 发送一条消息所经过的流程
前言:
RocketMq producer 在发送一条消息时候,从 producer --nameSrv -- Broker 中间经过了什么样子的数据交互
开始:
如下是 Producer 发送消息的一个demo例子:
//1. 初始化 mq producer DefaultMQProducer mqProducer =new DefaultMQProducer("iscys-test"); //2.设置nameServer 地址 mqProducer.setNamesrvAddr("localhost:9876"); //3. 开启mq producer,这一步是必须的,会做一些连接初始化检测工作 mqProducer.start(); //4.创建 Message Message msg = new Message("test-topis", "iscys-test".getBytes()); //5.发送消息,设置回调,消息发送成功会回调函数 mqProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { //在消息发送成功之后,我们收到broker的响应通知后,会进行回调 System.out.println("send success"); } @Override public void onException(Throwable e) { System.out.println("send fail"); } });
构建发送消息:
public void send(Message msg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { try { //默认异步发送,超时3s this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout); } catch (MQBrokerException e) { throw new MQClientException("unknownn exception", e); } }
从NameSrv 中获取topic 配置的相关信息,比如 broker 地址,队列数 之类的。
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; //1.尝试取获取从NameSrv 中获取topic 相关信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //2.选择一个消息队列,默认为4个,在创建新的Topic时候 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); //3.发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY:
主要看一下如上代码第一步 尝试获取Topic 信息 tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //1.从 topicPublishInfoTable 从尝试从Map中获取,如果没有获取到,请求NameSrv TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //2.从NameSrv 中拉取topic 信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { //3.说明获取到TOPIC 的信息 return topicPublishInfo; } else { //4.如果第2步执行后 NameSrv 中没有topic 信息,获取默认的TBW102 topic 的信息,这个是肯定能获取到的 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
1. 会先从 topicPublishInfoTable 缓存中获取topic 配置信息
2.缓存没有,就从NameSrv 中拉取。
3.如果获取到了,则返回。
4.NameSrv 没有得到相关到topic 信息,说明是新到topic ,则就请求获取TBW102 topic 配置信息,这个肯定能获取到,封装使用TBW102的配置。
请求NameSrv 非默认的topic
public boolean updateTopicRouteInfoFromNameServer(final String topic) { //1.非默认的topic ,默认Topic 为TBW102 return updateTopicRouteInfoFromNameServer(topic, false, null); }
执行从NameSrv 获取topic 请求
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { // 1.如果请求的是默认的Topic 请求会走到这里 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { // 2.新的Topic 会先从NameSrv 中获取一遍,如果NameSrv 中没有获取到,会抛出异常 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); }
1. 从NameSrv 中获取到 TBW102 的topic 信息,这个一般都是有的。
2. 新的topic 会从NameSrv 中获取信息,如果不存在,返回false。
获取到topic信息后封装成 TopicPublishInfo:
public class TopicPublishInfo { private boolean orderTopic = false; //用来检测Topic 在Broker 真实存在的,不存在false private boolean haveTopicRouterInfo = false; //消息队列的 private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); //请求NameSrv 返回的TOPIC 具体信息 private TopicRouteData topicRouteData;
相关推荐
LCFlxfldy 2020-08-17
IT农场 2020-11-13
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30
LCFlxfldy 2020-05-29