golang rabbitmq的使用(五)
先说一个实际的业务场景:
Client端有一个请求需要进行耗时处理或者查询,这个处理在Server端做。Server 端处理完后通知给请求的Client端。
这种场景可以称之为RPC(Remote Procedure Call)
有两个点说明一下:
- <1>Client端发送请求给Server端可以简单定义一个Queue。Client作为Producer发布消息,Server端作为Cosumer消费消息
<2>Server端处理完耗时处理后需要将处理结果返回给请求的客户端。
- 可以在Client声明一个不指定名称的Queue,系统会自动生成一个随机名称的Queue。将Queue的名称在publish是发送给Server端
- 因为Server端要将处理结果返回给对应的请求,所以在Client端需要生成一个CorrelationId发送给Server端
处理流程
Client端
(1)声明从Server返回消息用的queue
respQueue, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // noWait nil, )
(2)发送请求消息到rpc_queue
err = ch.Publish( "", //exchange config.QUEUENAME, //routing key false, false, amqp.Publishing{ ContentType: "text/plain", CorrelationId: correlationID, ReplyTo: respQueue.Name, Body: []byte(msgBody), })
corrId为自己随机生成的Id
(2)Server端
(3)声明rpc_queue,从rpc_queue中消费消息
q, err := ch.QueueDeclare( config.QUEUENAME, false, false, false, false, nil, ) msgs, err := ch.Consume( q.Name, "", false, // auto ack false, false, false, nil, )
(4)执行处理后使用msg中的ReplyTo返回处理结果给Client
err = ch.Publish( "", msg.ReplyTo, false, false, amqp.Publishing{ ContentType: "text/plain", CorrelationId: msg.CorrelationId, Body: []byte(bookName), }) msg.Ack(false)
(5)Client端从reply queue中接收从Server端来的response
respMsgs, err := ch.Consume( respQueue.Name, "", true, // auto-ack true, // exclusive false, // noLocal false, // nowait nil, )
详细代码如下
conf.go
package config const ( RMQADDR = "amqp://guest:[email protected]:5672/" QUEUENAME = "rpc_queue" SERVERINSTANCESCNT = 5 )
client.go
package main import ( config "binTest/rabbitmqTest/t1/l6/conf" "fmt" "log" "math/rand" "os" "github.com/streadway/amqp" ) func main() { if len(os.Args) < 2 { log.Println("Arguments error") return } conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() msgBody := os.Args[1] respQueue, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // noWait nil, ) failOnError(err, "Failed to declare a response queue") correlationID := randomID(32) err = ch.Publish( "", //exchange config.QUEUENAME, //routing key false, false, amqp.Publishing{ ContentType: "text/plain", CorrelationId: correlationID, ReplyTo: respQueue.Name, Body: []byte(msgBody), }) log.Printf(" [x] Sent %s", msgBody) failOnError(err, "Failed to publish a message") respMsgs, err := ch.Consume( respQueue.Name, "", true, // auto-ack true, // exclusive false, // noLocal false, // nowait nil, ) for item := range respMsgs { if item.CorrelationId == correlationID { fmt.Println("response:", string(item.Body)) break } } } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } } func randomID(length int) string { if length <= 0 { return "" } bytes := make([]byte, length) for i := 0; i < length; i++ { bytes[i] = byte(rand.Intn(9)) } return string(bytes) }
server.go
package main import ( config "binTest/rabbitmqTest/t1/l6/conf" "fmt" "log" "math/rand" "time" "github.com/streadway/amqp" ) func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.Qos( config.SERVERINSTANCESCNT, 0, false, ) forever := make(chan bool) for routine := 0; routine < config.SERVERINSTANCESCNT; routine++ { go func(routineNum int) { q, err := ch.QueueDeclare( config.QUEUENAME, false, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, "", false, // auto ack false, false, false, nil, ) for msg := range msgs { log.Printf("In %d start consuming message: %s\n", routineNum, msg.Body) bookName := queryBookID(string(msg.Body)) err = ch.Publish( "", msg.ReplyTo, false, false, amqp.Publishing{ ContentType: "text/plain", CorrelationId: msg.CorrelationId, Body: []byte(bookName), }) if err != nil { fmt.Println("Failed to reply msg to client") } else { fmt.Println("Response to client:", bookName) } msg.Ack(false) } }(routine) } <-forever } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } } func queryBookID(bookID string) string { bookName := "QUERIED_" + bookID time.Sleep(time.Duration(rand.Intn(9)) * time.Second) return bookName }
执行效果
Client端
Server端
全部代码可以在如下处取得
https://github.com/BinWang-sh...
相关推荐
zhuxue 2020-10-14
shenzhenzsw 2020-10-09
shyoldboy 2020-09-27
leihui00 2020-09-16
lishijian 2020-08-17
程序员伊成 2020-08-06
ljcsdn 2020-07-27
waitzkj 2020-07-25
powrexly 2020-07-20
liym 2020-07-20
zhoucheng0 2020-07-19
shenzhenzsw 2020-07-18
woaishanguosha 2020-07-18
waitzkj 2020-07-18
zhoucheng0 2020-07-08
json0000 2020-07-04
NVEFLY 2020-07-04
OnMyHeart 2020-07-04