Java操作RocketMQ
一、目录展示
二、导入依赖
<!-- RocketMQ client library (legacy com.alibaba.rocketmq coordinates).
     The Provider and Consumer examples import classes from com.alibaba.rocketmq.client.* -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.0.10</version>
</dependency>
<!-- Declared with type "pom", same version as rocketmq-client.
     NOTE(review): possibly redundant alongside rocketmq-client; confirm before removing. -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.0.10</version>
    <type>pom</type>
</dependency>
<!-- Logback logging implementation.
     NOTE(review): presumably supplies the logging backend for the RocketMQ client; verify. -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.1</version>
</dependency>
<!-- Logback core module, pinned to the same version as logback-classic. -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.1</version>
</dependency>
三、生产者(Provider)
package com.zn;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
 * Producer example: sends ten messages to the topic {@code itmayiedu-topic}
 * with tag {@code TagA}, one per second, and prints each send result.
 */
public class Provider {

    /**
     * Starts a producer in group {@code java-group}, sends ten messages and
     * shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException if the producer fails to start
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("java-group");
        producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                // Send one message per second.
                Thread.sleep(1000);
                // Message arguments: topic name, tag, body content.
                // Encode explicitly as UTF-8 so the bytes do not depend on the
                // platform default charset of the machine running the producer.
                Message message = new Message(
                        "itmayiedu-topic",
                        "TagA",
                        ("itmayiedu-" + i).getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(message);
                System.out.println("来了来了:" + sendResult.toString());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost, then stop sending.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Any send failure aborts the remaining sends, as in the original example.
            e.printStackTrace();
        } finally {
            // Always release the producer, even if an unexpected Throwable escapes the loop.
            producer.shutdown();
        }
    }
}
四、消费者
package com.zn;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Consumer example: subscribes to the topic {@code itmayiedu-topic} with tag
 * {@code TagA} and prints the id and body of every message it receives.
 */
public class Consumer {

    /**
     * Configures a push consumer in group {@code java-group}, registers a
     * concurrent message listener and starts the consumer.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("java-group");
        consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("itmayiedu-topic", "TagA");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> list,
                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    // Decode explicitly as UTF-8 so the text does not depend on the
                    // platform default charset of the machine running the consumer.
                    String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                    System.out.println(messageExt.getMsgId() + "---" + body);
                }
                // Every batch is acknowledged as successfully consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer Started!");
    }
}
五、控制台效果
生产者(Provider):
消费者:
六、RocketMQ控制台效果
测试前:
测试后:
相关推荐
IT农场 2020-11-13
LCFlxfldy 2020-08-17
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30