activemq的使用

一.常用的消息队列:

1 activemq javaapache

2 rabbitmq cmq

3 kafuka 大数据mq

4 zeromq 简单版的mq

5 mateMq 基于amqp

6 RocketMQ 阿里

二.mq的使用

解压和启动mq


activeMq start

activemq的使用

 activemq的使用

 三 .mq的角色

producer消息的发送者

Comsumer消息的消费者

queue方式; 把消息发给activemq服务器,消费端监听到只要有一个执行完成其他就不会再执行了.

topic方式: 把消息发给activemq服务器,消费端监听到都会执行.

 activemq的使用

 activemq的使用

四: 加入Pom依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

五:整合activemq客户端到项目中

@Configurationpublic class ActiveMQConfig {    @Value("${spring.activemq.broker-url:disabled}")    String brokerURL ;    @Value("${activemq.listener.enable:disabled}")    String listenerEnable;    @Bean    public ActiveMQUtil getActiveMQUtil() throws JMSException {        if(brokerURL.equals("disabled")){            return null;        }        ActiveMQUtil activeMQUtil=new ActiveMQUtil();        activeMQUtil.init(brokerURL);        return  activeMQUtil;    }    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂    @Bean(name = "jmsQueueListener")    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();        if(!listenerEnable.equals("true")){            return null;        }        factory.setConnectionFactory(activeMQConnectionFactory);        //设置并发数        factory.setConcurrency("5");        //重连间隔时间        factory.setRecoveryInterval(5000L);        factory.setSessionTransacted(false);        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);        return factory;    }    @Bean    public ActiveMQConnectionFactory activeMQConnectionFactory ( ){/*        if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){            url=brokerURL;        }*/        ActiveMQConnectionFactory activeMQConnectionFactory =                new ActiveMQConnectionFactory(  brokerURL);        return activeMQConnectionFactory;    }}

public class ActiveMQUtil {    PooledConnectionFactory pooledConnectionFactory=null;    public ConnectionFactory init(String brokerUrl) {        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);        //加入连接池        pooledConnectionFactory=new PooledConnectionFactory(factory);        //出现异常时重新连接        pooledConnectionFactory.setReconnectOnException(true);        //        pooledConnectionFactory.setMaxConnections(5);        pooledConnectionFactory.setExpiryTimeout(10000);        return pooledConnectionFactory;    }    public ConnectionFactory getConnectionFactory(){        return pooledConnectionFactory;    }}

例子: 支付controller 层支付成功后,发送系统消息 =》 更新订单状态  锁定商品库存  物流订单等等,例如在支付的service层发送消息告诉订单需要更改状态, 然后在订单服务里面写component注解,里面实现监听更改业务即可.
@RequestMapping("alipay/callback/return")public String alipay_callback(Model model, HttpServletRequest request){    String alipay_trade_no = request.getParameter("trade_no");//支付宝的交易单号    String order_sn = request.getParameter("out_trade_no");// 外部订单号total_amount    String pay_amount = request.getParameter("total_amount");    // 更新支付信息    PaymentInfo paymentInfo = new PaymentInfo();    // 交易单号    // 支付状态    String payment_status = "已支付";    // 回调内容    String callback_content = request.getQueryString();    // 回调时间    Date callback_time = new Date();    paymentInfo.setOrderSn(order_sn);    paymentInfo.setPaymentStatus(payment_status);    paymentInfo.setCallbackTime(callback_time);    paymentInfo.setAlipayTradeNo(alipay_trade_no);    paymentInfo.setCallbackContent(callback_content);    paymentInfo.setTotalAmount(new BigDecimal(pay_amount));    paymentService.updatePayment(paymentInfo);    // 发送系统消息 =》 更新订单状态  锁定商品库存  物流订单等等    paymentService.sendPaymentResult(paymentInfo);    return "finish";}

@AutowiredActiveMQUtil activeMQUtil;@Overridepublic void sendPaymentResult(PaymentInfo paymentInfo) {    ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();    Connection connection = null;    Session session = null;// 开启消息事务    Queue paymentResultQueue = null; // 队列    MessageProducer producer = null;    try {        connection = connectionFactory.createConnection();        connection.start();        session = connection.createSession(true, Session.SESSION_TRANSACTED);        paymentResultQueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");        //text文本格式,map键值格式        MapMessage mapMessage=new ActiveMQMapMessage();        mapMessage.setString("out_trade_no",paymentInfo.getOrderSn());        mapMessage.setDouble("pay_amount",paymentInfo.getTotalAmount().doubleValue());        producer = session.createProducer(paymentResultQueue);// 消息的生成者        producer.send(mapMessage);        session.commit();    } catch (JMSException e) {        e.printStackTrace();    }finally {        try {            producer.close();            session.close();            connection.close();        } catch (JMSException e) {            e.printStackTrace();        }    }}

@Componentpublic class OrderConsumer {    @Autowired    OrderService orderService;    @JmsListener(containerFactory = "jmsQueueListener",destination = "PAYMENT_SUCCESS_QUEUE")    public void consumePaymentSuccess(MapMessage mapMessage) throws JMSException {        String out_trade_no = mapMessage.getString("out_trade_no");        Double pay_amount = mapMessage.getDouble("pay_amount");        // 根据支付状态,更新订单信息        OmsOrder omsOrder = new OmsOrder();        omsOrder.setPayAmount(new BigDecimal(pay_amount));        omsOrder.setPaymentTime(new Date());        omsOrder.setOrderSn(out_trade_no);        omsOrder.setStatus("1");        orderService.updateOrder(omsOrder);        System.out.println("已监听到"+out_trade_no+"号订单,订单消费PAYMENT_SUCCESS_QUEUE队列");    }}
 

相关推荐