spring集成rabbitmq实现rpc
public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException { return this.convertSendAndReceive(this.exchange, routingKey, message, null); }
一
spring整合Rabbit MQ提供了Reply来实现RPC,AMQP协议定义了14中消息的属性,其中两项,一项是Replyto,表示返回消息的队列,一个是correlationId 用来表示发送消息和返回消息的标志,来区分是否是一个调用
下面一步步来实现RPC
首先贴出spring配置文件代码
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:prpc="http://www.pinnettech.com/schema/rpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.pinnettech.com/schema/rpc http://www.pinnettech.com/schema/springtag.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <context:component-scan base-package="com.temp.rabbit"> </context:component-scan> <!-- rabbit消息发送方 --> <!-- 连接服务配置 如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码 guest默认不允许远程登录--> <rabbit:connection-factory id="rabbitConnectionFactory" host="localhost" username="dengwei" password="dengwei" port="5672" virtual-host="/" channel-cache-size="5"/> <rabbit:admin connection-factory="rabbitConnectionFactory"/> <!-- 发送消息可以带*,绑定关系需全单词 --> <rabbit:direct-exchange name="rpc.bao.direct.goods" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="rpc.bao.goods" key="dengwei.goods"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- durable是否持久化 exclusive:是否排外的--> <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rpc.bao.goods"/> <!-- 消息转换器 --> <bean id="byteMessageConverter" class="com.temp.rabbit.BytesMessageConverter"/> <!-- 发送消息模板 --> <rabbit:template id="amqTemplate" exchange="rpc.bao.direct.goods" connection-factory="rabbitConnectionFactory" message-converter="byteMessageConverter" /> <!-- 消息发送方 end --> <!-- 消息接受方处理器 --> <bean id="msgHandler" class="com.temp.rabbit.receive.MessageHandler"/> <!-- 消息消费者 --> <bean id="msgLisenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> <constructor-arg name="delegate" ref="msgHandler"/> <constructor-arg name="messageConverter" ref="byteMessageConverter"/> </bean> <!-- 消费者容器 --> <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto"> <rabbit:listener queues="rpc.bao.goods" ref="msgLisenerAdapter"/> </rabbit:listener-container> <!-- 消息接收方 end --> <!-- RPC 配置 --> <!-- 消息服务提供接口实现 --> <bean id="service1" class="com.temp.rabbit.bean.TempServiceImp"/> <!-- 代理类 --> <bean id="service1Proxy" class="com.temp.rabbit.receive.proxy.ServiceProxy"> <property name="t" ref="service1"></property> </bean> <!-- 代理对象 --> <bean id="proxyService" factory-bean="service1Proxy" factory-method="getProxy"/> </beans>
其中消息转换器类:
package com.temp.rabbit; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractJsonMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.util.SerializationUtils; public class BytesMessageConverter extends AbstractJsonMessageConverter{ @Override protected Message createMessage(Object msg, MessageProperties msgPro) { byte[] data = SerializationUtils.serialize(msg); msgPro.setContentLength(data.length); System.out.println("create message "+msg.getClass()); return new Message(data , msgPro); } @Override public Object fromMessage(Message msg) throws MessageConversionException { byte[] data = msg.getBody() ; Object result = SerializationUtils.deserialize(data); System.out.println("create obj "+result.getClass()); return result; } }
消费者处理handler
package com.temp.rabbit.receive; import java.lang.reflect.Method; import com.temp.rabbit.bean.RpcRequest; import com.temp.rabbit.bean.RpcResponse; import com.temp.rabbit.bean.TempServiceImp; public class MessageHandler { //没有设置默认的处理方法的时候,方法名是handleMessage public RpcResponse handleMessage(RpcRequest message){ Class<?> clazz = message.getClassName() ; RpcResponse response = new RpcResponse(); Method method; try { method = clazz.getMethod(message.getMethodName(), message.getParamType()); Object result = method.invoke(new TempServiceImp(), message.getParams()); response.setResult(result); } catch (Exception e) { e.printStackTrace(); } return response ; } }
服务提供:
package com.temp.rabbit.bean; public class TempServiceImp implements TempService { public String sayHello(){ return "TempServiceImp hello ... " ; } }
代理类:
package com.temp.rabbit.receive.proxy; import java.io.Serializable; import java.lang.reflect.Method; import org.springframework.beans.factory.annotation.Autowired; import com.temp.rabbit.bean.RpcRequest; import com.temp.rabbit.bean.RpcResponse; import com.temp.rabbit.send.SendRabbitMsgImp; import net.sf.cglib.proxy.Enhancer; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; public class ServiceProxy<T> implements MethodInterceptor{ private Enhancer enhancer = new Enhancer(); private T t ; public void setT(T t){ this.t = t ; } @Autowired private SendRabbitMsgImp rabbitMsg ; public Object getProxy(){ enhancer.setSuperclass(t.getClass()); enhancer.setCallback(this); return enhancer.create(); } @Override public Object intercept(Object obj, Method method, Object[] param, MethodProxy proxy) throws Throwable { RpcRequest request = new RpcRequest(); request.setMethodName(method.getName()); request.setClassName(t.getClass()); Class<?>[] paramType = new Class<?>[param.length]; Serializable[] para = new Serializable[param.length]; for(int i = 0 ; i < param.length ; i ++){ paramType[i] = param[i].getClass(); para[i] = (Serializable)param[i]; } request.setParams(para); request.setParamType(paramType); RpcResponse result = (RpcResponse)rabbitMsg.sendAdcReceive("dengwei.goods", request) ; return result.getResult(); } }
主程序
package com.temp; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.temp.rabbit.bean.TempService; public class Main { public static void main(String[] args) { ApplicationContext app = new ClassPathXmlApplicationContext("classpath:spring.xml"); TempService proxy = (TempService)app.getBean("proxyService"); System.out.println("main result " + proxy.sayHello()) ; } }
消息发送实现:
package com.temp.rabbit.send; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component("sendMsg") public class SendRabbitMsgImp implements SendRabbitMsg{ @Autowired private RabbitTemplate template ; @Override public void sendData2Queue(String queueKey, Object msg) { try { template.convertAndSend(queueKey, msg); } catch (Exception e) { e.printStackTrace(); System.out.println("send data 2 msg erro "); } System.out.println("消息已发送"); } @Override public Object sendAdcReceive(String queueKey , Object msg){ try { return template.convertSendAndReceive(queueKey, msg); } catch (Exception e) { e.printStackTrace(); System.out.println("send data 2 msg erro "); } System.out.println("消息已发送"); return null ; } }
这里面的RpcRequest和RpcResponse就不贴代码了
这里讲一下原理实现,我们可以跟着源码看一下
首先调用的是RabbitTemplate的
public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException { return this.convertSendAndReceive(this.exchange, routingKey, message, null); }
然后一路走下去
@Override public Object convertSendAndReceive(final String exchange, final String routingKey, final Object message, final MessagePostProcessor messagePostProcessor) throws AmqpException { Message requestMessage = convertMessageIfNecessary(message); if (messagePostProcessor != null) { requestMessage = messagePostProcessor.postProcessMessage(requestMessage); } Message replyMessage = this.doSendAndReceive(exchange, routingKey, requestMessage); if (replyMessage == null) { return null; } return this.getRequiredMessageConverter().fromMessage(replyMessage); } protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) { if (this.replyQueue == null) { return doSendAndReceiveWithTemporary(exchange, routingKey, message); } else { return doSendAndReceiveWithFixed(exchange, routingKey, message); } }
到这里我们会看到,有一个分支如果replyqueue不为空则是走另外的一个方法,因为之前没有设置replyqueue所以,这里会
走第一步方法,也就是doSendAndReceiveWithTemporary
来看一下这个方法源码
protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) { return this.execute(new ChannelCallback<Message>() { @Override public Message doInRabbit(Channel channel) throws Exception { final ArrayBlockingQueue<Message> replyHandoff = new ArrayBlockingQueue<Message>(1); Assert.isNull(message.getMessageProperties().getReplyTo(), "Send-and-receive methods can only be used if the Message does not already have a replyTo property."); DeclareOk queueDeclaration = channel.queueDeclare(); String replyTo = queueDeclaration.getQueue(); message.getMessageProperties().setReplyTo(replyTo); String consumerTag = UUID.randomUUID().toString(); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties( properties, envelope, encoding); Message reply = new Message(body, messageProperties); if (logger.isTraceEnabled()) { logger.trace("Message received " + reply); } try { replyHandoff.put(reply); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }; channel.basicConsume(replyTo, true, consumerTag, true, true, null, consumer); doSend(channel, exchange, routingKey, message, null); Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout, TimeUnit.MILLISECONDS); channel.basicCancel(consumerTag); return reply; } }); }
这里流程就是申明一个大小为1的临时队列,然后发送消息,然后监听返回的消息,放到临时队列,然后取出返回消息。
那么因为每次都会创建临时队列,所以对性能是个考验那么有第二种方式,在rabbitmq中申明一个返回队列,用来存放该服务的返回消息。
那么需要在spring配置文件中配置一个reply队列
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="reply"/>
然后在消息监听容器中再配置一个发送消息的模板template为消费者
<!-- 消费者容器 --> <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto"> <rabbit:listener queues="reply" ref="amqTemplate"/> <rabbit:listener queues="rpc.bao.goods" ref="msgLisenerAdapter"/> </rabbit:listener-container>
最后再发送消息的实现中即SendRabbitMsgImp类中注入队列
@Autowired @Qualifier("reply") private Queue reply ;
然后设置template的replyqueue为reply ;template.setReplyQueue(reply);
这个设置代码可以再初始化方法中,也可以再发送消息之前,其实最好的实在spring中设置
那么该说原理了,我们可以看最开始发送消息的第二个方法
protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) { if (this.replyQueue == null) { return doSendAndReceiveWithTemporary(exchange, routingKey, message); } else { return doSendAndReceiveWithFixed(exchange, routingKey, message); } }
protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message) { return this.execute(new ChannelCallback<Message>() { @Override public Message doInRabbit(Channel channel) throws Exception { final PendingReply pendingReply = new PendingReply(); String messageTag = UUID.randomUUID().toString(); RabbitTemplate.this.replyHolder.put(messageTag, pendingReply); // Save any existing replyTo and correlation data String savedReplyTo = message.getMessageProperties().getReplyTo(); pendingReply.setSavedReplyTo(savedReplyTo); if (StringUtils.hasLength(savedReplyTo) && logger.isDebugEnabled()) { logger.debug("Replacing replyTo header:" + savedReplyTo + " in favor of template's configured reply-queue:" + RabbitTemplate.this.replyQueue.getName()); } message.getMessageProperties().setReplyTo(RabbitTemplate.this.replyQueue.getName()); String savedCorrelation = null; if (RabbitTemplate.this.correlationKey == null) { // using standard correlationId property byte[] correlationId = message.getMessageProperties().getCorrelationId(); if (correlationId != null) { savedCorrelation = new String(correlationId, RabbitTemplate.this.encoding); } } else { savedCorrelation = (String) message.getMessageProperties() .getHeaders().get(RabbitTemplate.this.correlationKey); } pendingReply.setSavedCorrelation(savedCorrelation); if (RabbitTemplate.this.correlationKey == null) { // using standard correlationId property message.getMessageProperties().setCorrelationId(messageTag .getBytes(RabbitTemplate.this.encoding)); } else { message.getMessageProperties().setHeader( RabbitTemplate.this.correlationKey, messageTag); } if (logger.isDebugEnabled()) { logger.debug("Sending message with tag " + messageTag); } doSend(channel, exchange, routingKey, message, null); LinkedBlockingQueue<Message> replyHandoff = pendingReply.getQueue(); Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout, TimeUnit.MILLISECONDS); RabbitTemplate.this.replyHolder.remove(messageTag); return reply; } }); }
这个方法并没有申请临时队列,发送消息后直接再pendingReply中的队列中取,那么怎么放到pendingReply的队列中区的呢,可以看到,RabbitTemplate是实现了MessageLIstener,那么看他实现的onMessage方法
public void onMessage(Message message) { try { String messageTag; if (this.correlationKey == null) { // using standard correlationId property messageTag = new String(message.getMessageProperties().getCorrelationId(), this.encoding); } else { messageTag = (String) message.getMessageProperties() .getHeaders().get(this.correlationKey); } if (messageTag == null) { logger.error("No correlation header in reply"); return; } PendingReply pendingReply = this.replyHolder.get(messageTag); if (pendingReply == null) { if (logger.isWarnEnabled()) { logger.warn("Reply received after timeout for " + messageTag); } } else { // Restore the inbound correlation data String savedCorrelation = pendingReply.getSavedCorrelation(); if (this.correlationKey == null) { if (savedCorrelation == null) { message.getMessageProperties().setCorrelationId(null); } else { message.getMessageProperties().setCorrelationId( savedCorrelation.getBytes(this.encoding)); } } else { if (savedCorrelation != null) { message.getMessageProperties().setHeader(this.correlationKey, savedCorrelation); } else { message.getMessageProperties().getHeaders().remove(this.correlationKey); } } // Restore any inbound replyTo String savedReplyTo = pendingReply.getSavedReplyTo(); message.getMessageProperties().setReplyTo(savedReplyTo); LinkedBlockingQueue<Message> queue = pendingReply.getQueue(); queue.add(message); if (logger.isDebugEnabled()) { logger.debug("Reply received for " + messageTag); if (savedReplyTo != null) { logger.debug("Restored replyTo to " + savedReplyTo); } } } } catch (UnsupportedEncodingException e) { throw new AmqpIllegalStateException("Invalid Character Set:" + this.encoding, e); } }
这里就明白了,根据唯一id也就是前面说的correlationId找到消息的pendingReply,然后将返回的消息放到pendingReply的队列中,这样就实现了RPC的调用,下面附上工程代码