SpringBoot之ActiveMQ实现延迟消息
一、安装activeMQ
安装步骤参照网上教程,本文不做介绍
二、修改activeMQ配置文件
在 conf/activemq.xml 的 <broker> 节点上新增属性 schedulerSupport="true"，开启延迟/定时投递功能（默认关闭，不开启时延迟属性不会生效）
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" > <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" > <!-- The constantPendingMessageLimitStrategy is used to prevent slow topic consumers to block producers and affect other consumers by limiting the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
三、创建SpringBoot工程
（原文此处为配图，图片链接已丢失）
配置 ActiveMQ 连接工厂。必须把消息实体所在的包加入信任包（trusted packages），否则消费者反序列化 ObjectMessage 时会报错
package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * Spring configuration that exposes the ActiveMQ connection factory bean.
 *
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    /**
     * Builds the connection factory for the configured broker and registers the
     * package holding the message entities as trusted for object serialization.
     *
     * @param url broker URL read from {@code spring.activemq.broker-url}
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url) {
        // Packages whose classes may be carried inside serialized object messages.
        List<String> trustedPackages = new ArrayList<>();
        trustedPackages.add("com.example.demoactivemq.domain");

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setTrustedPackages(trustedPackages);
        return connectionFactory;
    }
}
消息实体类
package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * Message payload sent through the queue as a serialized Java object.
 *
 * <p>Instances are created through the Lombok-generated builder; accessors,
 * {@code equals}/{@code hashCode} and {@code toString} come from {@code @Data}.
 *
 * @author shanks on 2019-11-12
 */
@Builder
@Data
public class MessageModel implements Serializable {

    /**
     * Explicit serialization version so that recompiling the class does not change
     * the implicit UID and break deserialization of messages that are still queued.
     * NOTE(review): messages serialized before this field was introduced carry the
     * compiler-computed UID; drain the queue before deploying this change.
     */
    private static final long serialVersionUID = 1L;

    /**
     * Message title. The misspelled name is kept on purpose: the generated builder
     * method and accessors are named after it and are used by existing callers.
     */
    private String titile;

    /** Message body. */
    private String message;
}
生产者
package com.example.demoactivemq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;

/**
 * Message producer supporting immediate and delayed delivery.
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

    /** Default destination used by the examples: the queue named {@code delay.queue}. */
    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * Sends a message immediately.
     *
     * @param destination queue the message is sent to
     * @param message     payload to send
     */
    public <T extends Serializable> void send(Destination destination, T message) {
        template.convertAndSend(destination, message);
    }

    /**
     * Sends a message whose delivery is delayed by the broker.
     *
     * <p>The message is sent inside a transacted session and committed. Any failure
     * is logged and not propagated to the caller, so the method never throws.
     *
     * @param destination queue the message is sent to
     * @param data        payload to send
     * @param time        delay in milliseconds; must not be {@code null}
     *                    (a {@code null} value makes the send fail and be logged)
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        try {
            ConnectionFactory connectionFactory = template.getConnectionFactory();
            connection = connectionFactory.createConnection();
            connection.start();
            // First argument TRUE opens a transacted session; nothing is sent until commit().
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());

            ObjectMessage message = session.createObjectMessage(data);
            // Broker-side delay before the message becomes visible to consumers.
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);

            producer.send(message);
            session.commit();
            // Logged only after a successful commit so the log never reports a send that failed.
            log.info("发送消息:{}", data);
        } catch (JMSException | RuntimeException e) {
            // Best-effort contract: report the failure through the logger instead of throwing.
            // The uncommitted transaction is discarded when the session is closed below.
            log.error("延迟消息发送失败:{}", data, e);
        } finally {
            closeResources(producer, session, connection);
        }
    }

    /**
     * Closes the JMS resources independently so that a failure while closing one
     * of them does not prevent the remaining ones from being released.
     */
    private void closeResources(MessageProducer producer, Session session, Connection connection) {
        if (producer != null) {
            try {
                producer.close();
            } catch (JMSException | RuntimeException e) {
                log.warn("Failed to close JMS producer", e);
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException | RuntimeException e) {
                log.warn("Failed to close JMS session", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException | RuntimeException e) {
                log.warn("Failed to close JMS connection", e);
            }
        }
    }
}
消费者
package com.example.demoactivemq.producer;

import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Consumer listening on the {@code delay.queue} destination.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * Logs every message received from the queue.
     *
     * @param message payload delivered by the listener container
     */
    @JmsListener(destination = "delay.queue")
    public void receiveQueue(MessageModel message) {
        log.info("收到消息:{}", message);
    }
}
- application.yml
spring:
  activemq:
    broker-url: tcp://localhost:61616
- 测试类
package com.example.demoactivemq; import com.example.demoactivemq.domain.MessageModel; import com.example.demoactivemq.producer.Producer; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = DemoActivemqApplication.class) @RunWith(SpringRunner.class) class DemoActivemqApplicationTests { /** * 消息生产者 */ @Autowired private Producer producer; /** * 及时消息队列测试 */ @Test public void test() { MessageModel messageModel = MessageModel.builder() .message("测试消息") .titile("消息000") .build(); // 发送消息 producer.send(Producer.DEFAULT_QUEUE, messageModel); } /** * 延时消息队列测试 */ @Test public void test2() { for (int i = 0; i < 5; i++) { MessageModel messageModel = MessageModel.builder() .titile("延迟10秒执行") .message("测试消息" + i) .build(); // 发送延迟消息 producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L); } try { // 休眠100秒,等等消息执行 Thread.currentThread().sleep(100000L); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果
2019-11-12 22:18:52.939 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0) 2019-11-12 22:18:52.953 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1) 2019-11-12 22:18:52.958 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2) 2019-11-12 22:18:52.964 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3) 2019-11-12 22:18:52.970 INFO 17263 --- [ main] c.e.demoactivemq.producer.Producer : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4) 2019-11-12 22:19:03.012 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0) 2019-11-12 22:19:03.017 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1) 2019-11-12 22:19:03.019 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2) 2019-11-12 22:19:03.020 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3) 2019-11-12 22:19:03.021 INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)
比你优秀的人比你还努力,你有什么资格不去奋斗!!!
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30