RocketMQ消息系统 - pull模式
RocketMQ消息系统 - pull模式
一、消费模式
消息系统常见的消费模式分为push模式和pull模式,push模式是服务端主动给客户端推送数据,也是比较常见的模式,pull模式则是客户端主动去服务端拉取数据。
1.1 push模式
① 服务端保存push状态,失败重发;
② 客户端无状态;
③ push是实时的;
④ 负载均衡,服务端统一处理和控制,需要根据消费者的消费能力做流控;
1.2 pull模式
① 服务端无状态;
② 客户端保存当前pull状态,以便在故障或者重启时恢复;
③ pull分长轮询(实时)和短轮询(与pull的时间间隔有关);
④ 负载均衡,消费者自行控制;
二、pull模式原理
PullMessageTask:
① 每3s拉取一次;
② 更新拉取的队列的offset到broker;
③ 每个队列启动一个PullMessageTask,负责这个队列的消息拉取;
④ 消费失败,或者listener执行的时候出错的消息,保存到H2数据库中;
ReconsumerMessgaeTask:
对于消费失败的消息,从数据库中获取并重新消费;
三、pull模式使用
3.1 消费者
消息topic:mengka-cc2
groupId: consumerG2
消息模式: PULL
拉取间隔时间:3000ms
<bean id="consumerNotifyManager" class="com.mengka.mq.client.NotifyManagerBean" init-method="init"> <property name="groupId" value="consumerG2" /> <property name="name" value="taaNotifyManager" /> <property name="topic" value="mengka-cc2"/> <property name="ctype" value="PULL"/> <property name="namesrvAddr" value="192.168.1.42:9876"/> </bean> <bean id="pullMessageManager" class="com.mengka.mq.listener.PullMessageManager"> <property name="consumerNotifyManager" ref="consumerNotifyManager"/> <property name="messageListener" ref="taaPullMessageListener" /> <property name="spaceTime" value="3000"/> </bean>
/** * User: mengka * Date: 15-8-8 */ @Component public class TaaPullMessageListener implements MessageListenerPull { private static final Logger log = LoggerFactory.getLogger(TaaPullMessageListener.class); @Override public ConsumeConcurrentlyStatus consumeMessage(MessageExt msg) { log.info("---------------, receive message id = "+msg.getMsgId()+" , content = "+new String(msg.getBody())+" , tags = "+msg.getTags()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
3.2 生产者
消息topic:mengka-cc2
groupId: consumerG2
<bean id="producterNotifyManager" class="com.mengka.mq.client.NotifyManagerBean" init-method="initProducter"> <property name="groupId" value="consumerG2" /> <property name="name" value="taaNotifyManager" /> <property name="topic" value="mengka-cc2"/> <property name="namesrvAddr" value="192.168.1.42:9876"/> </bean>
String serviceConfigXMLs[] = new String[]{"rocketmq_pull_06/rocketmq-pull-producer.xml"}; ApplicationContext context = new ClassPathXmlApplicationContext(serviceConfigXMLs); NotifyManager producterNotifyManager = (NotifyManager) context.getBean("producterNotifyManager"); String content = "Just for test[" + TimeUtil.toDate(new Date(), TimeUtil.format_1); Message message = new BytesMessage(content.getBytes()); SendResult result = producterNotifyManager.sendMessage(message);
3.3 拉取消息消费
生产者发送消息之后,在下一次的轮询中,消费者就拉取到了未消费的消息数据,消息消费成功后并更新offset到服务端;
相关推荐
LCFlxfldy 2020-08-17
IT农场 2020-11-13
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30