Spring + RocketMQ入门
RocketMQ简单介绍
RocketMQ 是阿里出品的一款MQ,现在已经捐给Apache并成为Apache顶级项目,更多介绍请 移步
在这里向大家介绍一个学习RocketMQ的好文章:
RocketMQ实战(一)
RocketMQ实战(二)
RocketMQ实战(三)
RocketMQ实战(四)
一些说明
- 本文给出的代码均为代码片段,并非完整代码
- 阅读本文前,您需要具备以下知识:
- 了解RocketMQ是什么并安装它
- 了解JUnit并会简单使用
- 了解Spring并会简单使用
- 了解 maven 并会简单使用
引入相关库
maven 引入:
<!-- junit5 --> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>${junit5.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-runner</artifactId> <version>${junit5-platform.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-console-standalone</artifactId> <version>${junit5-platform.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq-client.version}</version> </dependency>
相关库版本:
<junit5.version>5.1.0</junit5.version> <junit5-platform.version>1.1.0</junit5-platform.version> <rocketmq-client.version>4.2.0</rocketmq-client.version> <spring.version>5.0.4.RELEASE</spring.version>
创建spring配制文件
创建2个spring配制文件,一个用于生产者,一个用于消费者,相关参数含义就不再介绍
applicationContext-producer.xml 生产者
<bean id="rocketmqProduct" class="org.apache.rocketmq.client.producer.DefaultMQProducer" init-method="start" destroy-method="shutdown"> <property name="producerGroup" value="producer1"/> <property name="namesrvAddr" value="127.0.0.1:9876"/> </bean>
applicationContext-consumer.xml 消费者
<bean id="consumerImplTest" class="org.klw.test.rocketMqTest.spring.ConsumerTestImpl" /> <bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start" destroy-method="shutdown"> <property name="consumerGroup" value="concurrent_consumer"/> <property name="namesrvAddr" value="127.0.0.1:9876"/> <property name="messageListener" ref="consumerImplTest"/> <property name="subscription"> <map> <entry key="TopicTest"> <value>*</value> </entry> </map> </property> </bean>
大家应该注意到了消费者的配制中有一个org.klw.test.rocketMqTest.spring.ConsumerTestImpl,现在我们去实现它:
public class ConsumerImplTest implements MessageListenerConcurrently { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消费状态 //CONSUME_SUCCESS 消费成功 //RECONSUME_LATER 消费失败,需要稍后重新消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
到这里,一个简单的生产者就配制好了,一个简单的消费者也配制并实现了,下面来使用它们
接下来我们要创建2个JUnit测试类,一个是生产者一个是消费者
生产者:
@RunWith(JUnitPlatform.class) // org.junit.platform.runner.JUnitPlatform @ExtendWith(SpringExtension.class) // org.springframework.test.context.junit.jupiter.SpringExtension @ContextConfiguration({"classpath*:applicationContext-producer.xml"}) public class JUnitProducer { @Autowired private DefaultMQProducer producer; @Test public void producerData() throws InterruptedException { for (int i = 0; i < 10; i++) { // 发10条消息 try { Message msg = new Message("TopicTest", // topic "TagA", // tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); // 调用producer的send()方法发送消息 // 这里调用的是同步的方式,所以会有返回结果 SendResult sendResult = producer.send(msg); // 打印返回结果,可以看到消息发送的状态以及一些相关信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } } }
消费者:
@RunWith(JUnitPlatform.class) @ExtendWith(SpringExtension.class) @ContextConfiguration({"classpath*:applicationContext-consumer.xml"}) public class JUnitConsumer { @Test public void runConsumer() { System.out.println("Consumer Started."); // 下面的代码把线程阻塞住,这样就可以先运行消费者再运行生产者.当然不要也可以,不要的化就得先运行生产者, //再运行消费者,生产者先把消息发送到MQ上,消费者启动后从MQ上拿消息 synchronized (JUnitConsumer.class) { while (true) { try { JUnitConsumer.class.wait(); } catch (Throwable e) { e.printStackTrace(); } } } } }
代码差不多了,通过JUnit先运行消费者,再运行生产者,在消费者的控制台中能看到生产者发送的消息已经打印出来
写在结束
本文只是简单的描述了如何在spring中配制RocketMQ
作者也是初次接触RocketMQ,正在学习中,欢迎大家一起讨论学习
相关推荐
IT农场 2020-11-13
LCFlxfldy 2020-08-17
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30