Spring中使用ActiveMQ
一、Spring提供的 JMS 相关接口
ConnectionFactory接口
是用来管理JMS服务厂商提供的ConnectionFactory的。在此案例中管理ActiveMQ的ActiveMQConnectionFactory。Spring提供了下面两个ConnectionFactory的实现:SingleConnectionFactory
对于建立JMS服务器连接的请求只会返回一个同一个Connection,也就是说在整个应用中只会使用一个连接进行操作。CachingConnectionFactory
继承了 SingleConnectionFactory 所以它拥有SingleConnectionFactory 的所有的功能,同时还新增了缓存功能,缓存会话 producer和consumer
JmsTemplate
用于发送和接收消息的模板类。- JmsTemplate 是线程安全的,可以在整个应用范围使用,但是并不代表整个应用中只能使用一个,我们可以创建多个。
- 是 Spring 提供的,只需要向Spring容器内注册这个类就可以使用 JmsTemplate 方便操作 jms ;
MessageListener 消息监听器,用于接收消息
- 实现一个onMessage方法,该方法只接收一个Message参数。
二、 Spring 配置
1. 整体项目结构:
2. pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.song.jms</groupId> <artifactId>spring-jms</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spring.version>4.3.7.RELEASE</spring.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> </project>
3. spring-common.xml(公共配置)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- ActiveMQ为我们提供的ConnectionFactory --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> </bean> <!-- spring jms 为我们提供的连接池 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- 一个队列的目的地,点对点 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue"/> </bean> </beans>
4. spring-producer.xml(生产者配置)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 添加注解扫描包 --> <context:component-scan base-package="com.song.jms.producer"/> <!-- 导入公共配置 --> <import resource="spring-common.xml"/> <!-- 配置JmsTemplate,用于发送消息 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
5. spring-consumer.xml(消费者配置)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 添加注解扫描包 --> <context:component-scan base-package="com.song.jms.consumer"/> <!-- 导入公共配置 --> <import resource="spring-common.xml"/> <!-- 配置消息监听容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 消息监听地址 --> <property name="destination" ref="queueDestination"/> <!-- 消息监听器 使用@Component来注入的consumerMessageListener--> <property name="messageListener" ref="consumerMessageListener"/> </bean> </beans>
三、队列消息模式
1. 创建生产者服务
接口
public interface ProducerService { void sendMessage(String message); }
实现
/** * 生产者服务 */ @Service public class ProducerServiceImpl implements ProducerService { @Autowired JmsTemplate jmsTemplate; @Resource(name = "queueDestination") private Destination queueDestination; public void sendMessage(final String message) { jmsTemplate.send(queueDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); System.out.println("发送消息 = [" + textMessage.getText() + "]"); return textMessage; } }); } }
2. 创建生产者启动类
/** * 生产者服务的启动类 */ public class AppProducer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-producer.xml"); ProducerService service = context.getBean(ProducerService.class); for (int i = 0; i < 10; i++) { service.sendMessage("test" + i); } context.close(); } }
2. 创建消费者
消息监听器
/** * 消费者的消息监听器 */ @Component public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收message: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
消费者启动类
/** * 消费者启动类 */ public class AppConsumer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("spring-consumer.xml"); } }
3. 启动消费者和生产者
分别启动消费者和生产者的main方法,在ActiveMQ的后台中查看状态:http://localhost:8161
四、主题消息模式
1.修改spring-common.xml
在spring-common.xml中加入:
<!-- 一个主题目的地,发布订阅模式 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic"/> </bean>
2. 修改ProducerServiceImpl
将 ProducerServiceImpl 中的:
@Resource(name = "queueDestination") Destination destination;
改为
@Resource(name = "topicDestination") Destination destination;
3. 修改spring-consumer.xml
将
<!-- 消息监听地址 --> <property name="destination" ref="queueDestination"/>
修改成:
<!-- 消息监听地址 --> <property name="destination" ref="topicDestination"/>
只需要以上3步即可将 队列模式 改为 主题模式 。
相关推荐
胡献根 2020-07-18
胡献根 2020-07-05
jiangtie 2020-06-10
onlylixiaobei 2020-06-09
xinglun 2020-06-02
方新德 2020-05-31
Java高知 2020-05-20
Java高知 2020-05-08
Java高知 2020-05-03
onlylixiaobei 2020-05-02
Java高知 2020-04-22
胡献根 2020-04-22
heweiyabeijing 2020-04-21
方新德 2020-04-20
胡献根 2020-04-10
onlylixiaobei 2020-04-10
方新德 2020-04-08
xuedabao 2020-03-30