RabbitMQ官方教程二 Work Queues(GOLANG语言实现)
RabbitMQ官方教程二 Work Queues(GOLANG语言实现)
在第一个教程中,我们编写了程序来发送和接收来自命名队列的消息。 在这一部分中,我们将创建一个工作队列,该队列将用于在多个worker之间分配耗时的任务。
工作队列(又称任务队列)的主要思路是避免立即执行资源密集型任务(比如耗时较长的邮件发送、文件处理等),而不得不等待它完成。 相反,我们安排任务在以后完成(异步完成)。 我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。 当您运行许多worker时,他们将共享任务。
这个概念在Web应用程序中特别有用,因为在Web应用程序中,不可能在较短的HTTP请求时间内处理复杂的任务。
准备
在本教程的前一部分,我们发送了一条包含“ Hello World!”的消息。 现在,我们将发送代表复杂任务的字符串。 我们没有真实的任务,例如要调整大小的图像或要渲染的pdf文件,因此我们假装耗时任务-使用time.Sleep函数来伪造它。 我们将字符串中的点数作为它的复杂度。 每个点将占“工作”的一秒。 例如,Hello ...描述的虚拟任务将花费三秒钟。
我们将稍微修改上一个示例中的send.go代码,以允许从命令行发送任意消息。 该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go:
#new_task.go package main import ( "github.com/streadway/amqp" "log" "os" "strings" ) func main(){ // 连接RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest::5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "hello", // 队列名称 false, // 是否持久化 false, // 是否自动删除 false, // 是否独立 false,nil, ) failOnError(err, "Failed to declare a queue") body := bodyForm(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { DeliveryMode:amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyForm(args []string) string{ var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s } // 帮助函数检测每一个amqp调用 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
#worker.go package main import ( "bytes" "github.com/streadway/amqp" "log" "time" ) func main(){ // 连接RabbitMQ服务器 conn, err := amqp.Dial("amqp://guest::5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 创建一个channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 监听队列 q, err := ch.QueueDeclare( "hello", // 队列名称 false, // 是否持久化 false, // 是否自动删除 false, // 是否独立 false,nil, ) failOnError(err, "Failed to declare a queue") // 消费队列 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) // 统计string中的`.`来表示执行时间 dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever } // 帮助函数检测每一个amqp调用 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
开启两个worker准备接受队列中数据
# shell 1 go run worker.go # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 go run worker.go # => [*] Waiting for messages. To exit press CTRL+C
执行new_task.go进行发送数据
# shell 3 go run new_task.go First message. go run new_task.go Second message.. go run new_task.go Third message... go run new_task.go Fourth message.... go run new_task.go Fifth message.....
得到结果如下
默认情况下,RabbitMQ将每个消息依次发送给下一个消费者。 平均而言,每个消费者都会收到相同数量的消息。 这种分发消息的方式称为循环。
消息确认
执行任务可能需要几秒钟。 您可能想知道,如果其中一个消费者开始一项漫长的任务而仅部分完成而死掉,会发生什么情况。 使用我们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便立即将其标记为删除。 在这种情况下,如果您杀死一个worker,我们将丢失正在处理的消息。 我们还将丢失所有发送给该特定工作人员但尚未处理的消息。
但是我们不想丢失任何任务。 如果一个worker死亡,我们希望将任务交付给另一个worker。
为了确保消息永不丢失,RabbitMQ支持消息确认。 消费者发送回一个消息确认,告知RabbitMQ特定的消息已被接收,处理,并且RabbitMQ可以自由删除它。
如果使用者宕机(其通道已关闭,连接已关闭或TCP连接丢失)而没有发送确认,RabbitMQ将了解消息未完全处理,并将重新排队。 如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。 这样,您可以确保即使worker偶尔宕机也不会丢失任何消息。
RabbitMQ没有任何消息超时设置; 消费者死亡时,RabbitMQ将重新传递消息。 即使处理一条消息花费非常非常长的时间也没关系。
在本教程中,我们将通过为“ auto-ack”参数传递一个false来使用手动消息确认,然后一旦在任务完成后使用d.Ack(false)从worker发送适当的确认。
#worker.go修改部门代码 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // 是否自动消息确认 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
使用此代码,我们可以确保,即使您在处理消息时使用CTRL + C杀死worker,也不会丢失任何信息。 worker后不久,所有未确认的消息将重新发送。
消息确认必须与消息发送在同一channel上。 尝试使用其他channel进行确认将导致通道级协议异常。
忘记消息确认
忘记设置消息确认是一个常见的错误。 这是一个很容易犯的错误,但是后果很严重。 当您的客户端退出时,消息将被重新发送(可能看起来像是随机重新发送),但是RabbitMQ将占用越来越多的内存,因为它将无法释放任何未确认的消息。
为了调试这种错误,您可以使用rabbitmqctl打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息持久化
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是,如果RabbitMQ服务器停止,我们的任务仍然会丢失。
RabbitMQ退出或宕机时,它将丢失队列和消息,除非您告知不要这样做。 要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。 为此,我们需要将其声明为持久的:
q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
尽管此命令本身是正确的,但在我们当前的设置中将无法使用。 这是因为我们已经定义了一个名为hello的队列,该队列并不持久。 RabbitMQ不允许您使用不同的参数重新定义现有队列,并且将向尝试执行此操作的任何程序返回错误。 但是有一个快速的解决方法-让我们声明一个名称不同的队列,例如task_queue:
q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
这种持久的选项更改需要同时应用于生产者代码和消费者代码。
在这一点上,我们确保即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在,我们需要使用amqp.Persistent选项amqp.Publishing来将消息标记为持久消息。
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing { DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), })
关于消息持久性的说明
将消息标记为持久性并不能完全保证不会丢失消息。 尽管它告诉RabbitMQ将消息保存到磁盘,但是RabbitMQ接受消息并且尚未保存消息时,还有很短的时间。 而且,RabbitMQ不会对每条消息都执行fsync(2)-它可能只是保存到缓存中,而没有真正写入磁盘。 持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。 如果您需要更强有力的保证,则可以使用publisher confirms。
公平分发
您可能已经注意到,调度仍然无法完全按照我们的要求进行。 例如,在有两名worker的情况下,当所有奇数的消息都很重,偶数消息很轻时,一个worker将一直忙碌而另一位worker将几乎不做任何工作。 但是,RabbitMQ对此一无所知,并且仍将平均分配消息。
发生这种情况是因为RabbitMQ在消息进入队列时才调度消息。 它不会查看使用者的未确认消息数。 它只是盲目地将每第n条消息发送给第n个使用者。
为了解决这个问题,我们可以将预取计数的值设置为1。这告诉RabbitMQ一次不要给一个worker发送多条消息。 换句话说,在处理并确认上一条消息之前,不要将新消息发送给worker。 而是将其分派给不忙的下一个worker。
err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS")
关于队列大小的注意事项
如果所有工作人员都忙,您的队列就满了。 您将需要留意这一点,也许会增加更多的工作人员,或者有其他一些策略。
最终代码
new_task.go
package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest::5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
worker.go
package main import ( "bytes" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest::5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }