RocketMQ源码学习(四)-Consumer


这次源码学习的方法是带着问题学习源码实现,问题列表如下

  • Consumer Group的概念是什么?

  • Consumer pull过程是怎样的?

  • Consumer 支持push吗?

  • Consumer 怎么实现单队列并行消费?

  • Consumer 怎么过滤消息?

  • Consumer 怎么保证一条消息只被Group中的一个服务消费?

  • Consumer 负载均衡怎么实现?

  • Consumer 消费失败怎么办?

  • Consumer 可以回溯消费吗?

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

Consumer Group的概念是什么?

一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。一般情况下group中Consumer的数量不能超过订阅的topic中queue的数量,不然会有闲置的Consumer.

Consumer pull过程是怎样的?

分析过Producer,看Consumer有种似曾相识的感觉

主要逻辑

1. 根据mq信息去找broker路由信息
2. 根据相关参数构建请求头
3. 委托netty去broker获取消息

代码走读

MQPullConsumer.pull的参数需指定MessageQueue,和offset(位置偏移)的.

PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

再看Pull操作的返回,有本次获取的数据信息MessageExt,即位置信息offset

public class PullResult {
    //pull状态
    private final PullStatus pullStatus;
    //下次pull的偏移量
    private final long nextBeginOffset;
    //最小偏移量
    private final long minOffset;
    //最大偏移量
    private final long maxOffset;
    //获取到的消息
    private List<MessageExt> msgFoundList;
}

MQPullConsumer.pull
-> DefaultMQPullConsumer.pull
-> DefaultMQPullConsumerImpl.pull
-> DefaultMQPullConsumerImpl.pullSyncImpl
-> DefaultMQPullConsumerImpl.pullKernelImpl

public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //获取broker信息
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            // 构建pull请求头
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            //委托Netty去获取信息
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

pull消息比较简单,一次请求返回,由Consumer管理offset.一般来说一个Consumer Group中Consumer的数量不能大于MessageQueue的数量.

Consumer 支持push吗?

Push Consumer

Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立

刻回调 Listener 接口方法。JMS标准中为MessageListener类的onMessage方法.

Pull Consumer

Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

RocketMQ的Consumer都是从Broker pull消息来消费,但是为了能做到实时收消息,RocketMQ 使用长轮询方式,可以保证消息实时性同Push方式一致。这种长轮询方式类似于WebQQ收发消息机制。请参考以下信息了解更多Comet:基于 HTTP 长连接的“服务器推”技术

虽然RocketMQ的consumer都是通过pull来实现的但是其封装了push接口,我们先来看其使用方法

public static void main(String[] args) throws InterruptedException, MQClientException {  
        /** 
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例 
         * 注意:ConsumerGroupName需要由应用来保证唯一 
         */  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");  
        consumer.setNamesrvAddr("ip:port");  
          
        /** 
         * 订阅指定topic下tags分别等于TagA或TagB 
         */  
        consumer.subscribe("broker-a", "TagB || TagA");  
  
        /** 
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 
         * 如果非第一次启动,那么按照上次消费的位置继续消费 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        //真正的处理消息逻辑在这里
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            /** 
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息 
             */  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,  
                    ConsumeConcurrentlyContext context) {  
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                MessageExt msg = msgs.get(0);  
                if (msg.getTopic().equals("broker-a")) {  
                    // 执行TopicTest1的消费逻辑  
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {  
                        // 执行TagA的消费  
                        String message = new String(msg.getBody());  
                        System.out.println(message);  
                    }  
                    else if (msg.getTags() != null && msg.getTags().equals("TagB")) {  
                        // 执行TagB的消费  
                        String message = new String(msg.getBody());  
                        System.out.println(message);  
                    }  
                    
                }  
                //消费者向mq服务器返回消费成功的消息  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
       //Consumer对象在使用之前必须要调用start初始化,初始化一次即可
        consumer.start();  
  
    }

