【译】RabbitMQ系列(二)-Work模式
Work模式
在第一章中,我们写了通过一个queue来发送和接收message的简单程序。在这一章中,我们会创建一个workqueue,来将执行时间敏感的任务分发到多个worker中。
work模式主要的意图是要避免等待完成一个耗时的任务。取而代之地,我们延迟任务的执行,将任务封装成消息,将之发送到queue。一个运行着的worker进程会弹出这个任务并执行它。当运行多个worker进程时,任务会在它们之间分派。
这种模式在web应用中特别有用,因为在一个较短的HTTP请求窗口中不会去执行一个复杂的任务。
准备工作
在上一章中,我们发送了一个”Hello World!"的message。现在我们将发送一个代表了复杂任务的字符串。这不是一个实际的任务,比如像调整图片大小或是重新渲染pdf文档,我们通Thead.sleep() 来模拟一个耗时的任务。message中的小圆点表示其复杂度,圆点越多则任务的执行越耗时。比如“Hello..."的message将耗时3秒。
我们简单的修改上一章的Send.java代码,允许在命令行发送任意message。新的类叫做NewTask.java
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
同样的,我们修改上一章中的Recv.java,让它在处理message的时候根据小圆点进行睡眠。新的类叫Worker.java
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
像在第一章一样编译这两个类
javac -cp $CP NewTask.java Worker.java
Round-robin分派
使用Task模式的一个明显的优势是让并行执行任务变得简单。我们只需要启动更多的worker就可以消减堆积的message,系统水平扩展简单。
首先,我们在同一时间启动两个worker。他们都会从queue获得message,来看一下具体细节。
打开了三个终端,两个是跑worker的。
java -cp $CP Worker
java -cp $CP Worker
第三个终端里来发布新的任务message。
java -cp $CP NewTask First message. java -cp $CP NewTask Second message.. java -cp $CP NewTask Third message... java -cp $CP NewTask Fourth message.... java -cp $CP NewTask Fifth message.....
让我们看看worker的处理message的情况.第一个worker收到了第1,3,5message,第二个worker收到了第2,4个message。
默认情况下,RabbitMQ会顺序的将message发给下一个消费者。每个消费者会得到平均数量的message。这种方式称之为round-robin(轮询).
Message 确认
执行任务需要一定的时间。你可能会好奇如果一个worker开始执行任务,但是中途异常退出,会是什么结果。在我们现在的代码中,一旦RabbitMQ将消息发送出去了,它会立即将该message删除。这样的话,就可能丢失message。
在实际场景中,我们不想丢失任何一个task。如果一个worker异常中断了,我们希望这个task能分派给另一个worker。
为了确保不会丢失message,RabbitMQ采用message确认机制。RabbitMQ只有收到该message的Ack之后,才会删除该消息。
如果worker中断退出了( channel关闭了,connection关闭了,或是TCP连接丢失了)而没有发送Ack,RabbitMQ会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的worker。这样就不用message丢失,即使是在worker经常异常中断退出的场景下。
不会有任何message会timeout。当消费者中断退出,RabbitMQ会重新分派message。即使消息的执行会花费很长的时间。
默认情况下,message是需要人工确认的。在上面的例子中,我们通过autoAck=true来关闭了人工确认。像下面这样,我们将该标志设置为false,worker就需要在完成了任务之后,发送确认。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
上面的代码保证即使当worker还在处理一条消息,而强制它退出,也不会丢失message。然后不久,所有未被确认的消息都会被重新分派。
发送确认必须和接收相同的channel。使用不同的channel进行确认会导致channel-level protocol 异常。
忘记确认消息是一个比较常见的错误,但是其后果是很严重的。当client退出时,message会被重新分派,但是RabbitMQ会占用越来越多的内存,因它无法释放那些未被确认的message。
可以通过rabbitmqctl来打印messages_unacknowledged:
##linux sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ##windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Message 持久化
我们学习了在消费者出现问题的时候不丢失message。但是如果RabbitMQ服务器宕机了,我们还是会丢失message。
当RabbitMQ宕机时,默认情况下,它会”忘记“所有的queue和message。为了确保message不丢失,我们需要确认两件事情:我们要使得queue和message都是持久的。
首先,我们要确保RabbitMQ不会丢失我们设置好的queue。所以,我们要把它声明成持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然代码没有任何问题,但是光这样是无效的。因为我们之前已经定义过名字为hello的queue。RabbitMQ不允许你使用不同的参数去重新定义一个已经存在的queue,而且这还不会反悔任何错误信息。但是我们还是有别的方法,让我们使用一个别的名字,比如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
声明queue的改变要在生产者和消费者的代码里都进行修改。
接着我们要设置message的持久性,我们通过设置MessageProperties为PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
将message标记成持久的不能100%保证message不会丢失,虽然这告诉RabbitMQ将message保存到磁盘,然而在RabbitMQ从接到message到保存之间,仍然有一小段时间。同时RabbitMQ不会给每一条message执行fsync(2) -- 可能只是保存到了cache而没有写到磁盘上去。所以持久的保证也不是非常强,然后对我们简单的task queue来说则足够了。如果需要一个非常强的保证,则可以使用发布确认的方式。
Fair 分派
你可能已经注意到分派的工作没有如我们所期望的来执行。比如在有2个worker的情况系,所有偶数的message耗时很长,而所有奇数的message则耗时很短,这样其中一个worker则一直被分派到偶数的message,而另一个则一直是奇数的message。RabbitMQ对此并不知晓,进而继续这样分派着message。
这样的原因是RabbitMQ是在message入queue的时候确定分派的。它不关心消费者ack的情况。
我们可以通过basicQos方法和prefetchCount(1)来解决这个问题。这个设置是让RabbitMQ给worker一次一个message。或者这么说,直到worker处理完之前的message并发送ack,才给worker下一个message。否则,Rabbitmq会将message发送给其它不忙的worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意queue的大小。如果所有的worker都处于忙碌状态,queue可能会被装满。必须监控queue深度,可能要开启更多的worker,或者采取其他的措施。
开始执行
NewTask.java的最终版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
Worker.java的最终版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用message ack和prefetchCount,来设定work queue。持久化选项则在RabbitMQ重启后能让任务得以恢复。