聊聊rocketmq的ProducerImpl

本文主要研究一下rocketmq的ProducerImpl

ProducerImpl

io/openmessaging/rocketmq/producer/ProducerImpl.java

public class ProducerImpl extends AbstractOMSProducer implements Producer {

    public ProducerImpl(final KeyValue properties) {
        super(properties);
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public SendResult send(final Message message) {
        return send(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public SendResult send(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return send(message, timeout);
    }

    private SendResult send(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
            if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                log.error(String.format("Send message to RocketMQ failed, %s", message));
                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
            }
            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
            return OMSUtil.sendResultConvert(rmqResult);
        } catch (Exception e) {
            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message) {
        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return sendAsync(message, timeout);
    }

    private Promise<SendResult> sendAsync(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        final Promise<SendResult> promise = new DefaultPromise<>();
        try {
            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                @Override
                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                    promise.set(OMSUtil.sendResultConvert(rmqResult));
                }

                @Override
                public void onException(final Throwable e) {
                    promise.setFailure(e);
                }
            }, timeout);
        } catch (Exception e) {
            promise.setFailure(e);
        }
        return promise;
    }

    @Override
    public void sendOneway(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            this.rocketmqProducer.sendOneway(rmqMessage);
        } catch (Exception ignore) { //Ignore the oneway exception.
        }
    }

    @Override
    public void sendOneway(final Message message, final KeyValue properties) {
        sendOneway(message);
    }
}
  • 发送消息的方法主要是代理给rocketmqProducer
  • 另外调用OMSUtil.msgConvert将api的BytesMessage转换为org.apache.rocketmq.common.message.Message
  • 对于异步采用的是DefaultPromise,其callback为SendCallback

OMSUtil.msgConvert

io/openmessaging/rocketmq/utils/OMSUtil.java

public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
        rmqMessage.setBody(omsMessage.getBody());

        KeyValue headers = omsMessage.headers();
        KeyValue properties = omsMessage.properties();

        //All destinations in RocketMQ use Topic
        if (headers.containsKey(MessageHeader.TOPIC)) {
            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
        } else {
            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
        }

        for (String key : properties.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
        }

        //Headers has a high priority
        for (String key : headers.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
        }

        return rmqMessage;
    }
  • 这里主要是转换header及topic信息

SendCallback

org/apache/rocketmq/client/producer/SendCallback.java

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}
  • 对于成功,将SendResult传递过来,对于异常则传递Throwable

DefaultPromise

io/openmessaging/rocketmq/promise/DefaultPromise.java

public class DefaultPromise<V> implements Promise<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
    private final Object lock = new Object();
    private volatile FutureState state = FutureState.DOING;
    private V result = null;
    private long timeout;
    private long createTime;
    private Throwable exception = null;
    private List<PromiseListener<V>> promiseListenerList;

    public DefaultPromise() {
        createTime = System.currentTimeMillis();
        promiseListenerList = new ArrayList<>();
        timeout = 5000;
    }

    //......

    @Override
    public boolean set(final V value) {
        if (value == null)
            return false;
        this.result = value;
        return done();
    }

    @Override
    public boolean setFailure(final Throwable cause) {
        if (cause == null)
            return false;
        this.exception = cause;
        return done();
    }

    private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }

            state = FutureState.DONE;
            lock.notifyAll();
        }

        notifyListeners();
        return true;
    }

    private void notifyListeners() {
        if (promiseListenerList != null) {
            for (PromiseListener<V> listener : promiseListenerList) {
                notifyListener(listener);
            }
        }
    }

    private void notifyListener(final PromiseListener<V> listener) {
        try {
            if (exception != null)
                listener.operationFailed(this);
            else
                listener.operationCompleted(this);
        } catch (Throwable t) {
            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
        }
    }

    //......
}
  • set或者setFailure方法都会调用done方法
  • done方法会调用notifyListeners,回调listener的operationCompleted或者operationFailed

小结

  • ProducerImpl主要是为rocketmq自身的rocketmqProducer适配open-messaging的api接口
  • 异步采用自定义的SendCallback回调和DefaultPromise

doc

相关推荐