NSQ源码-Nsq客户端
看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的
认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责从lookupd中获取到
指定nsqd之后,从nsqd中获取消息。
生产者
我们以nsq/apps/to_nsq/to_nsq.go为例,客户端这边的代码逻辑就简单很多,NewProducer实例化一个instance,publish消息
到nsqd。
/// nsq/apps/to_nsq/to_nsq.go producer, err := nsq.NewProducer(addr, cfg) err := producer.Publish(*topic, line)
下面来看下Publish里的具体逻辑。
// Publish synchronously publishes a message body to the specified topic, returning // an error if publish failed func (w *Producer) Publish(topic string, body []byte) error { // 生成具体的cmd return w.sendCommand(Publish(topic, body)) }
func (w *Producer) sendCommand(cmd *Command) error { doneChan := make(chan *ProducerTransaction) err := w.sendCommandAsync(cmd, doneChan, nil) if err != nil { close(doneChan) return err } t := <-doneChan return t.Error }
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction, args []interface{}) error { // keep track of how many outstanding producers we're dealing with // in order to later ensure that we clean them all up... atomic.AddInt32(&w.concurrentProducers, 1) defer atomic.AddInt32(&w.concurrentProducers, -1) if atomic.LoadInt32(&w.state) != StateConnected { // 这里是一个lazily connect err := w.connect() if err != nil { return err } } t := &ProducerTransaction{ cmd: cmd, doneChan: doneChan, Args: args, } select { case w.transactionChan <- t: case <-w.exitChan: return ErrStopped } return nil }
在connect函数里启动了一个go routine去处理transactionChan对应的东西
func (w *Producer) connect() error { w.closeChan = make(chan int) w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w}) w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id)) _, err := w.conn.Connect() w.wg.Add(1) go w.router()
这里需要注意一下, go-nsq/conn.go是对底层连接的一个抽象,他是不关心你是生产者还是消费者,这里使用到了
delegate 模式,conn.go收到消息的处理放到了producerConnDelegate和consumerConnDelegate中,然后通知到具体的
消费者活着生产者。
消费者
回过头我们再来看下消费者部分的代码,client端我们以nsq/apps/nsq_tail/nsq_tail.go为例,代码的基本逻辑如下:
// 1. new comsunmer instanace consumer, err := nsq.NewConsumer(topics[i], *channel, cfg) // 2. add handler consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages}) // 3. connect to nsqd consumer.ConnectToNSQDs(nsqdTCPAddrs) if err != nil { log.Fatal(err) } // 4. connect to lookupd err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs) if err != nil { log.Fatal(err) } consumers = append(consumers, consumer)
下面来看下每个部分的实际代码:
func (r *Consumer) AddHandler(handler Handler) { r.AddConcurrentHandlers(handler, 1) }
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) { if atomic.LoadInt32(&r.connectedFlag) == 1 { panic("already connected") } atomic.AddInt32(&r.runningHandlers, int32(concurrency)) for i := 0; i < concurrency; i++ { go r.handlerLoop(handler) } }
至此handler添加完成,起一个单独的go routine来等待消息的到了。
func (r *Consumer) handlerLoop(handler Handler) { r.log(LogLevelDebug, "starting Handler") for { message, ok := <-r.incomingMessages // 有新的消息的到来 if !ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) // 调用之前注册的handler if err != nil { r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID) if !message.IsAutoResponseDisabled() { message.Requeue(-1) } continue } if !message.IsAutoResponseDisabled() { message.Finish() } } exit: r.log(LogLevelDebug, "stopping Handler") if atomic.AddInt32(&r.runningHandlers, -1) == 0 { r.exit() } }
官方是不推荐只部署nqd而不部署lookupd的,我们直接看下lookup的连接过程:
func (r *Consumer) ConnectToNSQLookupd(addr string) error { ... r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr) numLookupd := len(r.lookupdHTTPAddrs) r.mtx.Unlock() // if this is the first one, kick off the go loop if numLookupd == 1 { r.queryLookupd() r.wg.Add(1) go r.lookupdLoop() } return nil }
在queryLookupd中先去查询lookupd获取最新的nqd地址,然后connect to nsqd.
func (r *Consumer) lookupdLoop() { // add some jitter so that multiple consumers discovering the same topic, // when restarted at the same time, dont all connect at once. ticker = time.NewTicker(r.config.LookupdPollInterval) // 每个ticker interval更新nqd的地址信息 for { select { case <-ticker.C: r.queryLookupd() case <-r.lookupdRecheckChan: r.queryLookupd() case <-r.exitChan: goto exit } } }
func (r *Consumer) ConnectToNSQD(addr string) error { // 1. new connection conn := NewConn(addr, &r.config, &consumerConnDelegate{r}) conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel)) // 2. connection list _, pendingOk := r.pendingConnections[addr] _, ok := r.connections[addr] r.pendingConnections[addr] = conn if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 { r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr) } r.log(LogLevelInfo, "(%s) connecting to nsqd", addr) // 3. new connect // 3.1 go c.readLoop() // 3.2 go c.writeLoop() resp, err := conn.Connect() // 4. sub to nsqd cmd := Subscribe(r.topic, r.channel) err = conn.WriteCommand(cmd) }
以上就是客户端初始化的一个流程,然后就是接受消息处理了。
->NewConsumer() // 新建一个consumer ->ConnectToNSQLookupds() // 连接到lookupd |->ConnectToNSQLookupd() // 连接到lookupd |->r.queryLookupd() // 查询lookupd的 |->apiRequestNegotiateV1() // 调用lookupd的rest api获取nsqd消息 |->ConnectToNSQD() // 连接到具体nsq |->NewConn() // 连接instance |->conn.Connect() // 开始连接 |->c.readLoop() // 与nqd连接read loop |->c.writeLoop() // 与nqd连接write loop |->Subscribe() // consumer发送SUB command |->lookupdLoop() // 定时查询lookupd并更新nsqd信息
注:
[1]. 关于delegate模式参考 这里