Spring Boot 整合 ActiveMQ
yml配置文件
- spring:
-   activemq:
-     broker-url: tcp://127.0.0.1:61616
-     in-memory: true
-     user: admin
-     password: admin
-     pool:
-       enabled: false
-     # Custom redelivery settings; ActiveMQConfig reads them through @Value("${spring.activemq.*}").
-     use-exponential-back-off: true # Grow the wait time after every failed redelivery attempt
-     maximum-redeliveries: 10 # Maximum number of redelivery attempts (ActiveMQ default is 6; raised to 10 here)
-     initial-redelivery-delay: 1 # Delay before the first redelivery, in milliseconds (ActiveMQ default is 1000 ms, i.e. 1 second)
-     back-off-multiplier: 2 # With exponential back-off, each delay is the previous delay multiplied by this value
-     use-collision-avoidance: false # Whether to vary the delays to avoid redelivery collisions
-     maximum-redelivery-delay: -1 # Upper bound for the delay in milliseconds; -1 means no upper bound. Only applies when use-exponential-back-off is true
ActiveMQConfig配置
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.RedeliveryPolicy;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.jms.annotation.EnableJms;
- import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
- import org.springframework.jms.config.JmsListenerContainerFactory;
- import org.springframework.jms.core.JmsTemplate;
- /**
-  * Spring JMS configuration for ActiveMQ.
-  *
-  * <p>Builds the ActiveMQ connection factory with a configurable redelivery policy, the
-  * {@link JmsTemplate} used for sending, and two listener container factories: one for
-  * topics (publish/subscribe) and one for queues (point-to-point).
-  *
-  * <p>Created 2018/08/23 09:20:50.
-  *
-  * <p>NOTE(review): according to the Spring listener-container documentation, a
-  * non-transacted AUTO_ACKNOWLEDGE listener is not redelivered to after a listener
-  * exception, so the redelivery policy may only take effect once the listener sessions
-  * are transacted or client-acknowledged - confirm against the intended behavior.
-  *
-  * @author Mr.ZHAO
-  */
- @EnableJms
- @Configuration
- public class ActiveMQConfig {
-     @Value("${spring.activemq.user}")
-     private String user;
-     @Value("${spring.activemq.password}")
-     private String password;
-     @Value("${spring.activemq.broker-url}")
-     private String brokerUrl;
-     // Primitive types mirror the RedeliveryPolicy setter signatures and avoid needless boxing.
-     @Value("${spring.activemq.use-exponential-back-off}")
-     private boolean useExponentialBackOff;
-     @Value("${spring.activemq.maximum-redeliveries}")
-     private int maximumRedeliveries;
-     @Value("${spring.activemq.initial-redelivery-delay}")
-     private long initialRedeliveryDelay;
-     // double so that fractional multipliers such as 1.5 can be configured.
-     @Value("${spring.activemq.back-off-multiplier}")
-     private double backOffMultiplier;
-     @Value("${spring.activemq.use-collision-avoidance}")
-     private boolean useCollisionAvoidance;
-     @Value("${spring.activemq.maximum-redelivery-delay}")
-     private long maximumRedeliveryDelay;
-     /**
-      * Creates the redelivery policy from the {@code spring.activemq.*} properties.
-      *
-      * @return the configured redelivery policy
-      */
-     @Bean
-     public RedeliveryPolicy redeliveryPolicy() {
-         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-         // Whether the wait time grows after every failed redelivery attempt.
-         redeliveryPolicy.setUseExponentialBackOff(useExponentialBackOff);
-         // Maximum number of redelivery attempts (ActiveMQ default is 6).
-         redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
-         // Delay before the first redelivery, in milliseconds (ActiveMQ default is 1000 ms).
-         redeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelay);
-         // With exponential back-off, each delay is the previous delay times this multiplier.
-         redeliveryPolicy.setBackOffMultiplier(backOffMultiplier);
-         // Whether delays are varied to avoid redelivery collisions.
-         redeliveryPolicy.setUseCollisionAvoidance(useCollisionAvoidance);
-         // Upper bound for the delay in milliseconds; -1 means unbounded.
-         // Only applies when exponential back-off is enabled.
-         redeliveryPolicy.setMaximumRedeliveryDelay(maximumRedeliveryDelay);
-         return redeliveryPolicy;
-     }
-     /**
-      * Creates the ActiveMQ connection factory and attaches the redelivery policy.
-      *
-      * @param redeliveryPolicy the policy applied to connections created by this factory
-      * @return the configured connection factory
-      */
-     @Bean
-     public ActiveMQConnectionFactory activeMQConnectionFactory(RedeliveryPolicy redeliveryPolicy) {
-         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(user, password, brokerUrl);
-         activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
-         return activeMQConnectionFactory;
-     }
-     /**
-      * Creates the template used to send messages with persistent delivery.
-      *
-      * @param activeMQConnectionFactory the connection factory to send through
-      * @return the configured JMS template
-      */
-     @Bean
-     public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
-         JmsTemplate jmsTemplate = new JmsTemplate();
-         jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
-         // JmsTemplate only applies its delivery mode when explicit QoS is enabled;
-         // without this flag the persistence setting below is silently ignored.
-         jmsTemplate.setExplicitQosEnabled(true);
-         // Persistent delivery (equivalent to delivery mode 2).
-         jmsTemplate.setDeliveryPersistent(true);
-         // Value 4 is ActiveMQ's vendor-specific INDIVIDUAL_ACKNOWLEDGE mode,
-         // not the standard JMS CLIENT_ACKNOWLEDGE mode (which is 2).
-         jmsTemplate.setSessionAcknowledgeMode(org.apache.activemq.ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
-         return jmsTemplate;
-     }
-     /**
-      * Creates the listener container factory for topic (publish/subscribe) listeners.
-      *
-      * @param activeMQConnectionFactory the connection factory listeners connect through
-      * @return a listener container factory with the pub/sub domain enabled
-      */
-     @Bean
-     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory activeMQConnectionFactory) {
-         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
-         factory.setPubSubDomain(true);
-         factory.setConnectionFactory(activeMQConnectionFactory);
-         return factory;
-     }
-     /**
-      * Creates the listener container factory for queue (point-to-point) listeners.
-      *
-      * @param activeMQConnectionFactory the connection factory listeners connect through
-      * @return a listener container factory left in its default (queue) domain
-      */
-     @Bean
-     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory activeMQConnectionFactory) {
-         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
-         factory.setConnectionFactory(activeMQConnectionFactory);
-         return factory;
-     }
- }
生产者
- import com.renren.common.utils.JsonUtils;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.activemq.command.ActiveMQTopic;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.stereotype.Service;
- /**
-  * Message producer: serializes payloads with {@code JsonUtils.objectToJson} and sends
-  * them to an ActiveMQ queue or topic through the injected {@link JmsTemplate}.
-  *
-  * <p>Created 2018/08/23 09:30:42.
-  *
-  * @author Mr.ZHAO
-  */
- @Service
- public class ActiveProducer {
-     @Autowired
-     private JmsTemplate jmsTemplate;
-     /**
-      * Sends a message to a queue.
-      *
-      * @param queue   name of the target queue
-      * @param message payload; converted with {@code JsonUtils.objectToJson} before sending
-      */
-     public void sendQueue(final String queue, final Object message) {
-         final ActiveMQQueue destination = new ActiveMQQueue(queue);
-         jmsTemplate.convertAndSend(destination, JsonUtils.objectToJson(message));
-     }
-     /**
-      * Sends a message to a topic.
-      *
-      * @param topic   name of the target topic
-      * @param message payload; converted with {@code JsonUtils.objectToJson} before sending
-      */
-     public void sendTopic(final String topic, final Object message) {
-         final ActiveMQTopic destination = new ActiveMQTopic(topic);
-         jmsTemplate.convertAndSend(destination, JsonUtils.objectToJson(message));
-     }
- }
消费者
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Component;
- /**
-  * Message consumer: listens on the {@code topic_test} topic and the {@code queue_test}
-  * queue and prints every received text message to standard output.
-  *
-  * <p>Created 2018/08/23 09:32:48.
-  *
-  * @author Mr.ZHAO
-  */
- @Component
- public class ActiveConsumer {
-     /**
-      * Consumes messages published to the {@code topic_test} topic.
-      *
-      * @param text the received message body
-      */
-     @JmsListener(destination = "topic_test", containerFactory = "jmsListenerContainerTopic")
-     public void receiveTopic2(String text) {
-         System.out.println("Topic Consumer2:" + text);
-     }
-     /**
-      * Consumes messages sent to the {@code queue_test} queue.
-      *
-      * <p>The queue container factory is named explicitly, mirroring the topic listener,
-      * so this listener does not depend on whichever default factory happens to be registered.
-      *
-      * @param text the received message body
-      */
-     @JmsListener(destination = "queue_test", containerFactory = "jmsListenerContainerQueue")
-     public void reviceQueue(String text) {
-         System.out.println("Queue Consumer:" + text);
-     }
- }
测试
- /**
-  * Integration test that sends one message to a queue and one to a topic, then keeps the
-  * application context alive briefly so the listeners can print what they receive.
-  */
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class DynamicDataSourceTest {
-     /** How long to wait for the listeners to consume the messages, in milliseconds. */
-     private static final long CONSUME_WAIT_MILLIS = 5000L;
-     @Autowired
-     private ActiveProducer producer;
-     /**
-      * Sends the test messages and waits a bounded time for them to be consumed.
-      *
-      * @throws InterruptedException if the waiting thread is interrupted
-      */
-     @Test
-     public void MQ() throws InterruptedException {
-         producer.sendQueue("queue_test", "你好");
-         producer.sendTopic("topic_test", "你------好");
-         // A bounded sleep replaces the former empty infinite loop, which never let the
-         // test finish and kept a CPU core spinning.
-         Thread.sleep(CONSUME_WAIT_MILLIS);
-     }
- }
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30