SpringBoot ActiveMq JmsTemplate 异步发送、非持久化

ActiveMq事务

ActiveMq事务的作用就是在发送、接收处理消息过程中,如果出现问题,可以回滚。

ActiveMq异步/同步发送

以下摘抄自https://blog.csdn.net/songhai...

同步发送:
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。

异步发送
如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。

当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。

想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true
如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步
     1) 当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”
     2) 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。
总结:默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。
   jms.sendTimeout:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送!

官方连接:http://activemq.apache.org/as...

配置使用异步发送方式

1.在连接上配置
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

2.通过ConnectionFactory
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

3.通过connection
((ActiveMQConnection)connection).setUseAsyncSend(true);

SpringBoot JMS实现异步发送

1.如果在配置中使用了连接池,那么SpringBoot默认会使用PooledConnectionFactory,ActiveMQConnectionFactory的useAsyncSend默认会true。使用连接池配置如下

activemq:
    in-memory: true
    broker-url: tcp://127.0.0.1:61616
    pool:
      enabled: true
      max-connections: 5
    user:
    password:

2.修改JmsTemplate 默认参数

JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
//设备为true,deliveryMode, priority, timeToLive等设置才会起作用
template.setExplicitQosEnabled(true);
//设为非持久化模式
template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

完整代码如下:

@Slf4j
@Configuration
public class ActiveConfig {
    /**
     * 配置用于异步发送的非持久化JmsTemplate
     */ 
    @Autowired
    @Bean
    @Primary
    public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
        JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
        template.setExplicitQosEnabled(true);
        template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        log.info("jsmtemplate ------------->sessionTransacted:{}",template.isSessionTransacted());
        log.info("jsmtemplate ------------->ExplicitQosEnabled:{}",template.isExplicitQosEnabled());
        return template;
    }

    /**
     * 配置用于同步发送的持久化JmsTemplate
     */  
    @Autowired
    @Bean
    public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
        JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
        log.info("jsmtemplate ------------->sessionTransacted:{}",template.isSessionTransacted());
        log.info("jsmtemplate ------------->ExplicitQosEnabled:{}",template.isExplicitQosEnabled());
        return template;
    }

//如果对于SpringBoot自动生成的PooledConnectionFactory需要调优,可以自己生PooledConnectionFactory调优参数
//    private PooledConnectionFactory getPooledConnectionFactory(String userName,String password,String brokerURL) {
//        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);
//        ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
//        activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
//        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
//        pooledConnectionFactory.setMaxConnections(5);
//        return pooledConnectionFactory;
//    }

相关推荐