A look at RocketMQ's ProducerImpl
Preface
This article examines ProducerImpl, the producer that RocketMQ ships for the OpenMessaging API.
ProducerImpl
io/openmessaging/rocketmq/producer/ProducerImpl.java
    public class ProducerImpl extends AbstractOMSProducer implements Producer {

        public ProducerImpl(final KeyValue properties) {
            super(properties);
        }

        @Override
        public KeyValue properties() {
            return properties;
        }

        @Override
        public SendResult send(final Message message) {
            return send(message, this.rocketmqProducer.getSendMsgTimeout());
        }

        @Override
        public SendResult send(final Message message, final KeyValue properties) {
            long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
                ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
            return send(message, timeout);
        }

        private SendResult send(final Message message, long timeout) {
            checkMessageType(message);
            org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
            try {
                org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
                if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                    log.error(String.format("Send message to RocketMQ failed, %s", message));
                    throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
                }
                message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                return OMSUtil.sendResultConvert(rmqResult);
            } catch (Exception e) {
                log.error(String.format("Send message to RocketMQ failed, %s", message), e);
                throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
            }
        }

        @Override
        public Promise<SendResult> sendAsync(final Message message) {
            return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
        }

        @Override
        public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
            long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
                ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
            return sendAsync(message, timeout);
        }

        private Promise<SendResult> sendAsync(final Message message, long timeout) {
            checkMessageType(message);
            org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
            final Promise<SendResult> promise = new DefaultPromise<>();
            try {
                this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                    @Override
                    public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                        message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                        promise.set(OMSUtil.sendResultConvert(rmqResult));
                    }

                    @Override
                    public void onException(final Throwable e) {
                        promise.setFailure(e);
                    }
                }, timeout);
            } catch (Exception e) {
                promise.setFailure(e);
            }
            return promise;
        }

        @Override
        public void sendOneway(final Message message) {
            checkMessageType(message);
            org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
            try {
                this.rocketmqProducer.sendOneway(rmqMessage);
            } catch (Exception ignore) {
                //Ignore the oneway exception.
            }
        }

        @Override
        public void sendOneway(final Message message, final KeyValue properties) {
            sendOneway(message);
        }
    }
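- The send methods (send, sendAsync, sendOneway) mostly delegate to rocketmqProducer; the synchronous path throws an OMSRuntimeException when the returned status is not SEND_OK, and sendOneway swallows any exception
- Before sending, each method calls OMSUtil.msgConvert to turn the API's BytesMessage into an org.apache.rocketmq.common.message.Message
- The asynchronous path returns a DefaultPromise that is completed from a SendCallback: onSuccess calls promise.set and onException calls promise.setFailure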
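The overloads that take a KeyValue pick the timeout before delegating: a per-call value wins if present, otherwise the producer's default is used. The following is a minimal sketch of that fallback only, assuming a plain Map in place of KeyValue; the class name TimeoutResolver and the key string are hypothetical and are not part of RocketMQ or OpenMessaging.

```java
import java.util.HashMap;
import java.util.Map;

class TimeoutResolver {
    // Hypothetical key name standing in for PropertyKeys.OPERATION_TIMEOUT.
    static final String OPERATION_TIMEOUT = "OPERATION_TIMEOUT";

    // Returns the per-call timeout if the caller supplied one, else the default.
    static long resolveTimeout(Map<String, Integer> callProperties, long defaultTimeoutMillis) {
        return callProperties.containsKey(OPERATION_TIMEOUT)
            ? callProperties.get(OPERATION_TIMEOUT)
            : defaultTimeoutMillis;
    }

    public static void main(String[] args) {
        Map<String, Integer> perCall = new HashMap<>();
        System.out.println(resolveTimeout(perCall, 3000L)); // no override supplied
        perCall.put(OPERATION_TIMEOUT, 500);
        System.out.println(resolveTimeout(perCall, 3000L)); // override supplied
    }
}
```

Note that in the excerpt above sendOneway(message, properties) ignores its properties argument, so this fallback applies only to send and sendAsync.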
OMSUtil.msgConvert
io/openmessaging/rocketmq/utils/OMSUtil.java
    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
        rmqMessage.setBody(omsMessage.getBody());

        KeyValue headers = omsMessage.headers();
        KeyValue properties = omsMessage.properties();

        //All destinations in RocketMQ use Topic
        if (headers.containsKey(MessageHeader.TOPIC)) {
            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
        } else {
            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
        }

        for (String key : properties.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
        }

        //Headers has a high priority
        for (String key : headers.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
        }

        return rmqMessage;
    }
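- The method copies the body, sets the RocketMQ topic from the TOPIC header (falling back to the QUEUE header), and then copies the properties followed by the headers, so a header overrides a property with the same key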
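The "headers have priority" behaviour comes purely from write order: properties are copied first and headers second, so the later write wins. A minimal sketch of that ordering and of the TOPIC/QUEUE fallback, assuming plain maps in place of KeyValue; HeaderPriorityMerge and its method names are hypothetical, and the literal keys "TOPIC" and "QUEUE" are placeholders for the MessageHeader constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderPriorityMerge {
    // Copies properties first, then headers, so headers overwrite duplicate keys.
    static Map<String, String> merge(Map<String, String> properties, Map<String, String> headers) {
        Map<String, String> merged = new LinkedHashMap<>(properties);
        merged.putAll(headers);
        return merged;
    }

    // Uses the TOPIC header when present, otherwise falls back to the QUEUE header.
    static String resolveDestination(Map<String, String> headers) {
        return headers.containsKey("TOPIC") ? headers.get("TOPIC") : headers.get("QUEUE");
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("TOPIC", "from-properties");
        properties.put("traceId", "t-1");
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("TOPIC", "orders");

        System.out.println(merge(properties, headers));
        System.out.println(resolveDestination(headers));
    }
}
```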
SendCallback
org/apache/rocketmq/client/producer/SendCallback.java
    public interface SendCallback {
        void onSuccess(final SendResult sendResult);

        void onException(final Throwable e);
    }
- On success the callback receives the SendResult; on failure it receives the Throwable
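The way a two-method callback like this gets turned into a future-style result can be sketched with the JDK alone. The example below deliberately swaps in java.util.concurrent.CompletableFuture for DefaultPromise, and the Callback and AsyncClient interfaces are hypothetical stand-ins for SendCallback and the RocketMQ producer, so treat it as an illustration of the pattern rather than the adapter's actual code.

```java
import java.util.concurrent.CompletableFuture;

class CallbackBridge {
    /** Hypothetical callback with the same two-method shape as SendCallback. */
    interface Callback<T> {
        void onSuccess(T result);
        void onException(Throwable e);
    }

    /** Hypothetical client that reports completion through a callback. */
    interface AsyncClient<T> {
        void send(String payload, Callback<T> callback) throws Exception;
    }

    // Wraps a callback-based send in a future, mirroring the shape of sendAsync.
    static <T> CompletableFuture<T> sendAsync(AsyncClient<T> client, String payload) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            client.send(payload, new Callback<T>() {
                @Override
                public void onSuccess(T result) {
                    future.complete(result);
                }

                @Override
                public void onException(Throwable e) {
                    future.completeExceptionally(e);
                }
            });
        } catch (Exception e) {
            // A failure thrown while submitting also fails the future.
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok =
            CallbackBridge.<String>sendAsync((payload, cb) -> cb.onSuccess("id-" + payload), "hello");
        System.out.println(ok.join());
    }
}
```

As in ProducerImpl.sendAsync, both the callback's failure path and an exception thrown by the submit call end up in the returned object, so the caller has a single place to observe errors.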
DefaultPromise
io/openmessaging/rocketmq/promise/DefaultPromise.java
    public class DefaultPromise<V> implements Promise<V> {
        private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
        private final Object lock = new Object();
        private volatile FutureState state = FutureState.DOING;
        private V result = null;
        private long timeout;
        private long createTime;
        private Throwable exception = null;
        private List<PromiseListener<V>> promiseListenerList;

        public DefaultPromise() {
            createTime = System.currentTimeMillis();
            promiseListenerList = new ArrayList<>();
            timeout = 5000;
        }

        //......

        @Override
        public boolean set(final V value) {
            if (value == null)
                return false;
            this.result = value;
            return done();
        }

        @Override
        public boolean setFailure(final Throwable cause) {
            if (cause == null)
                return false;
            this.exception = cause;
            return done();
        }

        private boolean done() {
            synchronized (lock) {
                if (!isDoing()) {
                    return false;
                }

                state = FutureState.DONE;
                lock.notifyAll();
            }

            notifyListeners();
            return true;
        }

        private void notifyListeners() {
            if (promiseListenerList != null) {
                for (PromiseListener<V> listener : promiseListenerList) {
                    notifyListener(listener);
                }
            }
        }

        private void notifyListener(final PromiseListener<V> listener) {
            try {
                if (exception != null)
                    listener.operationFailed(this);
                else
                    listener.operationCompleted(this);
            } catch (Throwable t) {
                LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
            }
        }

        //......
    }
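- Both set and setFailure reject a null argument, store the value or the cause, and then call done
- done moves the state from DOING to DONE under the lock and wakes any waiting threads; it returns false if the promise was already completed. It then calls notifyListeners, which invokes operationFailed on each listener when an exception is recorded and operationCompleted otherwise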
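The complete-once-then-notify flow described above can be sketched in a few lines. MiniPromise below is a hypothetical, simplified class, not the DefaultPromise source: it uses a boolean instead of FutureState and Consumer instead of PromiseListener, and it also differs in that it writes the result inside the lock, whereas the excerpt assigns the field before done() checks the state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniPromise<V> {
    private final Object lock = new Object();
    private final List<Consumer<MiniPromise<V>>> listeners = new ArrayList<>();
    private boolean done = false;
    private V result;
    private Throwable failure;

    boolean set(V value) {
        return value != null && complete(value, null);
    }

    boolean setFailure(Throwable cause) {
        return cause != null && complete(null, cause);
    }

    // Registers a listener; if the promise is already done it is notified at once.
    void addListener(Consumer<MiniPromise<V>> listener) {
        boolean alreadyDone;
        synchronized (lock) {
            alreadyDone = done;
            if (!alreadyDone) {
                listeners.add(listener);
            }
        }
        if (alreadyDone) {
            listener.accept(this);
        }
    }

    V get() {
        synchronized (lock) {
            return result;
        }
    }

    Throwable getThrowable() {
        synchronized (lock) {
            return failure;
        }
    }

    // Only the first completion wins; later calls return false and change nothing.
    private boolean complete(V value, Throwable cause) {
        List<Consumer<MiniPromise<V>>> toNotify;
        synchronized (lock) {
            if (done) {
                return false;
            }
            result = value;
            failure = cause;
            done = true;
            lock.notifyAll(); // wake threads that may be waiting on the lock
            toNotify = new ArrayList<>(listeners);
        }
        // Listeners run outside the lock, as in the excerpt's done().
        for (Consumer<MiniPromise<V>> listener : toNotify) {
            listener.accept(this);
        }
        return true;
    }

    public static void main(String[] args) {
        MiniPromise<String> promise = new MiniPromise<>();
        promise.addListener(p -> System.out.println("completed with " + p.get()));
        System.out.println(promise.set("first"));
        System.out.println(promise.set("second"));
    }
}
```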
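Summary
- ProducerImpl adapts RocketMQ's own rocketmqProducer to the OpenMessaging Producer API
- Asynchronous sends combine RocketMQ's SendCallback with the adapter's own DefaultPromise: the callback completes the promise, and the promise notifies its listeners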