SpringBoot ActiveMq JmsTemplate 异步发送、非持久化
ActiveMq事务
ActiveMq事务的作用就是在发送、接收处理消息过程中,如果出现问题,可以回滚。
ActiveMq异步/同步发送
以下摘抄自https://blog.csdn.net/songhai...
同步发送:
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。
异步发送
如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。
当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。
想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true
如果设置了alwaysSyncSend=true系统将会忽略useAsyncSend设置的值都采用同步
1) 当alwaysSyncSend=false时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”
2) 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。
总结:默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。
jms.sendTimeout:发送超时时间,默认等于0,如果jms.sendTimeout>0将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送!
官方连接:http://activemq.apache.org/as...
配置使用异步发送方式
1.在连接上配置
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
2.通过ConnectionFactory
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3.通过connection
((ActiveMQConnection)connection).setUseAsyncSend(true);
SpringBoot JMS实现异步发送
1.如果在配置中使用了连接池,那么SpringBoot默认会使用PooledConnectionFactory,ActiveMQConnectionFactory的useAsyncSend默认会true。使用连接池配置如下
activemq: in-memory: true broker-url: tcp://127.0.0.1:61616 pool: enabled: true max-connections: 5 user: password:
2.修改JmsTemplate 默认参数
JmsTemplate template = new JmsTemplate(pooledConnectionFactory); //设备为true,deliveryMode, priority, timeToLive等设置才会起作用 template.setExplicitQosEnabled(true); //设为非持久化模式 template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
完整代码如下:
@Slf4j @Configuration public class ActiveConfig { /** * 配置用于异步发送的非持久化JmsTemplate */ @Autowired @Bean @Primary public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) { JmsTemplate template = new JmsTemplate(pooledConnectionFactory); template.setExplicitQosEnabled(true); template.setDeliveryMode(DeliveryMode.NON_PERSISTENT); log.info("jsmtemplate ------------->sessionTransacted:{}",template.isSessionTransacted()); log.info("jsmtemplate ------------->ExplicitQosEnabled:{}",template.isExplicitQosEnabled()); return template; } /** * 配置用于同步发送的持久化JmsTemplate */ @Autowired @Bean public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) { JmsTemplate template = new JmsTemplate(pooledConnectionFactory); log.info("jsmtemplate ------------->sessionTransacted:{}",template.isSessionTransacted()); log.info("jsmtemplate ------------->ExplicitQosEnabled:{}",template.isExplicitQosEnabled()); return template; } //如果对于SpringBoot自动生成的PooledConnectionFactory需要调优,可以自己生PooledConnectionFactory调优参数 // private PooledConnectionFactory getPooledConnectionFactory(String userName,String password,String brokerURL) { // ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL); // ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy(); // activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy); // PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); // pooledConnectionFactory.setMaxConnections(5); // return pooledConnectionFactory; // }