Spring 整合 ActiveMq
Spring 整合 ActiveMq
整合步骤如下:
- 添加依赖
- 连接 mq 消息服务器
- 定义生产者/消费者
- 发送/接收消息
添加依赖
<properties> <spring_version>4.2.4.RELEASE</spring_version> </properties> <dependencies> <!--Spring--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring_version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring_version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring_version}</version> </dependency> <!--ActiveMq--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency> <!--<dependency>--> <!--<groupId>org.apache.commons</groupId>--> <!--<artifactId>commons-pool2</artifactId>--> <!--<version>2.5</version>--> <!--</dependency>--> <!--servlet--> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>RELEASE</version> <scope>provided</scope> </dependency> <!--Test--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <!--fast Json--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies>
连接服务器
定义一个类实现 ConnectionFactory 接口,类属性包括 brokerURL(服务器地址),userName,password,maxConnection(最大连接数, 连接池使用参数);
/** * author: getthrough * date: 2018/3/8 * description: 连接工厂的包装类 */ public class ActiveMqConnectionFactoryDecoration implements ConnectionFactory { /**apache 提供的连接池*/ // private PooledConnectionFactory pooledConnectionFactory; private String brokerURL; private String userName; private String password; private String maxConntection; private ActiveMQConnectionFactory activeMQConnectionFactory; public ActiveMqConnectionFactoryDecoration() { } public void run() throws JMSException { activeMQConnectionFactory.setBrokerURL(brokerURL); activeMQConnectionFactory.setUserName(userName); activeMQConnectionFactory.setPassword(password); activeMQConnectionFactory.createConnection(); // pooledConnectionFactory = new PooledConnectionFactory(); // pooledConnectionFactory.setMaxConnections(Integer.parseInt(maxConntection)); // pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory); // pooledConnectionFactory.createConnection(userName, password); } public void stop() { // if (null != pooledConnectionFactory) { // pooledConnectionFactory.stop(); // } } public Connection createConnection() throws JMSException { // return pooledConnectionFactory.createConnection(); return activeMQConnectionFactory.createConnection(); } public Connection createConnection(String userName, String password) throws JMSException { // return pooledConnectionFactory.createConnection(userName, password); return activeMQConnectionFactory.createConnection(userName, password); } public void setActiveMQConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) { this.activeMQConnectionFactory = activeMQConnectionFactory; } // ... 其他属性的getters&setters
在 spring 配置文件中定义这个 bean:
<bean id="activeMqConnectionFactoryDecoration " class="mq.ActiveMqConnectionFactoryDecoration "> <property name="activeMQConnectionFactory"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/> </property> <property name="brokerURL" value="${brokerURL}"/> <property name="userName" value="${userName}"/> <property name="password" value="${password}"/> <property name="maxConntection" value="${maxConntection}"/> </bean>
创建生产者/消费者
创建一个消息发送类, 简单包装下发送消息流程
/** * author: getthrough * date: 2018/3/8 * description: */ public class ActiveMqSender { private static JmsTemplate jmsTemplate; private static Destination destination; public static void sendMqMessage(final String content) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(content); } }); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void setDestination(Destination destination) { this.destination = destination; } }
创建一个监听器, 实现 MessageListener 接口
/** * author: getthrough * date: 2018/3/8 * description: */ // 此处ParentMessageListener实现了 MessageListener接口 public class QueueMessageListener extends ParentMessageListener { private Logger logger = LoggerFactory.getLogger(QueueMessageListener.class); /** * 消息前处理 */ public void beforeHandling() { // doSomething ... logger.info("before hanlding queue msg ..."); } /** * 消息后处理 */ public void afterHandling() { // doSomething ... logger.info("after hanlding queue msg ..."); } @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############"); } catch (JMSException e) { logger.info("########## failed to get message text! ###########"); e.printStackTrace(); } } }
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30