Spring 整合 ActiveMq

Spring 整合 ActiveMq

整合步骤如下:

  1. 添加依赖
  2. 连接 mq 消息服务器
  3. 定义生产者/消费者
  4. 发送/接收消息

添加依赖

<properties>
    <!-- Single source of truth for the version of every Spring module below. -->
    <spring_version>4.2.4.RELEASE</spring_version>
  </properties>

  <dependencies>
    <!--Spring-->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <!-- Use the shared property so spring-jms cannot drift from the other Spring modules. -->
      <version>${spring_version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring_version}</version>
    </dependency>
    <!--ActiveMq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.3</version>
    </dependency>

    <!-- Only needed if the pooled connection factory is enabled. -->
    <!--<dependency>-->
      <!--<groupId>org.apache.commons</groupId>-->
      <!--<artifactId>commons-pool2</artifactId>-->
      <!--<version>2.5</version>-->
    <!--</dependency>-->

    <!--servlet-->
    <!-- NOTE(review): "RELEASE" is a floating meta-version, so the resolved artifact can change
         between builds; consider pinning an explicit version. -->
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
      <version>RELEASE</version>
      <scope>provided</scope>
    </dependency>
    <!--Test-->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!--fast Json-->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.47</version>
    </dependency>
  </dependencies>

连接服务器

定义一个类实现 ConnectionFactory 接口, 类属性包括 brokerURL(服务器地址)、userName、password、maxConntection(最大连接数, 仅在使用连接池时需要; 注意代码中该属性名拼写为 maxConntection, Spring 配置中的 property 名必须与之保持一致)。

/**
 * author: getthrough
 * date: 2018/3/8
 * description: Wrapper class around a connection factory. It implements
 * {@code ConnectionFactory} and forwards connection creation to an injected
 * {@code ActiveMQConnectionFactory}.
 */
public class ActiveMqConnectionFactoryDecoration implements ConnectionFactory {
    // Connection pool provided by Apache (pooled variant is currently disabled).
//    private PooledConnectionFactory pooledConnectionFactory;
    // Broker address; copied onto the wrapped factory by run().
    private String brokerURL;
    // Credentials; copied onto the wrapped factory by run().
    private String userName;
    private String password;
    // Maximum pool size, kept as a String; only referenced by the commented-out pooled setup.
    // The spelling must stay as-is because the Spring configuration binds to this property name.
    private String maxConntection;

    // The ActiveMQ factory that actually creates connections; injected through its setter.
    private ActiveMQConnectionFactory activeMQConnectionFactory;

    /** No-arg constructor; all state is supplied afterwards through setters. */
    public ActiveMqConnectionFactoryDecoration() {
    }

    /**
     * Applies the configured broker URL and credentials to the wrapped
     * {@code ActiveMQConnectionFactory} and verifies that a connection can be
     * opened.
     *
     * <p>The connection opened here is only a connectivity probe, so it is closed
     * again straight away; previously it was discarded without being closed, which
     * left a broker connection open for the lifetime of the process.
     *
     * @throws JMSException if the probe connection cannot be created or closed
     * @throws IllegalStateException if no {@code ActiveMQConnectionFactory} has been injected
     */
    public void run() throws JMSException {
        if (activeMQConnectionFactory == null) {
            throw new IllegalStateException(
                    "activeMQConnectionFactory must be set before run() is called");
        }
        activeMQConnectionFactory.setBrokerURL(brokerURL);
        activeMQConnectionFactory.setUserName(userName);
        activeMQConnectionFactory.setPassword(password);

        // Fail fast if the broker is unreachable, but do not keep the probe connection around.
        Connection probe = activeMQConnectionFactory.createConnection();
        probe.close();

        // Pooled variant, kept for reference:
//        pooledConnectionFactory = new PooledConnectionFactory();
//        pooledConnectionFactory.setMaxConnections(Integer.parseInt(maxConntection));
//        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
//        pooledConnectionFactory.createConnection(userName, password);
    }

