Spring-ActiveMQ的点对点和Topic
<!-- Maven POM for the activemq-test jar (com.example.activemq:activemq-test:1.0-SNAPSHOT). -->
<!-- Declares Spring 3.1.1.RELEASE (spring-asm, spring-jms), ActiveMQ 5.8.0 (activemq-all), JUnit 3.8.1 for tests, and assorted commons/logging libraries. -->
<!-- NOTE(review): both logback-classic and slf4j-log4j12 are declared, and slf4j-api is 1.6.1 while slf4j-log4j12 is 1.7.5; confirm which single SLF4J binding and version is intended. -->
<!-- NOTE(review): the kxcomm-maven repository is a plain-HTTP URL addressed by IP; confirm it is still reachable and trusted. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example.activemq</groupId> <artifactId>activemq-test</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>activemq-test</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <org.springframework.version>3.1.1.RELEASE</org.springframework.version> </properties> <repositories> <repository> <id>kxcomm-maven</id> <name>Maven kxcomm Repository</name> <url>http://122.13.0.56:8088/nexus/content/groups/public/</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2</version> </dependency> <dependency> <groupId>commons-configuration</groupId> <artifactId>commons-configuration</artifactId> <version>1.6</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-asm</artifactId> <version>${org.springframework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> 
<artifactId>spring-jms</artifactId> <version>${org.springframework.version}</version> </dependency> <dependency> <groupId>com.davidkarlsen.commonstransaction.spring</groupId> <artifactId>commons-transaction-spring</artifactId> <version>0.9</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.2-beta1</version> </dependency> <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>0.9.27</version> <scope>compile</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>0.9.27</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.6.1</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> </dependencies> </project>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Root Spring context: declares default-autowire="byName" and default-lazy-init="true", and imports activemq-test.xml. -->
<!-- NOTE(review): these defaults are set on this <beans> element only; confirm whether they are expected to apply to the imported file as well. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd" default-autowire="byName" default-lazy-init="true"> <import resource="activemq-test.xml"/> </beans>
<?xml version="1.0" encoding="UTF-8"?>
<!-- Bean definitions for the ActiveMQ samples: a connection factory for tcp://localhost:61616, a JmsTemplate whose default destination is the rantz.marketing.queue queue, the point-to-point producer/consumer beans, and the topic listener/publisher beans wired to the kxcomm.mms.topic and kxcomm.mms.control topics. -->
<!-- NOTE(review): the schemaLocation lists the activemq and camel schemas, but no element in this file uses those namespaces; confirm whether they are needed. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <!-- Create the connection factory --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="rantzDestination" /> </bean> <!-- Point-to-Point --> <!-- ActiveMQ message destination: queue --> <bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg> </bean> <!-- ActiveMQ message destination: topic (disabled alternative to the queue above) --> <!-- <bean id="rantzDestination" class="org.apache.activemq.command.ActiveMQTopic">--> <!-- <constructor-arg index="0" value="rantz.marketing.queue"></constructor-arg>--> <!-- </bean>--> <bean id="producer" class="activemq.test.p2p.producer.RantzMarketingGatewayImpl"> <property name="jmsTemplate" ref="jmsTemplate" /> <property name="destination" ref="rantzDestination" /> </bean> <bean id="consumer" class="activemq.test.p2p.consumer.MarketingReceiverGatewayImpl"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean> <!-- Point-to-Point End--> <!-- Topic: "topic" carries the published events, "control" carries the listener's reports --> <bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"> <constructor-arg index="0" value="kxcomm.mms.topic" /> </bean> <bean id="control" class="org.apache.activemq.command.ActiveMQTopic" autowire="constructor"> <constructor-arg index="0" value="kxcomm.mms.control" /> </bean> <bean id="myListener" 
class="activemq.test.topic.MyListener"> <property name="connectionFactory" ref="connectionFactory" /> <property name="topic" ref="topic" /> <property name="control" ref="control" /> </bean> <bean id="myPublisher" class="activemq.test.topic.MyPublisher"> <property name="connectionFactory" ref="connectionFactory" /> <property name="topic" ref="topic" /> <property name="control" ref="control" /> </bean> <!-- Topic End--> </beans>
package activemq.test.model; import java.io.Serializable; public class User implements Serializable{ private static final long serialVersionUID = -3098636047897519268L; private String name; private String sex; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "User [name=" + name + ", sex=" + sex + ", age=" + age + "]"; } }
PTP模型
PTP(Point-to-Point)模型是基于队列的:生产者把消息发送到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具来管理队列的创建和删除。JMS PTP 模型定义了客户端如何向队列发送消息、从队列接收消息以及浏览队列中的消息。
package activemq.test.p2p.consumer;

import org.springframework.jms.core.JmsTemplate;

import activemq.test.model.User;

