activemq的使用
一.常用的消息队列:
1 activemq java,apache
2 rabbitmq erlang的mq
3 kafka 大数据mq
4 zeromq 简单版的mq
5 mateMq 基于amqp
6 RocketMQ 阿里
二.mq的使用
1 解压和启动mq
activemq start
三 .mq的角色
producer消息的发送者
Consumer消息的消费者
queue方式: 把消息发给activemq服务器,多个消费端监听同一个队列时,每条消息只会被其中一个消费端消费,其他消费端不会再执行.
topic方式: 把消息发给activemq服务器,所有监听该主题的消费端都会收到消息并执行.
四: 加入Pom依赖
<!-- Spring Boot starter for ActiveMQ; no version is given here, so it is presumably managed by the parent/BOM (confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<!-- Excludes the slf4j-log4j12 binding from this dependency's transitive graph; presumably to avoid a clash with the project's own SLF4J binding (confirm). -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- ActiveMQ connection-pool artifact, pinned to 5.15.2. NOTE(review): verify this version matches the ActiveMQ client version resolved through the starter. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<!-- Excludes the slf4j-log4j12 binding from this dependency's transitive graph; presumably to avoid a clash with the project's own SLF4J binding (confirm). -->
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
五:整合activemq客户端到项目中
@Configurationpublic class ActiveMQConfig { @Value("${spring.activemq.broker-url:disabled}") String brokerURL ; @Value("${activemq.listener.enable:disabled}") String listenerEnable; @Bean public ActiveMQUtil getActiveMQUtil() throws JMSException { if(brokerURL.equals("disabled")){ return null; } ActiveMQUtil activeMQUtil=new ActiveMQUtil(); activeMQUtil.init(brokerURL); return activeMQUtil; } //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂 @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); if(!listenerEnable.equals("true")){ return null; } factory.setConnectionFactory(activeMQConnectionFactory); //设置并发数 factory.setConcurrency("5"); //重连间隔时间 factory.setRecoveryInterval(5000L); factory.setSessionTransacted(false); factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return factory; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory ( ){/* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){ url=brokerURL; }*/ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( brokerURL); return activeMQConnectionFactory; }}
public class ActiveMQUtil { PooledConnectionFactory pooledConnectionFactory=null; public ConnectionFactory init(String brokerUrl) { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); //加入连接池 pooledConnectionFactory=new PooledConnectionFactory(factory); //出现异常时重新连接 pooledConnectionFactory.setReconnectOnException(true); // pooledConnectionFactory.setMaxConnections(5); pooledConnectionFactory.setExpiryTimeout(10000); return pooledConnectionFactory; } public ConnectionFactory getConnectionFactory(){ return pooledConnectionFactory; }}
例子: 支付controller 层支付成功后,发送系统消息 =》 更新订单状态 锁定商品库存 物流订单等等,例如在支付的service层发送消息告诉订单需要更改状态, 然后在订单服务里面写component注解,里面实现监听更改业务即可.
@RequestMapping("alipay/callback/return")public String alipay_callback(Model model, HttpServletRequest request){ String alipay_trade_no = request.getParameter("trade_no");//支付宝的交易单号 String order_sn = request.getParameter("out_trade_no");// 外部订单号total_amount String pay_amount = request.getParameter("total_amount"); // 更新支付信息 PaymentInfo paymentInfo = new PaymentInfo(); // 交易单号 // 支付状态 String payment_status = "已支付"; // 回调内容 String callback_content = request.getQueryString(); // 回调时间 Date callback_time = new Date(); paymentInfo.setOrderSn(order_sn); paymentInfo.setPaymentStatus(payment_status); paymentInfo.setCallbackTime(callback_time); paymentInfo.setAlipayTradeNo(alipay_trade_no); paymentInfo.setCallbackContent(callback_content); paymentInfo.setTotalAmount(new BigDecimal(pay_amount)); paymentService.updatePayment(paymentInfo); // 发送系统消息 =》 更新订单状态 锁定商品库存 物流订单等等 paymentService.sendPaymentResult(paymentInfo); return "finish";}
@AutowiredActiveMQUtil activeMQUtil;@Overridepublic void sendPaymentResult(PaymentInfo paymentInfo) { ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory(); Connection connection = null; Session session = null;// 开启消息事务 Queue paymentResultQueue = null; // 队列 MessageProducer producer = null; try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true, Session.SESSION_TRANSACTED); paymentResultQueue = session.createQueue("PAYMENT_SUCCESS_QUEUE"); //text文本格式,map键值格式 MapMessage mapMessage=new ActiveMQMapMessage(); mapMessage.setString("out_trade_no",paymentInfo.getOrderSn()); mapMessage.setDouble("pay_amount",paymentInfo.getTotalAmount().doubleValue()); producer = session.createProducer(paymentResultQueue);// 消息的生成者 producer.send(mapMessage); session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally { try { producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } }}
@Componentpublic class OrderConsumer { @Autowired OrderService orderService; @JmsListener(containerFactory = "jmsQueueListener",destination = "PAYMENT_SUCCESS_QUEUE") public void consumePaymentSuccess(MapMessage mapMessage) throws JMSException { String out_trade_no = mapMessage.getString("out_trade_no"); Double pay_amount = mapMessage.getDouble("pay_amount"); // 根据支付状态,更新订单信息 OmsOrder omsOrder = new OmsOrder(); omsOrder.setPayAmount(new BigDecimal(pay_amount)); omsOrder.setPaymentTime(new Date()); omsOrder.setOrderSn(out_trade_no); omsOrder.setStatus("1"); orderService.updateOrder(omsOrder); System.out.println("已监听到"+out_trade_no+"号订单,订单消费PAYMENT_SUCCESS_QUEUE队列"); }}