RocketMQ push的实现 :

消息的拉取逻辑

维护一个pullRequestQueue,先放入一个pullRequest,当pullResult为成功时,再构建新的pullRequest放入pullRequestQueue,另起一个线程监测pullRequestQueue,当起不为空时,轮询pull消息

DefaultMQPushConsumer.start
-> DefaultMQPushConsumerImpl.start
-> MQClientInstance.start
-> PullMessageService.start
我们来看PullMessageService的run方法,

//请求消息阻塞链表
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        //只要有请求就去pull消息
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

pullRequestQueue在在哪里put呢?
在class里找到在executePullRequestLater方法内会put

public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

  public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

查看此方法的调用关系,发现在run中的pullMessage方法中onSuccess回调中会构建下一次的pullRequestQueue待下次请求

PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
              //请求成功就构建新的pullRequest              pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);
//放到pullRequestQueue
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
                                    pullResult.getNextBeginOffset(), //
                                    firstMsgOffset, //
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}", //
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

至此获取消息已经搞定,再看怎么触发MessageListener的消费方法.
还是在DefaultMQPushConsumerImpl.pullMessage方法内的回调,有下列代码,把消息提供给consumeMessageService处理.

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);

构建ConsumeRequest,然后提交至线程池消费

@Override
    public void submitConsumeRequest(//
        final List<MessageExt> msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue, //
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

终于在ConsumeRequest的run方法中找到了listner的consumeMessage

status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

这下整个pull逻辑就完成了.

Consumer 怎么实现单队列并行消费

上节代码就是取得并行的例子,简单来说就是把消息提交给线程池,而不阻塞,就单队列并行消费了

Consumer 怎么过滤消息?

入口还是在DefaultMQPushConsumerImpl.pullMessage

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

往里面看,发现有过滤消息的逻辑

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }
            //消息过滤
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

Consumer 怎么保证一条消息只被Group中的一个服务消费?

因为topic的MessageQueue只能对应Group中的一个Consumer,所以一条消息只被Group中的一个服务消费

Consumer 负载均衡怎么实现?

概念:

consumer同时消费多个MessageQueue,当topic中的MessageQueue变更时,动态调整消费MessageQueue的数量

//RebalanceImpl
 public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

我们只关心集群模式
主要逻辑:

1. 获取topic所有MessageQueue
2. 获取同ConsumerGroup组所有Consumer信息
3. 根据制定策略分配给此Consumer
private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}", //
                            consumerGroup, //
                            topic, //
                            mqSet, //
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
            //获取该topic所有MessageQueue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            //获取同consumerGroup信息
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                    //根据分配策略分配MessageQueue给当前Consumer
                        allocateResult = strategy.allocate(//
                            this.consumerGroup, //
                            this.mQClientFactory.getClientId(), //
                            mqAll, //
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

RocketMQ提供了几种策略供使用

实现类策略名
AllocateMessageQueueAveragelyByCircle轮询平均分配策略
AllocateMessageQueueByMachineRoom根据机房分配策略
AllocateMessageQueueConsistentHash一致Hash分配策略

本节编写参考分布式消息队列RocketMQ源码分析之4 -- Consumer负载均衡与Kafka的Consumer负载均衡之不同点

Consumer 消费失败怎么办

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为 有以下几种情况

  1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过 10s 秒后再重试。

  1. 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再 消费下一条消息,这样可以减轻 Broker 重试消息的压力。

具体到代码实现,会根据消费状态进行处理,当无返回时会重试.

if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                //设置状态为重试
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
public void processConsumeResult(//
        final ConsumeConcurrentlyStatus status, //
        final ConsumeConcurrentlyContext context, //
        final ConsumeRequest consumeRequest//
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                //请求重试消费
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

Consumer 可以回溯消费吗?

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费 1 小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

逻辑: 请求broker按参数返回offset,按照offset重置消费offset,从而实现回溯消费

public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

相关推荐