    /**
     * Currently does nothing: the only shutdown logic belongs to the pooled
     * connection factory, which is commented out.
     */
    public void stop() {
        // Pooled variant, kept for reference:
//        if (null != pooledConnectionFactory) {
//            pooledConnectionFactory.stop();
//        }
    }

    /**
     * Creates a connection by delegating to the wrapped {@code ActiveMQConnectionFactory}.
     *
     * @return the connection produced by the wrapped factory
     * @throws JMSException if the wrapped factory fails to create the connection
     */
    @Override
    public Connection createConnection() throws JMSException {
        // Pooled variant: return pooledConnectionFactory.createConnection();
        final Connection connection = activeMQConnectionFactory.createConnection();
        return connection;
    }

    /**
     * Creates a connection with explicit credentials by delegating to the wrapped
     * {@code ActiveMQConnectionFactory}.
     *
     * @param userName user name passed through to the wrapped factory
     * @param password password passed through to the wrapped factory
     * @return the connection produced by the wrapped factory
     * @throws JMSException if the wrapped factory fails to create the connection
     */
    @Override
    public Connection createConnection(String userName, String password) throws JMSException {
        // Pooled variant: return pooledConnectionFactory.createConnection(userName, password);
        final Connection connection =
                activeMQConnectionFactory.createConnection(userName, password);
        return connection;
    }

    /**
     * Injects the ActiveMQ factory that every connection request is delegated to.
     *
     * @param activeMQConnectionFactory the factory to wrap
     */
    public void setActiveMQConnectionFactory(
            final ActiveMQConnectionFactory activeMQConnectionFactory) {
        this.activeMQConnectionFactory = activeMQConnectionFactory;
    }

// ... 其他属性的getters&setters

在 spring 配置文件中定义这个 bean:

<bean id="activeMqConnectionFactoryDecoration" class="mq.ActiveMqConnectionFactoryDecoration">
        <!-- The id and class values must not carry trailing whitespace: a bean id is matched
             literally, so a trailing space would break every ref to this bean. -->
        <!-- NOTE(review): run() is not wired as an init-method here; confirm it is invoked
             elsewhere, otherwise brokerURL/userName/password never reach the inner factory. -->
        <property name="activeMQConnectionFactory">
            <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>
        </property>
        <property name="brokerURL" value="${brokerURL}"/>
        <property name="userName" value="${userName}"/>
        <property name="password" value="${password}"/>
        <!-- The property name intentionally matches the field spelling used in the Java class. -->
        <property name="maxConntection" value="${maxConntection}"/>
    </bean>

创建生产者/消费者

 创建一个消息发送类, 简单包装下发送消息流程

/**
 * author: getthrough
 * date: 2018/3/8
 * description:
 */
public class ActiveMqSender {
    private static JmsTemplate jmsTemplate;
    private static Destination destination;

    public static void sendMqMessage(final String content) {
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(content);
            }
        });
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}

 创建一个监听器, 实现 MessageListener 接口

/**
 * author: getthrough
 * date: 2018/3/8
 * description: Queue consumer that logs the text of every {@code TextMessage} it receives.
 */
// ParentMessageListener implements the MessageListener interface.
public class QueueMessageListener extends ParentMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);

    /**
     * Pre-processing hook for a message; currently only logs.
     */
    public void beforeHandling() {
        // doSomething ...
        logger.info("before hanlding queue msg ...");
    }

    /**
     * Post-processing hook for a message; currently only logs.
     */
    public void afterHandling() {
        // doSomething ...
        logger.info("after hanlding queue msg ...");
    }

    /**
     * Logs the payload of an incoming text message.
     *
     * <p>Messages that are not {@code TextMessage} instances are reported and skipped
     * rather than failing with a {@code ClassCastException}.
     *
     * @param message the message delivered to this listener
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            logger.warn("unsupported message type received, expected TextMessage but got: {}",
                    message == null ? null : message.getClass().getName());
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            logger.info("########### consumer1 has receive the message :{} ############", text);
        } catch (JMSException e) {
            // Log at error level with the cause attached instead of printing the stack trace.
            logger.error("########## failed to get message text! ###########", e);
        }
    }
}
 

相关推荐