activeMQ+JMS笔记

[1]

在介绍ActiveMQ之前,首先简要介绍一下JMS规范。

JMS的简介:

(1)

JMS(Java Message Service,Java消息服务)是一组Java应用程序接口(Java API),它提供创建、发送、接收、读取消息的服务。JMS 使您能够通过消息收发服务从一个 JMS 客户机向另一个 JML 客户机交流消息。
JMS是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC (Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ.
(2)

JMS典型的应用场景:

操作可异步执行.

发email了,发msn消息了.

或者一些比较耗时的操作,   比如要在某目录下生成一个大报表.   操作者把指令发出去就完事.
 
[2]

JMS的基本构件:

(1)

Broker

什么是Broker呢?可以把JMS Brokers 看成是服务器端。这个服务器可以独立运行.也可以随着其他容器
以内嵌方式云心,如下配置:
使用显示的Java代码创建

BrokerServicebroker=newBrokerService();

//configurethebroker

broker.addConnector("tcp://localhost:61616");

broker.start();
使用BrokerFacotry创建BrokerService broker = BrokerFactory.getInstance().createBroker(someURI);
使用Spring Bean创建

<beanid=”broker”class=”org.apache.activemq.xbean.BrokerFactoryBean”>

<propertyname=”config”value=”classpath:org/apache/activemq/xbean/activemq.xml”/>

<propertyname=”start”value=”true”/>

</bean>
还可以使用XBean或Spring 2.0等多种配置方式配置,通过ActiveMQConnectionFactory还可以隐含的创建内嵌的broker,这个broker就不是一个独立的服务了。
<bean id=”jmsTemplate” class=”org.springframework.jms.core.JmsTemplate”>

<propertyname=”connectionFactory”ref=”jmsFactory”/>

<propertyname=”defaultDestination”ref=”destination”/>

<propertyname=”destinationResolver”ref=”默认是DynamicDestionResolver”/>

<propertyname=”pubSubDomain”><value>trueorfalse默认是false,

false是QueneDestination,true是TopicDestination</value>

</bean>
上面的defaultDestination是指默认发送和接收的目的地,我们也可以不指定,而是通过目的地名称让jmsTemplate自动帮我们创建.
(2)

1连接工厂

连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。

2连接

JMSConnection封装了客户与JMS提供者之间的一个虚拟的连接。

3会话

JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。
(3)

目的地:

目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种
消息传递域:Point-to-Point消息(P2P),点对点;发布订阅消息(Publish Subscribe messaging,简称Pub/Sub)
两者的区别:

P2P消息模型是在点对点之间传递消息时使用。如果应用程序开发者希望每一条消息都能够被处理,那么应该使用P2P消息模型。与Pub/Sub消息模型不同,P2P消息总是能够被传送到指定的位置。

        P2P消息,每个消息只能有一个消费者。
  Pub/Sub模型在一到多的消息广播时使用。如果一定程度的消息传递的不可靠性可以被接受的话,那么应用程序开发者也可以使用Pub/Sub消息模型。换句话说,它适用于所有的消息消费程序并不要求能够收到所有的信息或者消息消费程序并不想接收到任何消息的情况。    Pub/Sub,每个消息可以有多个消费者。
在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。
(3)

3.1

消息生产者

消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。

3.2

消息消费者

消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种
方法之一:

?异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。(异步操作)

?同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

3.3

消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:
简单文本 (TextMessage)、可序列化的对象 (ObjectMessage)、属性集合
(MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
(4)JMS定义了从0到9的优先级路线级别,0是最低的优先级而9则是最高的。更特殊的是0到4是正常优先级的变化幅度,而5到9是加快的优先级的变化幅度。
[3]

ActiveMQ简介:

ActiveMQ是开源的JMS实现,Geronimo应用服务器就是使用的ActiveMQ提供JMS服务。

安装

在http://activemq.apache.org/download.html下载5.0.0发行包,解压即可,

启动

window环境运行解压目录下的/bin/activemq.bat

测试

ActiveMQ默认使用的TCP连接端口是61616,通过查看该端口的信息可以测试ActiveMQ是否成功启动

window环境运行netstat-an|find"61616"

监控

ActiveMQ5.0版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。

admin:http://127.0.0.1:8161/admin/

demo:http://127.0.0.1:8161/demo/

点击demo应用中的“Marketdatapublisher”,就会发一些测试的消息。转到admin页面的topicsmenu下面(queue和topic的区别见http://andyao.iteye.com/blog/153173),可以看到消息在增长。

ActiveMQ5.0的配置文件在解压目录下的/conf目录下面。主要配置文件为activemq.xml.
 
[4]
实例一:(没有结合spring框架)
public class QueueProducer {

/*

*创建的简图

ConnectionFactory—->Connection—>Session—>Message

Destination+Session————————————>Producer

Destination+Session————————————>MessageConsumer

*/

publicstaticvoidmain(String[]args){

//ConnectionFactory:连接工厂,JMS用它创建连接

ConnectionFactoryconnectionFactory;

//Connection:JMS客户端到JMSProvider的连接

Connectionconnection=null;

//Session:一个发送或接收消息的线程

Sessionsession;

//Destination:消息的目的地;消息发送给谁.

Queuequeue;

//设置回复的目的地

QueuereplyQueue;

//MessageProducer:消息发送者

MessageProducerproducer;

  MessageConsumer replyer;
  connectionFactory = new ActiveMQConnectionFactory(

ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD,

    "tcp://192.168.1.191:61616");
  try {

//构造从工厂得到连接对象

connection=connectionFactory.createConnection();

//启动

connection.start();

//获取操作连接

session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

//创建队列:可以在http://localhost:8161/admin/queue.jsp中看到

queue=newActiveMQQueue("jason.queue2");

replyQueue=newActiveMQQueue("jason.replyQueue");

//得到消息生成者【发送者】:需要由Session和Destination来创建

producer=session.createProducer(queue);

//创建消息

TextMessagemessage=session.createTextMessage("jason学习ActiveMq发送的消息");

//在消息中设置回复的目的地,

//对方用MessageProducersender=session.createProducer(message.getJMSReplyTo());创建回复者

message.setJMSReplyTo(replyQueue);

//发送一个non-Persistent的消息

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

producer.send(message);

//发送一个Persistent的消息

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

producer.send(session.createTextMessage("这是一个Persistent的消息!重启JMS,仍可获取"));

System.out.println("发送消息:jason学习ActiveMq发送的消息");

System.out.println("这是一个Persistent的消息!重启JMS,仍可获取");

//用回复的目的地定义回复接收者,且设置侦听

replyer=session.createConsumer(replyQueue);

replyer.setMessageListener

(

newMessageListener()

{

publicvoidonMessage(Messagemessage)

{

try{

TextMessagetxtmess=(TextMessage)message;

System.out.println("consumer的回复内容是:"+txtmess.getText());

}catch(Exceptione){

e.printStackTrace();

}

}

}

);

   session.commit();
  } catch (Exception e) {

e.printStackTrace();

}finally{

try{

if(null!=connection)

connection.close();

}catch(Throwableignore){

}

}

}

}
接收者:
//public class Receiver {

publicclassQueueConsumerimplementsMessageListener{

publicstaticvoidmain(String[]args)

{

QueueConsumerre=newQueueConsumer();

//循环只是为了让程序每2秒进行一次连接侦听是否有消息可以获取.

while(true)

{

re.consumeMessage();

try{

Thread.sleep(2000);

}catch(InterruptedExceptione){

e.printStackTrace();

}

}

//对于主动接收的,只须直接执行:re.consumeMessage();即可.

//其中的while(true),会一次性将所有的消息获取过来.

}

publicvoidconsumeMessage()

{

ConnectionFactoryconnectionFactory;

Connectionconnection=null;

Sessionsession;

Queuequeue;

    MessageConsumer consumer;

    connectionFactory = new ActiveMQConnectionFactory(

ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD,

"tcp://192.168.1.191:61616");

try{

connection=connectionFactory.createConnection();

connection.start();

session=connection.createSession(Boolean.FALSE,

Session.AUTO_ACKNOWLEDGE);

queue=newActiveMQQueue("jason.queue2");

consumer=session.createConsumer(queue);

//接受消息方式一:主动的去接受消息,用consumer.receive

//只能获取一条消息–>不采用

//TextMessagemessage=(TextMessage)consumer.receive(1000);

//if(null!=message){

//System.out.println("收到消息"+message.getText());

//}

//可以不断循环,获取所有的消息.—>关键.

//while(true){

//TextMessagemessage=(TextMessage)consumer.receive(1000);

//if(null!=message){

//System.out.println("收到消息"+message.getText());

//}else{

//break;//没有消息时,退出

//}

//}

/*接受消息方式二:基于消息监听的机制,需要实现MessageListener接口,这个接口有个onMessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

*/

consumer.setMessageListener(this);

        

    } catch (Exception e) {

e.printStackTrace();

}finally{

try{

if(null!=connection)

connection.close();

}catch(Throwableignore){

}

}

}

publicvoidonMessage(Messagemessage)

{

try{

if(messageinstanceofTextMessage){

TextMessagetxtmess=(TextMessage)message;

        System.out.println("收到的消息是:" + txtmess.getText()); 
      //回复发送者
        MessageProducer sender=session.createProducer(message.getJMSReplyTo());

sender.send(session.createTextMessage("已收到你的消息"));

}

else

System.out.println("收到的消息是:"+message);

}catch(Exceptione){

}

}

}
说明:
(2)

VMTransport

VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
TCP Transport

TCPtransport允许客户端通过TCPsocket连接到远程的broker。以下是配置语法:

tcp://hostname:port?transportOptions

tcp://localhost:61616
(3)

3.1

启动activeMQ后,用户创建的queues会被保存在activeMQ解压目录下的\data\kr-store\data中.

3.2

创建queue,可以在代码中创建,也可以直接进入http://localhost:8161/admin—>点击queue—>在上面的field中填下你要创建的queue名–>点击创建即可.

3.3

若用户创建的queue,不是持久化的,则在重启activeMQ后,数据文件中的内容会被清空,但文件仍存在.
(4)

messageProducer发送消息后,会在保存在目的地,即上面的queue中,也即就是在\data\kr-store\data目录下的文件中.

messageReceiver(连接到相同目的地的接收者),不需要立即接收.只要activeMQ的服务端不关闭,当运行接收者,连接到activeMQ的服务端时,就可以获取activeMQ服务端上已发送的消息.

发送/接收的消息情况及数量及消息的内容与处理(删除),可以在
(5)

可以将activeMQ的服务端放于一PC中,发送者位于另一PC,接收者也位于另一PC中.

只要:tcp://activeMQ的服务端IP:activeMQ的服务端口,进行连接即可.
(6)queue消息,只被消费一次.
 
topic的实例(无结合spring)
public class TopicTest {

publicstaticvoidmain(String[]args)throwsException{

ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("vm://localhost");

Connectionconnection=factory.createConnection();

connection.start();

//创建一个Topic

Topictopic=newActiveMQTopic("testTopic");

Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

//注册消费者1

MessageConsumercomsumer1=session.createConsumer(topic);

comsumer1.setMessageListener(newMessageListener(){

publicvoidonMessage(Messagem){

try{

System.out.println("Consumer1get"+((TextMessage)m).getText());

}catch(JMSExceptione){

e.printStackTrace();

}

}

});

//注册消费者2

MessageConsumercomsumer2=session.createConsumer(topic);

comsumer2.setMessageListener(newMessageListener(){

publicvoidonMessage(Messagem){

try{

System.out.println("Consumer2get"+((TextMessage)m).getText());

}catch(JMSExceptione){

e.printStackTrace();

}

}

});

//创建一个生产者,然后发送多个消息。

MessageProducerproducer=session.createProducer(topic);

for(inti=0;i<10;i++){

producer.send(session.createTextMessage("Message:"+i));

}

}

}
输出如下:
Consumer1 get Message:0

Consumer2getMessage:0

Consumer1getMessage:1

Consumer2getMessage:1

Consumer1getMessage:2

Consumer2 get Message:2
………………………..

实例二:

异步的电子邮件(将topic的应用实例也放在一起)

(4.1)

背景说明:  

以传统的方式发送电子邮件(作为同步请求的一部分)会引起一些问题。首先,连接到电子邮件服务器需要一次网络往返,速度可能会很慢,尤其是在服务器非常繁忙的时候。过载的电子邮件服务器甚至可以使依赖于电子邮件的服务暂时不可用。

xa事务支持

另一个显而易见的问题是,电子邮件服务器通常在本质上是非事务性的。当事务被回滚时,这可以导致出现不一致的通知——把一条消息放入队列之后不能取消它。幸运的是, jms 支持事务,而且可以通过把消息的发送延迟到提交底层事务的时候来解决这个问题。
(4.2)

实现效果:

用户在更改密码后,系统会发送邮件通知用户,为了避免发送邮件时程序对用户操作的阻塞,可以用JMS异步发送邮件.
(4.3)

实现流程:

当用户更改密码后

–1->调用JMS的发送者,发送者会利用jmsTemplate发送消息到目的地

–2->系统,执行原系统的程序.

–2->当消息发送后,messageListener侦听到消息,接收后执行相应的方法handleMessage().在此方法中执行发送email.

好处:

从而实现了异步发送邮件,避免用户等待单一线程完成,使原程序的执行更快,提升用户体验
<1>

第一步:

创建broker,jmsFactory,destination,messageConverter,jmsTemplate(用于发送JMS消息).
然后将jmsFactory,Destination,MessageConverter放入jmsTemplate中.
(1)首先,我们在Spring中加入ActiveMQ Broker的配置:
1.
删除原来jason-servlet.xml中的<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN 2.0//EN"
"http://www.springframework.org/dtd/spring-beans-2.0.dtd">

<beans>

改为:

<beansxmlns="http://www.springframework.org/schema/beans"

xmlns:amq="http://activemq.org/config/1.0"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-2.0.xsd

http://activemq.org/config/1.0

http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">

同时,要导入相应的jar包即可.
2.
<!–  在Spring中配置嵌入式的 activemq broker,这样就不用在使用JMS时,要手动启动activeMQ broker –>

<amq:brokeruseJmx="false"persistent="false">

<amq:transportConnectors>

<amq:transportConnectoruri="tcp://localhost:61616"/>

</amq:transportConnectors>

 </amq:broker>
 <!–  消息的存储机制, 服务器重启也不会丢失消息.

<amq:brokeruseJmx="false"persistent="true">

<amq:persistenceAdapter>

<amq:amqPersistenceAdapterdirectory="d:/amq"/>

</amq:persistenceAdapter>

<amq:transportConnectors>

<amq:transportConnectoruri="tcp://localhost:61616"/>

<amq:transportConnectoruri="vm://localhost:0"/>

</amq:transportConnectors>

</amq:broker>

 –>
(2)

在Spring中配置JMSConnectionFactory。

<beanid="jmsFactory2"

class="org.apache.activemq.ActiveMQConnectionFactory">

<propertyname="brokerURL"value="tcp://localhost:61616"/>

    </bean>
也可直接用:
<amq:connectionFactory id="jmsFactory2" brokerURL="tcp://localhost:61616" />

注意其中的borkerURL,应该是你在activemq.xml中transportconnector节点的uri属性,这表示JMSServer的监听地址。activeMQ默认端口是61616,由于采用默认方式,所以这里也是61616.

同时,要运行程序之前,我们要先启动broker,即启动解压目录下的/bin/activemq.bat.
(3)

配置消息发送目的地:

<beanid="topicDestination"

class="org.apache.activemq.command.ActiveMQTopic">

<constructor-argvalue="MY.topic"/>

</bean>

<beanid="queueDestination"

class="org.apache.activemq.command.ActiveMQQueue">

<constructor-argvalue="MY.queue"/>

    </bean>
也可直接用:
    <amq:topic name="topicDestination" physicalname="MY.topic"/>

<amq:queuename="queueDestination"physicalname="MY.queue"/>

在 JMS中,目的地有两种:主题(topic)和队列(queue)。两者的区别是:当一个主题目的地中被放入了一个消息后,所有的订阅者都会收到通知;而 对于队列,仅有一个“订阅者”会收到这个消息,队列中的消息一旦被处理,就不会存在于队列中。显然,对于邮件发送程序来说,使用队列才是正确的选择,而使 用主题时,可能会发送多封相同的邮件。
(4)

配置Spring中消息发送的JMSTemplate:

(与hibernate相似,其配置的是hibernateTemplate.都要将连接工厂放到template中)

<beanid="producerJmsTemplate"

class="org.springframework.jms.core.JmsTemplate">

<propertyname="connectionFactory">

<beanclass="org.springframework.jms.connection.SingleConnectionFactory">

<propertyname="targetConnectionFactory"

ref="jmsFactory2"/>

</bean>

</property>

<!–因为要实现此模板同时用于queue与topic,所以目的地要放于发送者中.若单独使用,放于此,更方便!

<propertyname="defaultDestination"ref="queueDestination"/>

–>

<propertyname="messageConverter"ref="userMessageConverter"/>

    </bean>
(5)

在实际的消息发送中,邮件内容需要用到User.username,User.password,User.email,User.fullname,我们定义了messageConverter,在发送信息时,将user对象转换成消息,在接收消息时,会将消息转换成User对象.只要在上面的jmstemplate中设置了messageConverter属性,发送/接收消息时,Spring会自动帮我们进行转换,下面是Converter的配置和代码:

<beanid="userMessageConverter"

        class="com.laoer.bbscs.jms.UserMessageConverter" />
代码如下:

publicclassUserMessageConverterimplementsMessageConverter{

privatestatictransientLoglogger=LogFactory.getLog(UserMessageConverter.class);

publicObjectfromMessage(Messagemessage)throwsJMSException{

if(logger.isDebugEnabled()){

logger.debug("ReceiveJMSmessage:"+message);

}

if(messageinstanceofObjectMessage){

ObjectMessageoMsg=(ObjectMessage)message;

if(oMsginstanceofActiveMQObjectMessage){

ActiveMQObjectMessageaMsg=(ActiveMQObjectMessage)oMsg;

try{

PersonInfopersonInfo=(PersonInfo)aMsg.getObject();

returnpersonInfo;

}catch(Exceptione){

logger.error("Message:["+message+"]isnotainstanceofpersonInfo.");

thrownewJMSException("Message:["+message+"]isnotainstanceofpersonInfo.");

}

}else{

logger.error("Message:["+message+"]isnot"+"ainstanceofActiveMQObjectMessage[personInfo].");

thrownewJMSException("Message:["+message+"]isnot"+"ainstanceofActiveMQObjectMessage[personInfo].");

}

}else{

logger.error("Message:["+message+"]isnotainstanceofObjectMessage.");

thrownewJMSException("Message:["+message+"]isnotainstanceofObjectMessage.");

}

}

publicMessagetoMessage(Objectobj,Sessionsession)throwsJMSException{

if(logger.isDebugEnabled()){

logger.debug("ConvertUserobjecttoJMSmessage:"+obj);

}

if(objinstanceofPersonInfo){

ActiveMQObjectMessagemsg=(ActiveMQObjectMessage)session.createObjectMessage();

msg.setObject((PersonInfo)obj);

returnmsg;

}else{

logger.error("Object:["+obj+"]isnotainstanceofPersonInfo.");

thrownewJMSException("Object:["+obj+"]isnotainstanceofPersonInfo.");

}

}

}
此程序实现了MessageConverter接口,并实现其中的fromMessage和toMessage方法,分别实现转换接收到的消息为User对象和转换User对象到消息。我们在程序中使用的是ActiveMQObjectMessage,它是ActiveMQ中对javax.jms.ObjectMessage的一个实现。
<2>

第二步:

配置发送消息:

(1)

在spring配置文件中配置发送者:

<!–发送queue消息–>

<beanid="userMessageProducer"class="com.laoer.bbscs.jms.UserMessageProducer">

<propertyname="jmsTemplate"ref="producerJmsTemplate"/>

<propertyname="defaultDestination"ref="queueDestination"/>

</bean>

<!–发送topic消息–>

<beanid="topicMessageProducer"class="com.laoer.bbscs.jms.TopicMessageProducer">

<propertyname="jmsTemplate"ref="producerJmsTemplate"/>

<propertyname="defaultDestination"ref="topicDestination"/>

</bean>

由于发送者是用jmsTemplate进行发送的,所以有注入jmsTemplate.

(2)

代码如下:

packagecom.laoer.bbscs.jms;

publicclassUserMessageProducer{

privateJmsTemplatejmsTemplate;

//因为"目的地"属性放于jmsTemplate中,则不用添加此属性.

privateQueuedefaultDestination;

publicvoidsendUserLoginInformationMail(PersonInfopersonInfo){

//若"目的地"属性放于jmsTemplate中,则用此方式

//getJmsTemplate().convertAndSend(personInfo);

getJmsTemplate().convertAndSend(this.defaultDestination,personInfo);

}

getter,setter略…

}

其中,sendUserLoginInformationMail方法是唯一我们需要编写的,调用JMSTemplate的convertAndSend方法,Spring会自己调用我们之前配置的converter来转换我们发送的User对象,再进行发送。
public class TopicMessageProducer {

privateJmsTemplatejmsTemplate;

privateTopicdefaultDestination;

publicvoidsendTopicMessage(PersonInfopersonInfo){

getJmsTemplate().convertAndSend(this.defaultDestination,personInfo);

    }
    getter,setter略…}
 
<3>

实现消息的接收

我们使用MDP(MessageDrivePOJO)来实现消息的异步接收。

我们需要实现javax.jms.MessageListener接口的void onMessage(Message message)方法来接收消息。
不过我们可以使用Spring中提供的MessageListenerAdapter来简化接收消息的代码。
(0)
<!– 定义消息消费者,然后直接在messageListener中调用.

消费者,不用加入jmsTemplate属性,jmsTemplate只用于发送消息–>

<!–queue消息消费者,只能一个–>

<beanid="userMessageConsumer"class="com.laoer.bbscs.jms.UserMessageConsumer">

<propertyname="mailSender"ref="mailSender"/>

</bean>

<!–topic消息消费者,可以多个–>

<beanid="topicConsumerA"class="com.laoer.bbscs.jms.TopicConsumerA"/>

<beanid="topicConsumerB"class="com.laoer.bbscs.jms.TopicConsumerB"/>

(1)

配置messageListener,用它来接收消息.(用jmsTemplate来发送消息)

<!–定义queue,topic(A/Bconsumer)各自的侦听器–>

<beanid="messageListener"

class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

<constructor-argref="userMessageConsumer"/>

<propertyname="defaultListenerMethod"value="handleMessage"/>

<propertyname="messageConverter"ref="userMessageConverter"/>

</bean>

<beanid="topicListenerA"

class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

<constructor-argref="topicConsumerA"/>

<propertyname="defaultListenerMethod"value="receiveA"/>

<propertyname="messageConverter"ref="userMessageConverter"/>

</bean>

<beanid="topicListenerB"

class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

<constructor-argref="topicConsumerB"/>

<propertyname="defaultListenerMethod"value="receiveB"/>

<propertyname="messageConverter"ref="userMessageConverter"/>

    </bean> 
其中的mailSender:即是我们的邮件发送类,此类中的方法send()方法实现了邮件发送的功能。
消息侦听适配器defaultListenerMethod属性:指定Spring在收到消息后调用的方法,此处为handleMessage,Spring会根据收到的消息–转换为User对象–>调用handleMessage()方法。
(2)配置消息侦听容器,并指定我们定义的消息侦听器。
<!– 定义queue,topic(A/B consumer)各自的侦听容器 –>

<beanid="listenerContainer"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<propertyname="concurrentConsumers"value="5"/>

<propertyname="connectionFactory"ref="jmsFactory2"/>

<propertyname="destination"ref="queueDestination"/>

<propertyname="messageListener"ref="messageListener"/>

</bean>

<beanid="topicListenerContainerA"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<propertyname="concurrentConsumers"value="5"/>

<propertyname="connectionFactory"ref="jmsFactory2"/>

<propertyname="destination"ref="topicDestination"/>

<propertyname="messageListener"ref="topicListenerA"/>

</bean>

<beanid="topicListenerContainerB"

class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<propertyname="concurrentConsumers"value="5"/>

<propertyname="connectionFactory"ref="jmsFactory2"/>

<propertyname="destination"ref="topicDestination"/>

<propertyname="messageListener"ref="topicListenerB"/>

</bean>

配置了同时侦听的个数,连接工厂(发送者,接收者都要连接到同一地方),目的地(与上面对应),自定义的侦听器.
(3)

接收者的代码:

packagecom.laoer.bbscs.jms;

publicclassUserMessageConsumer{

privatestatictransientLoglogger=LogFactory.getLog(UserMessageConsumer.class);

privateMailSendermailSender;

publicvoidhandleMessage(PersonInfopersonInfo)throwsJMSException{

if(logger.isDebugEnabled()){

logger.debug("ReceiveaUserobjectfromActiveMQ:"+personInfo.toString());

}

mailSender.send(personInfo,"h***[email protected]");

}

getter,setter略….

}
 
public class TopicConsumerA {

publicvoidreceiveA(PersonInfopersonInfo)throwsJMSException

{

System.out.println("TopicConsumerA收到TopicProducer的消息—->personInfo的用户名是:"+personInfo.getName());

}

}
TopicConsumerB与TopicConsumerA相似.
 
(4)

发送邮件的相应方法:

publicStringsend(PersonInfopersoninfo,StringmailAddr)

{

System.out.println("现在的时间是:"+System.currentTimeMillis());

//不要使用SimpleEmail,会出现乱码问题

HtmlEmailemail=newHtmlEmail();

try{

//这里是发送服务器的名字

email.setHostName("smtp.sohu.com");

//编码集的设置

email.setCharset("gbk");

//收件人的邮箱

//email.addTo("h***[email protected]");

email.addTo(mailAddr);

//发送人的邮箱

email.setFrom("sh****[email protected]","she**fa");

   // 如果需要认证信息的话,设置认证:用户名-密码。分别为发件人在邮件
服务器上的注册名称和密码

email.setAuthentication("she**fa","20***23");

email.setSubject("测试email与JMS–你的密码修改了");

//要发送的信息

   email.setMsg("你现在的用户名是:"+personinfo.getName()+" \n密码
是:"+personinfo.getPassword());

//发送

email.send();

System.out.println("现在的时间是:"+System.currentTimeMillis());

return"发送成功!";

}catch(EmailExceptione){

return"发送失败!";

}

 }
(5)
在jLogin.java中调用即可(也添加相应的producer及setter,getter):
//用ActiveMQ的queue,正常发email

this.getMailSender().send(pi,hw***@126.com");

this.getUserMessageProducer().sendUserLoginInformationMail(pi);

//用ActiveMQ的topic发布消息

   this.getTopicMessageProducer().sendTopicMessage(pi);
 
这样,当执行后,会异步发送邮件,同时topicProducer发送一个消息后,topicConsumerA/B都会侦听接收到,且执行相应的操作.

[6]

实例二:

整合ActiveMQ,Quartz,实现定时发送邮件:

(1)

activeMQ的配置同上;

(2)

如何将ActiveMQ整合到Quartz中去:

1.用非继承的方法实现quartz有任务类.

将任务类的目标类,目标方法:指定为消息发送类,及其发送消息的方法即可.

<!–利用Quartz定时分email–>

   <bean id="mailJobDetailBean"
class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">

<propertyname="targetObject">

<reflocal="userMessageProducer"/>–>指定发送者

</property>

<propertyname="targetMethod">–>指定发送方法,不同于上面

<value>send2</value>

</property>

   </bean>
    public void send2()

{

PersonInfopersonInfo=newPersonInfo();

personInfo.setName("jason");

personInfo.setPassword("123456789");

this.getJmsTemplate().convertAndSend(this.defaultDestination,personInfo);

    }
2.创建时间表.

<beand="mailCronTriggerBean"

class="org.springframework.scheduling.quartz.CronTriggerBean">

<propertyname="jobDetail">

<reflocal="mailJobDetailBean"/>

</property>

<propertyname="cronExpression">

<value>30****?</value>

</property>

</bean>

3.放入任务工厂.

   <bean id="schedulerFactoryBean"

class="org.springframework.scheduling.quartz.SchedulerFactoryBean">

<propertyname="triggers">

<list>

<refbean="cronTriggerBean"/>

<refbean="jasonGetPersonCronTriggerBean"/>

<refbean="mailCronTriggerBean"/>

</list>

</property>

</bean>

相关推荐