golang-nsq系列(二)--nsqd源码解析
上一篇初识了 nsq
三个模块(nsqd, nsqlookupd, nsqadmin
)的 demo
演示,本篇则从源码开始,一步一步去解析 nsqd
的执行流程和逻辑处理,学习别人优秀的项目架构,以期学以致用。
1. nsqd
执行入口
在 nsq/apps/nsqd/main.go
可以找到执行入口文件,如下:
2. nsqd
执行主逻辑源码
2.1 通过第三方 svc
包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start()
,启动 nsqd
实例;
func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { logFatal("%s", err) } } func (p *program) Init(env svc.Environment) error { if env.IsWindowsService() { dir := filepath.Dir(os.Args[0]) return os.Chdir(dir) } return nil } func (p *program) Start() error { opts := nsqd.NewOptions() flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) ... }
2.2 初始化配置项(opts, cfg
),加载历史数据(nsqd.LoadMetadata
)、持久化最新数据(nsqd.PersistMetadata
),然后开启协程,进入 nsqd.Main()
主函数;
options.Resolve(opts, flagSet, cfg) nsqd, err := nsqd.New(opts) if err != nil { logFatal("failed to instantiate nsqd - %s", err) } p.nsqd = nsqd err = p.nsqd.LoadMetadata() if err != nil { logFatal("failed to load metadata - %s", err) } err = p.nsqd.PersistMetadata() if err != nil { logFatal("failed to persist metadata - %s", err) } go func() { err := p.nsqd.Main() if err != nil { p.Stop() os.Exit(1) } }()
2.3 初始化 tcpServer, httpServer, httpsServer
,然后循环监控队列信息(n.queueScanLoop
)、节点信息管理(n.lookupLoop
)、统计信息(n.statsdLoop
)输出;
tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf)) }) httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) }
2.4 分别处理 tcp/http
请求,开启 handler
协程进行并发处理,其中 newHTTPServer
注册路由采用了 Decorate
装饰器模式(后面会进一步解析);
http-Decorate
路由分发:
router := httprouter.New() router.HandleMethodNotAllowed = true router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf) router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf) router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf) s := &httpServer{ ctx: ctx, tlsEnabled: tlsEnabled, tlsRequired: tlsRequired, router: router, } router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1)) // v1 negotiate router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1)) router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1)) router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1)) // only v1 router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1)) router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
tcp-handler
处理:
for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } go handler.Handle(clientConn) }
2.5 tcp
解析 V2
协议,走内部协议封装的 prot.IOLoop(conn)
进行处理;
var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{ctx: p.ctx} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) return }
2.6 通过内部协议进行 p.Exec
(执行命令)、p.Send
(发送结果),保证每个 nsqd
节点都能正确的进行消息生成与消费,一旦上述过程有 error
都会被捕获处理,确保分布式投递的可靠性。
params := bytes.Split(line, separatorBytes) p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) var response []byte response, err = p.Exec(client, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr != nil { p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response != nil { err = p.Send(client, frameTypeResponse, response) if err != nil { err = fmt.Errorf("failed to send response - %s", err) break } }
3. nsqd
流程图小结
上述流程小结示意图如下:
【小结】从源码可以看到,代码逻辑清晰明了,利用 Go
协程高效并发处理分布式多节点 nsqd
的消息生产与消费,里面有很多细节有待下一步仔细剖析,学以致用。