/**
 * Point-to-point consumer that pulls one message at a time from the
 * default destination of the injected {@link JmsTemplate} and prints it.
 */
public class MarketingReceiverGatewayImpl {

    private JmsTemplate jmsTemplate;

    public MarketingReceiverGatewayImpl() {
    }

    /** @return the template used to receive messages */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /** @param jmsTemplate the template used to receive messages */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /**
     * Receives one message from the template's default destination, converts
     * it and prints it to standard output.
     *
     * <p>The converted payload is not assumed to be a {@link User}: the
     * template may return {@code null} when no message is available, and the
     * queue can also carry other message types (for example text messages),
     * so both cases are reported instead of failing with a
     * {@code NullPointerException} or {@code ClassCastException}.
     *
     * @throws Exception if receiving or converting the message fails
     */
    public void receiveMotorist() throws Exception {
        Object payload = jmsTemplate.receiveAndConvert();
        if (payload == null) {
            // Nothing was delivered (e.g. a receive timeout expired).
            System.out.println("no message received");
            return;
        }
        if (payload instanceof User) {
            User message = (User) payload;
            System.out.println("reviced msg is:" + message.toString());
        } else {
            System.out.println("reviced unexpected msg ("
                    + payload.getClass().getName() + "):" + payload);
        }
    }
}
package activemq.test.p2p.consumer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point for the point-to-point consumer sample.
 */
public class StartConsumer {

    /**
     * Loads the Spring context, looks up the {@code consumer} bean and keeps
     * receiving messages until a receive call throws; the exception's stack
     * trace is then printed and the method returns.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Load the Spring configuration file.
        ApplicationContext springContext =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        Object bean = springContext.getBean("consumer");
        MarketingReceiverGatewayImpl receiver = (MarketingReceiverGatewayImpl) bean;

        System.out.println("Receive Start ...");
        try {
            for (;;) {
                receiver.receiveMotorist();
            }
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
package activemq.test.p2p.producer; public interface IRantzMarketingGateway { /** * * 发送文本对象 * * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */ public void sendMotoristInfo(); /** * * 发送对象 * * @author zhangjh 新增日期:2013-9-20 * @since smsc-gateway */ public void sendObjectInfo(); }
package activemq.test.p2p.producer; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import activemq.test.model.User; public class RantzMarketingGatewayImpl implements IRantzMarketingGateway { private JmsTemplate jmsTemplate; private Destination destination; public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getDestination() { return destination; } public void setDestination(Destination destination) { this.destination = destination; } public void sendMotoristInfo() { MessageCreator msg = new MessageCreator(){ public Message createMessage(Session session) throws JMSException { return session.createTextMessage("这是一个测试,"+System.currentTimeMillis()); } }; jmsTemplate.send(destination, msg); } public void sendObjectInfo() { User u = new User(); u.setAge(17); u.setName("yuky"+System.currentTimeMillis()); u.setSex("女"); jmsTemplate.convertAndSend(u); } }
package activemq.test.p2p.producer;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point for the point-to-point producer sample.
 */
public class StartProducer {

