白话RabbitMQ(二): 任务队列
推广
RabbitMQ专题讲座
CoolMQ开源项目
我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面
前言
在第一篇中我们描述了如何最简单的RabbitMQ操作,如何发送、接受消息。在今天这篇文章中我们将描述如何创建一个任务队列,来将高耗时的任务分发到多个消费者,从而提高处理效率。
任务队列最主要的功能就是解耦高耗时的操作,否则程序会一直等在那里,浪费大量资源。反之我们会把这个操作交给队列,让它延后再做。我们将任务封装成一个消息发送给队列,后台的任务进程会得到这个任务并执行它,而且可以配置多个任务进程,进一步加大吞吐率。
特别是对于网络请求,一次短短的HTTP请求是要求迅速响应的,不可能让它一直停顿在高耗时操作上。
准备工作
在第一章中我们发送了“Hello World!”。现在来完成更复杂一点的,因为这里并没有真正的高耗时操作,比如缩放图像或输出一个pdf。因此我们只是用Thread.sleep()来假装我们很繁忙,而且会用"."来表示需要停顿的秒数,比如一个叫Hello...的任务将停顿3秒钟。
我们简单的更改下Send.java,称之为 NewTask.java.
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
然后是工具类
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
当然,我们的Recv.java也需要进行一些改造,它需要对每一个"."停顿1秒,Work.java如下
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
编译上面这些代码
javac -cp $CP NewTask.java Worker.java
轮询调度
任务队列的一个最大优点是可以并行工作,能够非常容易的水平扩张。
首先,让我们同时运行两个工作线程,他们能够同时从队列获取消息。我们也需要同时开启3个console:1个生产者,2个消费者
消费者C1
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
消费者C2
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
让我们运行生产者
# shell 3 java -cp $CP NewTask # => First message. java -cp $CP NewTask # => Second message.. java -cp $CP NewTask # => Third message... java -cp $CP NewTask # => Fourth message.... java -cp $CP NewTask # => Fifth message.....
让我们看看消费者们
消费者C1
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
消费者C2
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message..
RabbitMQ默认有序的将会发送消息给下一个消费者,所以每一个消费者都会得到相同数量的消息,这种方式就叫做轮询调度(round-robin),你可以尝试下更多的消费者
消息确认
一个任务可能非常耗时,如果消费者在做一个高耗时任务时挂掉了,我们将会丢失所有发送到这个消费者上的消息。这是非常不可取的,所以我们希望能够明确的知道消息是否消费成功,如果一个消费挂了,我们能够知道,并且将消息发送给下一个消费者。
为了确保消息不丢失,RabbitMQ支持消息确认。收到消息后消费者会给RabbitMQ服务器发送一个ack(我已经收到消息了),RabbitMQ就会在服务上删除这个消息了。
如果一个消费者挂了(连接关闭,channel关闭,或者是TCP连接丢失)而没有发送ack,RabbitMQ就会知道消息并没有消费成功,于是乎消息会被放到消息队列重新消费。如果此时还有其它消费者的话,消息会发送给其它消费者来消费,确保消息不会丢失
消息并没有超时时间这个概念,消息只会在消费者挂掉了时候重发,即使是一个非常非常耗时的的消费者也不会发生重发
手动消息确认(Manual message acknowledgments)默认是打开的,虽然我们之前关闭了它:autoAck=true。让我们先将它设置为false
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
这样一来,即使你使用CTRL+C强制杀死了一个消费者,消费者所丢失的消息也将会被重发,会被另一个消费者所接受并消费。
忘记应答
很容易犯忘记应答的错误,但会导致非常严重的后果。Messages会被重发,RabbitMQ会消耗越来越多的内存因为unacked的消息无法释放(甚至更严重,RabbitMQ内部维护了一个最大打开线程数,如果太多的消息没有应答,RabbitMQ甚至会整个崩溃掉)
你可以用Rabbitmqctl查看未被应答的消息数
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化
我们现在知道了可以通过应答来保证消息不丢失,但万一RabbitMQ挂了呢?还是可能会导致消息丢失。因此我们可以通过持久化的机制,包括将队列以及队列中的消息持久化的方式,来保证即便RabbitMQ挂了,当它重启的时候,队列以及消息也能够恢复
首先做队列的持久化,声明队列为durable
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
但很可惜的是,这种声明方式并不适用与上面的方法,因为我们已经将“Hello”定义为一个非持久化的队列了,是不能再将他改为持久化的,如果这样做,将会直接返回一个error信息。所以,我们需要重新再定义一个队列
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
在保证队列的持久化后需要保证消息的持久化-将消息设置为PERSISTENT_TEXT_PLAIN
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
公平分发
但这样还是存在问题:假设有如下的情形,一个消费者非常耗时,而一个消费者非常快,由于消息都是公平的发送,所以它们都是接收到相同数量的消息,会导致一个消费者非常忙碌,而另外一个消费者非常空闲,而RabbitMQ无法得知这一点。
为了解决这个缺陷我们引入了basicQos方法以及prefetchCount =1的设置。这会告诉RabbitMQ一次只给消费者一个消息:如果这个消息未确认,将不会发送新的消息,从而它会将消息发送给其它并不那么忙的消费者
int prefetchCount = 1; channel.basicQos(prefetchCount);
留意queue size
如果所有的消费者都非常忙,队列可能会很快被填满,所以你需要留意这一点,要么增加更多的消费者,或者采取其它的策略。
整合
NewTask.java
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用消息确认和prefetchCount你就能设置一个持久化队列了,同时,使用durable和persist,,即使RabbitMQ挂掉了,重启后也能够重发消息