白话RabbitMQ(六): RPC
推广
RabbitMQ专题讲座
CoolMQ开源项目
我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面
声明RPC接口
为了阐述RPC我们先建立一个客户端接口,它有一个方法,会发起一个RPC请求,而且会一直阻塞直到有结果返回
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
留意RPC
虽然RPC很常见,但一定要非常小心的使用它,假设rpc调用的是一个非常慢的程序,将导致结果不可预料,而且非常难以调试。
使用RPC时你可以参考下列一些规范
- 系统设计上要有详细的文档描述,使组件间的依赖讲清晰,做到有据可查
- 做好错误的异常处理,特别是当RPC服务挂掉或很长时间没有响应时
- 尽量少用RPC,而使用异步管道,而非阻塞式的RPC,降低系统间的耦合
回调队列(Callback queue)
用RabbitMQ实现RPC比较简单,客户端发起请求,服务端返回对这个请求的响应。为了实现这个功能我们需要一个能够"回调"的队列,我们直接用默认的队列即可
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息属性(Message properties)
AMQP 0-9-1 协议为每个消息定义了14个属性,很多属性很少会被用到,但我们要特别留意如下几个
- 分发模式(deliveryMode): 标记一个消息是否需要持久化(persistent)或者是需要事务(transient)等,在第二章中有描述
- 消息体类型(contentType): 描述消息中传递具体内容的编码方式,比如我们经常使用的JSON可以设置成:application/json
- 消息回应(replyTo):用于回调队列
- 关系Id(correlationId): 用于将RPC的返回值关联到对应的请求。
我们需要引入相应的包
import com.rabbitmq.client.AMQP.BasicProperties;
关系Id(Correlation Id)
在前面的方法中我们为每一个RPC请求都生成了一个队列,这是完全没有必要的,我们为每一个客户端建立一个队列就可以了。
这会引起一个新的问题,因为所有的RPC都是用一个队列,一旦有消息返回,你怎么知道返回的消息对应的是哪个请求呢?所以我们就用到了Correlation Id,作为每个请求独一无二的标识,当我们收到返回值后,会检查这个Id,匹配对应的响应。如果找不到Id所对应的请求,会直接抛弃它。
这里你可能会有疑问,为什么要抛弃掉未知消息呢?而不是抛出异常啥的。这跟我们服务端的竞态条件(possibility of a race condition )会有关系。比如假设我们RabbitMQ服务挂掉了,它刚给我们回复消息,还没等到回应,服务器就挂掉了,那么当RabbitMQ服务重启时,会重发消息,客户端会收到一条重复的消息,为了冥等性的考虑,我们需要仔细的处理返回后的处理方式。
小结
RPC工作过程如下
当客户端启动时,它会创建一个独立的匿名回调队列,然后发送RPC请求,这个RPC
请求会带两个属性:replyTo - RPC调用成功后需要返回的队列名称;correlationId - 每个请求独一无二的标识。RPC服务提供者会等在队列上,一旦有请求到达,它会立即响应,把自己的活干完,然后返回一个结果,根据replyTo返回到对应的队列。而客户端也会等着队列中的信息返回,一旦有一个消息出现,会检查correlationId,将结果返回给响应的请求发起者
整合
Fibonacci级数
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
我们定义个一个fibonacci级数,只能接受正整数,而且是效率不怎么高的那种。
rpc.java如下所示
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] argv) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; try { connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(properties.getCorrelationId()) .build(); String response = ""; try { String message = new String(body,"UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e){ System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(envelope.getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized(this) { this.notify(); } } } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized(consumer) { try { consumer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException | TimeoutException e) { e.printStackTrace(); } finally { if (connection != null) try { connection.close(); } catch (IOException _ignore) {} } } }
服务端的代码比较直接,首先建立连接,建立channel以及声明队列。我们之后可能会建立多个消费者,为了更好的负载均衡,需要在channel.basicQos中设置prefetchCount,然后设置一个basicConsume监听队列,提供一个回调函数来处理请求以及返回值
RPCClient.java
import com.rabbitmq.client.*; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); } public String call(String message) throws IOException, InterruptedException { String corrId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(corrId)) { response.offer(new String(body, "UTF-8")); } } }); return response.take(); } public void close() throws IOException { connection.close(); } //... }
客户端代码如下,我们建立一个连接,声明一个'callback'队列,我们将会往'callback'队列提交消息,并接收RPC的返回值,具体步骤如下:
我们首先生成一个唯一的correlation Id,并保存,我们将会使用它来区分之后所接受到的信息。然后发出这个消息,消息会包含两个属性: replyTo以及collelationId。因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列BlockingQueue是一种非常好的方式,这里我们使用了长度为1的ArrayBlockQueue,handleDelivery的功能是检查消息的的correlationId是不是我们之前所发送的,如果是,将返回值返回到BlockingQueue。此时主线程会等待返回并从ArrayBlockQueue取到返回值
从客户端发起请求
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
源代码参考RPCClient.java 和 RPCServer.java
编译
javac -cp $CP RPCClient.java RPCServer.java
我们的rpc服务端好了,启动服务
java -cp $CP RPCServer # => [x] Awaiting RPC requests
为了获取fibonacci级数我们只需要运行客户端:
java -cp $CP RPCClient # => [x] Requesting fib(30)
以上的实现方式并非建立RPC请求唯一的方式,但是它有很多优点:如果一个RPC服务过于缓慢,你可以非常方便的水平扩展,只需要增加消费者的个数即可,我们的代码还是比较简单的,有些负责的问题并未解决,比如
- 如果服务全部挂了,客户端要如何处理
- 如果服务超时该如何处理
- 非法信息该如何处理
基础章节的内容到此就结束了,到这里,你就能够基本明白消息队列的基本用法,接下来我们可以进入中级内容内容的学习了。