    /**
     * Loads the Spring context, sends ten object messages through the
     * {@code producer} bean and closes the context afterwards, even when a
     * send fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Load the Spring configuration file.
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        try {
            sendAll(context);
        } finally {
            // Release the beans created by the context once sending is done.
            context.close();
        }
    }

    /** Looks up the producer gateway and sends ten object messages. */
    private static void sendAll(ApplicationContext context) {
        // Depend on the interface only; a cast to the implementation class
        // would break if the bean were ever replaced or proxied.
        IRantzMarketingGateway rantzMarketingGateway =
                (IRantzMarketingGateway) context.getBean("producer");
        for (int i = 0; i < 10; i++) {
            rantzMarketingGateway.sendObjectInfo();
            System.out.println("Start ...");
        }
    }
}
PUB/SUB模型
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscription)。非持久订阅只有当客户端处于激活状态,也就是和 JMS Provider 保持连接状态时,才能收到发送到某个主题的消息;而当客户端处于离线状态时,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向 JMS Provider 注册一个识别自己身份的 ID,当这个客户端处于离线时,JMS Provider 会为这个 ID 保存所有发送到主题的消息;当客户端再次连接到 JMS Provider 时,会根据自己的 ID 得到所有在自己离线期间发送到主题的消息。
package activemq.test.topic; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import activemq.test.model.User; public class MyListener implements MessageListener { private ActiveMQConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageProducer producer; private Topic topic; private Topic control; public Topic getTopic() { return topic; } public void setTopic(Topic topic) { this.topic = topic; } public Topic getControl() { return control; } public void setControl(Topic control) { this.control = control; } public ActiveMQConnectionFactory getConnectionFactory() { return connectionFactory; } public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public void onMessage(Message message) { try{ if (checkText(message, "SHUTDOWN")) { try { connection.close(); System.out.println("退出监听消息"); } catch (Exception e) { e.printStackTrace(System.out); } } else if (checkText(message, "REPORT")) { // send a report: try { System.out.println("MyListener->收到 a report"); long time = System.currentTimeMillis(); String msg = "MyListener->返回 a report :" + time + "ms"; System.out.println(msg); producer.send(session.createTextMessage(msg)); } catch (Exception e) { e.printStackTrace(System.out); } } else { ObjectMessage obj = (ObjectMessage)message; User u = (User) obj.getObject(); System.out.println("Received messages."+ u.toString()); } }catch(Exception e){ } } public void run() throws JMSException { if(connectionFactory!=null){ System.out.println("connectionFactory is ok"); connection = connectionFactory.createConnection(); session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(this); connection.start(); producer = session.createProducer(control); System.out.println("Waiting for messages..."); } } private static boolean checkText(Message m, String s) { try { return m instanceof TextMessage && ((TextMessage)m).getText().equals(s); } catch (JMSException e) { e.printStackTrace(System.out); return false; } } }
package activemq.test.topic;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point for the topic listener sample.
 */
public class StartListener {

    /**
     * Loads the Spring context, looks up the {@code myListener} bean and
     * starts it; any exception is printed as a stack trace.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Load the Spring configuration file.
        ApplicationContext springContext =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        MyListener listener = (MyListener) springContext.getBean("myListener");

        try {
            boolean resolved = listener != null;
            if (resolved) {
                System.out.println("success...");
            }
            listener.run();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }
}
package activemq.test.topic; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import activemq.test.model.User; public class MyPublisher implements MessageListener { private ActiveMQConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageProducer publisher; private Topic topic; private Topic control; private final Object mutex = new Object(); public ActiveMQConnectionFactory getConnectionFactory() { return connectionFactory; } public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } public Topic getTopic() { return topic; } public void setTopic(Topic topic) { this.topic = topic; } public Topic getControl() { return control; } public void setControl(Topic control) { this.control = control; } public void onMessage(Message message) { synchronized (mutex) { System.out.println("Received report " + getReport(message) ); } } Object getReport(Message m) { try { return ((TextMessage)m).getText(); } catch (JMSException e) { e.printStackTrace(System.out); return e.toString(); } } public void publish() throws Exception { User u = new User(); u.setAge(17); u.setName("yuky"+System.currentTimeMillis()); u.setSex("女"); // send events ObjectMessage obj = session.createObjectMessage(); obj.setObject(u); for (int i = 0; i < 10; i++) { publisher.send(obj); publisher.send(session.createTextMessage("REPORT")); } } public void run() throws Exception { connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); publisher = session.createProducer(topic); publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
session.createConsumer(control).setMessageListener(this); connection.start(); } public void stop() throws JMSException{ publisher.send(session.createTextMessage("SHUTDOWN")); connection.stop(); connection.close(); } }
package activemq.test.topic;

import javax.jms.JMSException;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Command-line entry point for the topic publisher sample.
 */
public class StartPublisher {

    /**
     * Loads the Spring context, starts the {@code myPublisher} bean and
     * publishes its messages. If starting or publishing fails, the failure
     * is printed and the publisher is stopped.
     *
     * <p>On success the publisher is left running; {@code stop()} is only
     * invoked on failure. NOTE(review): presumably this keeps the control
     * subscription alive to receive reports; confirm.
     *
     * @param args ignored
     * @throws InterruptedException declared for compatibility; not thrown by
     *         the visible code
     */
    public static void main(String[] args) throws InterruptedException {
        // Load the Spring configuration file.
        ApplicationContext context =
                new ClassPathXmlApplicationContext("classpath:modules/applicationContext.xml");
        MyPublisher publisher = (MyPublisher) context.getBean("myPublisher");
        try {
            publisher.run();
            publisher.publish();
        } catch (Exception e) {
            // Print the root failure first so it is not lost if cleanup
            // itself fails.
            e.printStackTrace();
            try {
                publisher.stop();
            } catch (JMSException e1) {
                e1.printStackTrace();
            } catch (RuntimeException e1) {
                // e.g. stop() hitting resources that run() never created.
                e1.printStackTrace();
            }
        }
    }
}
相关推荐
ljcsdn 2020-07-27
woaishanguosha 2020-07-18
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
MojitoBlogs 2020-06-09
猫咪的一生 2020-06-03
guicaizhou 2020-05-05
猫咪的一生 2020-05-03
jiangkai00 2020-04-15
方新德 2020-04-08
jiangkai00 2020-03-20
那年夏天0 2020-02-22
sweetgirl0 2020-02-21
sweetgirl0 2020-02-09
MrZhangAdd 2020-01-13
jiangkai00 2020-01-12
GoatSucker 2020-01-11
yangyutong00 2019-12-30