RabbitMQ spring 使用总结
rabbitMQ相关概念不在本文介绍范围,rabbitMQ官网和其他博客都有大量介绍。
本文重点内容是spring和rabbit环境搭建以及使用中注意事项总结。
1.1 RabbitMQ 服务器搭建
从 RabbitMQ 官网下载并安装最新版本的服务器(RabbitMQ 依赖 Erlang 运行环境,需要先安装 Erlang)。
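1.2 RabbitMQ 开启服务管理
先启动 RabbitMQ 服务(Windows 下可执行 rabbitmq-service start),再执行 rabbitmq-plugins enable rabbitmq_management 开启 Web 管理插件。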
1.3springpom配置
<!-- Version property consumed by the spring-rabbit dependency below via ${spring-rabbit.version}. -->
<spring-rabbit.version>1.3.9.RELEASE</spring-rabbit.version>
<!-- RabbitMQ message queue dependencies -->
<!-- NOTE(review): ${rabbitmq-client.version} is referenced below but is not defined in this snippet; confirm it is declared alongside spring-rabbit.version. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq-client.version}</version>
</dependency>
<!-- Spring AMQP integration for RabbitMQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
</dependency>
1.4springconfig配置
在D:\workspace\sps\src\main\resources\spring-rabbitmq.xml
配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring AMQP wiring: one connection factory, one template, two topic exchanges,
  three durable queues and a manually acknowledged listener container.
-->
<beans xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <mvc:annotation-driven/>

    <!-- Broker connection. NOTE(review): the ${rabbitmq.master.*} placeholders are not resolved in this file;
         confirm a property placeholder configurer is declared elsewhere. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${rabbitmq.master.ip}"
                               port="${rabbitmq.master.port}"
                               username="${rabbitmq.master.username}"
                               password="${rabbitmq.master.password}"/>

    <!-- Template that publishes to order_topic_exchange by default and serialises payloads with gsonConverter. -->
    <rabbit:template id="amqpTemplate"
                     connection-factory="connectionFactory"
                     exchange="order_topic_exchange"
                     message-converter="gsonConverter"/>

    <!-- Declares the queues, exchanges and bindings below on the broker. -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue name="orderQueue" durable="true"/>

    <!-- Holding queue for delayed delivery: it has no listener in this file. Messages expire after
         600000 ms (10 minutes) and are then dead-lettered to pay_delay_exchange. -->
    <rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <!-- The TTL must be declared as a numeric type, not a string. -->
                <value type="java.lang.Long">600000</value>
            </entry>
            <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- Receives the expired (delayed) messages routed through pay_delay_exchange. -->
    <rabbit:queue name="orderPayDelayQueryQueue" durable="true"/>

    <rabbit:topic-exchange name="pay_delay_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderPayDelayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <rabbit:topic-exchange name="order_topic_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="orderQueue" pattern="sps.#"/>
            <rabbit:binding queue="orderPayQueryQueue" pattern="orderPay.#"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Manual acknowledge mode: the listener itself must ack/nack every delivery. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency="10">
        <rabbit:listener queues="orderQueue" ref="orderQueueListener"/>
    </rabbit:listener-container>

    <bean id="orderQueueListener" class="com.supuy.sps.services.queue.OrderQueueListener"/>
    <bean id="gsonConverter" class="com.supuy.core.mq.Gson2JsonMessageConverter"/>
</beans>
1.5延迟消息队列
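有时候,因为各种原因,我们想实现延迟消费的目的,但是 RabbitMQ 本身并没有直接提供这个功能,这时候可以通过 x-message-ttl 和 x-dead-letter-exchange 两个队列参数组合实现:消息先进入一个没有消费者的队列(orderPayQueryQueue),在队列中停留 x-message-ttl 指定的时间(这里是 600000 毫秒,即 10 分钟)后过期,过期的消息会被转发到 x-dead-letter-exchange 指定的交换机(pay_delay_exchange),再按原路由键路由到真正被消费的队列(orderPayDelayQueryQueue),从而达到延迟消费的效果。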
<!-- Delay queue: messages wait here until the TTL elapses, then are dead-lettered to pay_delay_exchange. -->
<rabbit:queue name="orderPayQueryQueue" durable="true" auto-delete="false" exclusive="false">
    <rabbit:queue-arguments>
        <!-- Per-queue message TTL in milliseconds (600000 ms = 10 minutes); must be a numeric type. -->
        <entry key="x-message-ttl">
            <value type="java.lang.Long">600000</value>
        </entry>
        <!-- Exchange that receives messages once they expire in this queue. -->
        <entry key="x-dead-letter-exchange" value="pay_delay_exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
1.6生产者
@Override
publicvoidorderBuilder(inttype,StringorderCode){
Stringkey="tps."+orderCode;
orderCode=type+"."+orderCode;
amqpMaster.convertAndSend(key,orderCode);
logger.info("订单加入消息队列,订单编码:{}",key);
}
1.7消费者
packagecom.supuy.tps.service.queue;
importcom.alibaba.fastjson.JSON;
importcom.rabbitmq.client.Channel;
importcom.supuy.tps.common.mq.Gson2JsonMessageConverter;
importcom.supuy.tps.dto.bean.WmsOrderParam;
importcom.supuy.tps.service.IOrderShopService;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
importorg.springframework.beans.factory.annotation.Autowired;
/**
*Createdbybillon2016/5/31.
*/
publicclassOrderSendQueueListenerimplementsChannelAwareMessageListener{
privatestaticLoggerlogger=LoggerFactory.getLogger(OrderSendQueueListener.class);
@Autowired
privateGson2JsonMessageConvertermessageConverter;
@Autowired
privateIOrderShopServiceorderShopService;
@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
channel.basicQos(100);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
Stringdata=(String)messageConverter.fromMessage(message);
if(data!=null){
WmsOrderParamwmsOrderParam=JSON.parseObject(data,WmsOrderParam.class);
if(wmsOrderParam!=null){
wmsOrderParam.setOrderCode(wmsOrderParam.getOrderCode().substring(1));
orderShopService.pushOrderLogInfo(wmsOrderParam);
}
}
}
}
附加类Gson2JsonMessageConverter实现如下,
packagecom.supuy.tps.common.mq;
importcom.google.gson.Gson;
importorg.apache.commons.logging.Log;
importorg.apache.commons.logging.LogFactory;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.core.MessageProperties;
importorg.springframework.amqp.support.converter.AbstractJsonMessageConverter;
importorg.springframework.amqp.support.converter.ClassMapper;
importorg.springframework.amqp.support.converter.DefaultClassMapper;
importorg.springframework.amqp.support.converter.MessageConversionException;
importjava.io.IOException;
importjava.io.UnsupportedEncodingException;
publicclassGson2JsonMessageConverterextendsAbstractJsonMessageConverter{
privatestaticLoglog=LogFactory.getLog(Gson2JsonMessageConverter.class);
privatestaticClassMapperclassMapper=newDefaultClassMapper();
privatestaticGsongson=newGson();
publicGson2JsonMessageConverter(){
super();
}
@Override
protectedMessagecreateMessage(Objectobject,
MessagePropertiesmessageProperties){
byte[]bytes=null;
try{
StringjsonString=gson.toJson(object);
bytes=jsonString.getBytes(getDefaultCharset());
}
catch(IOExceptione){
thrownewMessageConversionException(
"FailedtoconvertMessagecontent",e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(getDefaultCharset());
if(bytes!=null){
messageProperties.setContentLength(bytes.length);
}
classMapper.fromClass(object.getClass(),messageProperties);
returnnewMessage(bytes,messageProperties);
}
@Override
publicObjectfromMessage(Messagemessage)
throwsMessageConversionException{
Objectcontent=null;
MessagePropertiesproperties=message.getMessageProperties();
if(properties!=null){
StringcontentType=properties.getContentType();
if(contentType!=null&&contentType.contains("json")){
Stringencoding=properties.getContentEncoding();
if(encoding==null){
encoding=getDefaultCharset();
}
try{
Class<?>targetClass=getClassMapper().toClass(
message.getMessageProperties());
content=convertBytesToObject(message.getBody(),
encoding,targetClass);
}
catch(IOExceptione){
thrownewMessageConversionException(
"FailedtoconvertMessagecontent",e);
}
}
else{
log.warn("Couldnotconvertincomingmessagewithcontent-type["
+contentType+"]");
}
}
if(content==null){
content=message.getBody();
}
returncontent;
}
privateObjectconvertBytesToObject(byte[]body,Stringencoding,
Class<?>clazz)throwsUnsupportedEncodingException{
StringcontentAsString=newString(body,encoding);
returngson.fromJson(contentAsString,clazz);
}
@Override
publicClassMappergetClassMapper(){
returnnewDefaultClassMapper();
}
}
1.8Q&A
1. 队列的 x-message-ttl 设置之后,如果下次修改了这个时间,项目启动重新声明队列时会报错(声明的参数与服务器上已存在的队列不一致)。这时候需要先在服务器上删除该队列,再重启项目。
2. 接收消息之后,如果处理过程出现错误而消息又没有被确认,该消息就会一直处于未确认(unacked)状态被持续占有,无法被再次消费。所以,要活用消息的 ack、nack、reject。