rabbitmq学习10:使用spring-amqp发送消息及异步接收消息

前面我们已经学习了发送消息及同步接收消息的例子了。下面我们来看看如何通过Spring配置来实现异步接收消息。

   现在我们建立两个WEB项目。发送消息的项目命名为”rabbitmq-demo-producer“ ,异步接受的消息项目名称”rabbitmq-demo-consumer“。

  下面来看看rabbitmq-demo-producer项目中发送信息的程序及配置。

 MessageProducer类是用于发送消息的类。实现如下

package com.abin.rabbitmq;  
  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
  
public class MessageProducer {  
    private RabbitTemplate rabbitTemplate;  
  
    public void sendMessage(Integer i) {  
        String message = "Hello World wubin " + "#" + i;  
        //Exchange的名称为"hello.topic",routingkey的名称为"hello.world.q123ueue"  
        rabbitTemplate.convertAndSend("hello.topic", "hello.world.q123ueue",  
                message);  
        System.out.println("发送第" + i + "个消息成功!内容为:" + message);  
  
//      String messages = "Hello World direct " + "#" + i;  
//      rabbitTemplate.convertAndSend("hello.direct", "hello.world.queue",  
//              messages);  
//      System.out.println("发送第" + i + "个消息成功!内容为:" + messages);  
    }  
  
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
  
}  

spring的配置文件如下:applicationContext-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">  
    <bean id="connectionFactory"  
        class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">  
        <constructor-arg value="localhost" />  
        <property name="username" value="guest" />  
        <property name="password" value="guest" />  
    </bean>  
    <bean id="amqpAdmin"  
        class="org.springframework.amqp.rabbit.core.RabbitAdmin">  
        <constructor-arg ref="connectionFactory" />  
    </bean>  
    <bean id="rabbitTemplate"  
        class="org.springframework.amqp.rabbit.core.RabbitTemplate">  
        <constructor-arg ref="connectionFactory"></constructor-arg>  
    </bean>  
    <bean id="messageProducer"  
        class="com.abin.rabbitmq.MessageProducer">  
        <property name="rabbitTemplate">  
            <ref bean="rabbitTemplate" />  
        </property>  
    </bean>  
</beans>  

对于发送消息的程序自己可以实现,我是通过Struts2来实现的,例如

package com.abin.action;  
  
import java.util.Date;  
  
import com.abin.rabbitmq.MessageProducer;  
import com.opensymphony.xwork2.ActionSupport;  
  
public class SendAction extends ActionSupport {  
    private MessageProducer messageProducer;  
  
    public String execute() throws Exception {  
        Date a = new Date();  
        long b = System.currentTimeMillis();  
        for (int i = 0; i <= 10000; i++) {  
            messageProducer.sendMessage(i);  
        }  
        System.out.println(a);  
        System.out.println(new Date());  
        System.out.println("共花了" + (System.currentTimeMillis() - b) + "ms");  
        return null;  
    }  
  
    public void setMessageProducer(MessageProducer messageProducer) {  
        this.messageProducer = messageProducer;  
    }  
  
}  

发送消息项目的程序差不多就这些了

下面来看看接受消息的程序如下

HelloWorldHandler类用于接收消息的处理类,如下

package com.abin.rabbitmq;  
  
import java.util.Date;  
  
public class HelloWorldHandler {  
    public void handleMessage(String text) {  
        System.out.println("Received: " + text);  
  
        System.out.println(new Date());  
    }  
}  

spring的配置文件如下:applicationContext-rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">  
    <!-- 创建connectionFactory -->  
    <bean id="connectionFactory"  
        class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">  
        <constructor-arg value="localhost" />  
        <property name="username" value="guest" />  
        <property name="password" value="guest" />  
    </bean>  
    <!-- 创建rabbitAdmin 代理类 -->  
    <bean id="rabbitAdmin"  
        class="org.springframework.amqp.rabbit.core.RabbitAdmin">  
        <constructor-arg ref="connectionFactory" />  
    </bean>  
    <!-- 创建rabbitTemplate 消息模板类 -->  
    <bean id="rabbitTemplate"  
        class="org.springframework.amqp.rabbit.core.RabbitTemplate">  
        <constructor-arg ref="connectionFactory"></constructor-arg>  
    </bean>  
    <!-- 声明Queue并设定Queue的名称 -->  
    <bean id="helloWorldQueue"  
        class="org.springframework.amqp.core.Queue">  
        <constructor-arg value="hello.world.queue"></constructor-arg>  
    </bean>  
    <!-- 声明消息转换器为SimpleMessageConverter -->  
    <bean id="messageConverter"  
        class="org.springframework.amqp.support.converter.SimpleMessageConverter">  
    </bean>  
    <!-- 声明Exchange的类型为topic并设定Exchange的名称 -->  
    <bean id="hellotopic"  
        class="org.springframework.amqp.core.TopicExchange">  
        <constructor-arg value="hello.topic"></constructor-arg>  
    </bean>  
  
    <!-- 声明Exchange的类型为direct并设定Exchange的名称 -->  
    <bean id="hellodirect"  
        class="org.springframework.amqp.core.DirectExchange">  
        <constructor-arg value="hello.direct"></constructor-arg>  
    </bean>  
    <!-- 通过Binding来判定Queue、Exchange、routingKey -->  
    <!-- 其中构建Binding的参数1是Queue,参数2是Exchange,参数3是routingKey -->  
    <bean id="queuebling"  
        class="org.springframework.amqp.core.Binding">  
        <constructor-arg index="0" ref="helloWorldQueue"></constructor-arg>  
        <constructor-arg index="1" ref="hellotopic"></constructor-arg>  
        <constructor-arg index="2" value="hello.world.#"></constructor-arg>  
    </bean>  
    <!-- 监听生产者发送的消息开始 -->  
    <!-- 用于接收消息的处理类 -->  
    <bean id="helloWorldHandler"  
        class="com.abin.rabbitmq.HelloWorldHandler">  
    </bean>  
    <!-- 用于消息的监听的代理类MessageListenerAdapter -->  
    <bean id="helloListenerAdapter"  
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="helloWorldHandler" />  
        <property name="defaultListenerMethod" value="handleMessage"></property>  
        <property name="messageConverter" ref="messageConverter"></property>  
    </bean>  
    <!-- 用于消息的监听的容器类SimpleMessageListenerContainer,对于queueName的值一定要与定义的Queue的值相同 -->  
    <bean id="listenerContainer"  
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
        <property name="queueName" value="hello.world.queue"></property>  
        <property name="connectionFactory" ref="connectionFactory"></property>  
        <property name="messageListener" ref="helloListenerAdapter"></property>  
    </bean>  
    <!-- 监听生产者发送的消息结束 -->  
</beans>  

相关推荐