etcd启动流程分析
etcd 有两种运行模式:一种是 proxy,另一种是作为 kvstore。这里主要记录 etcd 作为 kvstore 时的启动流程,启动入口是 etcd.go 中的 startEtcd。本章先粗略地描述启动流程中的重要环节,后面再详细描述每个重要环节中的关键步骤。
- 首先为每个 peer 监听地址(cfg.lpurls)建立一个 net.Listener,用于后续接受其他 peer 的连接。
// 存储peers net.Listener的数组 plns := make([]net.Listener, 0) for _, u := range cfg.lpurls { if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() { plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } var l net.Listener l, err = rafthttp.NewListener(u, cfg.peerTLSInfo) if err != nil { return nil, err } urlStr := u.String() plog.Info("listening for peers on ", urlStr) defer func() { if err != nil { l.Close() plog.Info("stopping listening for peers on ", urlStr) } }() // 把建立好的net.Listener放到数组里面 plns = append(plns, l) } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
- 为每个客户端监听地址(cfg.lcurls)建立一个 net.Listener,用于接受客户端的连接。
clns := make([]net.Listener, 0) for _, u := range cfg.lcurls { if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() { plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String()) } var l net.Listener l, err = net.Listen("tcp", u.Host) if err != nil { return nil, err } if fdLimit, err := runtimeutil.FDLimit(); err == nil { if fdLimit <= reservedInternalFDNum { plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) } l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) } // Do not wrap around this listener if TLS Info is set. // HTTPS server expects TLS Conn created by TLSListener. l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo) if err != nil { return nil, err } urlStr := u.String() plog.Info("listening for client requests on ", urlStr) defer func() { if err != nil { l.Close() plog.Info("stopping listening for client requests on ", urlStr) } }() clns = append(clns, l) } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
- 通过 NewServer 新建一个 EtcdServer 对象,并启动该 EtcdServer。
func NewServer(cfg *ServerConfig) (*EtcdServer, error) { // 生成一个存储etcd目录结构的对象 st := store.New(StoreClusterPrefix, StoreKeysPrefix) var ( // 管理etcd预写式日志的对象 w *wal.WAL // 代表raft算法的中的一个机器节点,主要用于借助raft算法以及peer,完成各类事务的提交 n raft.Node s *raft.MemoryStorage id types.ID cl *cluster ) // ... // 是否存储预写式日志文件,如果存在的会对snap已经wal进行加载,回复etcdserver重启前的状态 haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { return nil, err } var remotes []*Member switch { // ... // 这里以有WAL进行记录 case haveWAL: if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { return nil, fmt.Errorf("cannot write to WAL directory: %v", err) } if cfg.ShouldDiscover() { plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } var snapshot *raftpb.Snapshot var err error snapshot, err = ss.Load() if err != nil && err != snap.ErrNoSnapshot { return nil, err } if snapshot != nil { if err := st.Recovery(snapshot.Data); err != nil { plog.Panicf("recovered store from snapshot error: %v", err) } plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) } cfg.Print() // 借助于wal日志以及snapshot把该etcdserverh中存储的数据恢复到重启前的状态,然后启动raftNode的run方法 if !cfg.ForceNewCluster { id, cl, n, s, w = restartNode(cfg, snapshot) } else { id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) } cl.SetStore(st) cl.Recover() default: return nil, fmt.Errorf("unsupported bootstrap config") } // ... 
srv := &EtcdServer{ cfg: cfg, snapCount: cfg.SnapCount, errorc: make(chan error, 1), store: st, // server提交写请求主要通过raftNode.Propose方法,后续交互raft算法内部逻辑完成。 r: raftNode{ Node: n, ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), raftStorage: s, storage: NewStorage(w, ss), }, id: id, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: cl, stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), peerRt: prt, reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), forceVersionC: make(chan struct{}), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } tr := &rafthttp.Transport{ TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.peerDialTimeout(), ID: id, URLs: cfg.PeerURLs, ClusterID: cl.ID(), Raft: srv, Snapshotter: ss, ServerStats: sstats, LeaderStats: lstats, ErrorC: srv.errorc, V3demo: cfg.V3demo, } if err := tr.Start(); err != nil { return nil, err } // add all remotes into transport for _, m := range remotes { if m.ID != id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range cl.Members() { if m.ID != id { // 这个AddPeer的过程中主要完成以下关键两步(针对单个peer): // (1)启动StreamReader用于连接到peer并随时准备从peer接收消息,启动消息处理goroutine,然后通过EtcdServer.Process把消息交给Server处理 // (2)启动StreamWriter用于向peer发送消息 tr.AddPeer(m.ID, m.PeerURLs) } } srv.r.transport = tr return srv, nil } // EtcdServer.Start主要完成两个关键步骤: // (1)启动raftNode,作为EtcdServer与raft.node间的数据交互以及事务提交的中介 // (2)启动自身run,主要raftNode.apply()管道中接收已经被大部分集群节点commit的请求,然后在自身提交 s.Start() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
- 设置客户端以及 peer 的请求处理 handler,并在前面建立的 listener 上启动相应的 HTTP 服务。
// Client requests go to the handler built by etcdhttp.NewClientHandler
// (etcdhttp/client.go), wrapped in a CORS handler configured from cfg.corsInfo.
ch := &cors.CORSHandler{
	Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
	Info:    cfg.corsInfo,
}

// The peer handler contains the streamHandler. When a peer connects, the
// streamHandler binds the connection to that peer's streamWriter via
// peer.attachOutgoingConn; the connection is then used to send messages to
// that peer.
ph := etcdhttp.NewPeerHandler(s)

// Start a peer server goroutine on each peer listener. Any value returned by
// serveHTTP is passed straight to plog.Fatal. NOTE(review): the third
// argument (5*time.Minute here, 0 for clients) is presumably a read timeout —
// confirm against serveHTTP.
for _, l := range plns {
	go func(l net.Listener) {
		plog.Fatal(serveHTTP(l, ph, 5*time.Minute))
	}(l)
}

// Start a client server goroutine for each listen address
for _, l := range clns {
	go func(l net.Listener) {
		plog.Fatal(serveHTTP(l, ch, 0))
	}(l)
}
- 在etcdhttp/client.go文件中NewClientHandler的主要代码如下:
// Core of NewClientHandler (etcdhttp/client.go): register the client-facing
// handlers on a ServeMux. Paths matching no other pattern fall through to "/"
// and get http.NotFound.
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(healthPath, healthHandler(server))
mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
// Key-value operations. Both the bare prefix and prefix+"/" are registered:
// in a ServeMux, a pattern ending in "/" matches the whole subtree (e.g.
// /v2/keys/foo), while the bare pattern matches only the exact path.
mux.Handle(keysPrefix, kh)
mux.Handle(keysPrefix+"/", kh)
// ... other handlers
// Cluster membership operations.
mux.Handle(membersPrefix, mh)
mux.Handle(membersPrefix+"/", mh)
mux.Handle(deprecatedMachinesPrefix, dmh)

// Route prefixes (package-level constants). Once the server is running, each
// handler can be requested directly at http://ip:port/<prefix>.
const (
	authPrefix               = "/v2/auth"
	keysPrefix               = "/v2/keys"
	deprecatedMachinesPrefix = "/v2/machines"
	membersPrefix            = "/v2/members"
	statsPrefix              = "/v2/stats"
	varsPath                 = "/debug/vars"
	metricsPath              = "/metrics"
	healthPath               = "/health"
	versionPath              = "/version"
	configPath               = "/config"
	pprofPrefix              = "/debug/pprof"
)
NewPeerHandler 最终对应的是 rafthttp/transport.go 中的 Transport.Handler,代码如下:
func (t *Transport) Handler() http.Handler { pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID) // 这里我们主要关注streamHandler streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID) mux := http.NewServeMux() mux.Handle(RaftPrefix, pipelineHandler) mux.Handle(RaftStreamPrefix+"/", streamHandler) mux.Handle(RaftSnapshotPrefix, snapHandler) mux.Handle(ProbingPrefix, probing.NewHandler()) return mux } 1 2 3 4 5 6 7 8 9 10 11 12
rafthttp/http.go 中 streamHandler 处理 HTTP 请求的关键代码如下:
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ... //获取请求来源peerid fromStr := path.Base(r.URL.Path) from, err := types.IDFromString(fromStr) // ... // 通过peerid拿到对应的peer对象 p := h.peerGetter.Get(from) // ... w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() c := newCloseNotifier() conn := &outgoingConn{ t: t, Writer: w, Flusher: w.(http.Flusher), Closer: c, } // 把conn和peer的StreamWriter对象关联起来 p.attachOutgoingConn(conn) <-c.closeNotify() }
相关推荐
CurrentJ 2020-08-18
JustHaveTry 2020-07-17
Dannyvon 2020-07-13
Dannyvon 2020-07-04
###host字段指定授权使用该证书的etcd节点IP或子网列表,需要将etcd集群的3个节点都添加其中。cp etcd-v3.3.13-linux-amd64/etcd* /opt/k8s/bin/
xiunai 2020-07-04
breezegao 2020-07-02
微微一笑 2020-06-14
微微一笑 2020-06-12
CurrentJ 2020-06-06
lenchio 2020-06-04
微微一笑 2020-06-03
工作中的点点滴滴 2020-06-01
Rcvisual 2020-05-28
Dannyvon 2020-05-28
微微一笑 2020-05-26
wishli 2020-05-19
工作中的点点滴滴 2020-05-14