Analysis of the etcd Startup Flow

etcd can run in two modes: as a proxy, or as a kvstore. This article records the startup flow of etcd in kvstore mode. The startup entry point is startEtcd in etcd.go. This chapter first sketches the important stages of the startup flow; the key steps inside each stage are described in more detail later.

  1. First, create a net.Listener for each peer URL; these listeners are used later to accept connections from the peers.
// slice that stores the peers' net.Listeners
plns := make([]net.Listener, 0)
for _, u := range cfg.lpurls {
    if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
        plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
    }
    var l net.Listener
    l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
    if err != nil {
        return nil, err
    }
    urlStr := u.String()
    plog.Info("listening for peers on ", urlStr)
    // if startEtcd fails later on, the deferred function closes the listener on the way out
    defer func() {
        if err != nil {
            l.Close()
            plog.Info("stopping listening for peers on ", urlStr)
        }
    }()
    // append the established net.Listener to the slice
    plns = append(plns, l)
}
  2. Create net.Listeners for clients, used to accept client connections.
clns := make([]net.Listener, 0)
for _, u := range cfg.lcurls {
    if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
        plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
    }
    var l net.Listener
    l, err = net.Listen("tcp", u.Host)
    if err != nil {
        return nil, err
    }
    if fdLimit, err := runtimeutil.FDLimit(); err == nil {
        if fdLimit <= reservedInternalFDNum {
            plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
        }
        l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
    }
    // Do not wrap around this listener if TLS Info is set.
    // HTTPS server expects TLS Conn created by TLSListener.
    l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
    if err != nil {
        return nil, err
    }
    urlStr := u.String()
    plog.Info("listening for client requests on ", urlStr)
    defer func() {
        if err != nil {
            l.Close()
            plog.Info("stopping listening for client requests on ", urlStr)
        }
    }()
    clns = append(clns, l)
}
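The fdLimit check above relies on etcd's internal runtimeutil.FDLimit helper. Here is a self-contained illustration of the same idea using only the standard library; the value 150 for reservedInternalFDNum is an assumption based on etcd's source at the time and may differ in other versions.

// Illustration of the fd-limit logic above using only the standard
// library (Unix only). reservedInternalFDNum = 150 is an assumed value
// of etcd's internal constant.
package main

import (
    "fmt"
    "syscall"
)

const reservedInternalFDNum = 150 // assumption, see lead-in

func main() {
    var rl syscall.Rlimit
    if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rl); err != nil {
        panic(err)
    }
    // etcd keeps reservedInternalFDNum descriptors for internal use and
    // caps the client listener at the remainder via LimitListener.
    fmt.Printf("fd limit: %d, budget for client connections: %d\n",
        rl.Cur, rl.Cur-reservedInternalFDNum)
}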
  3. Create a new EtcdServer object and start it.
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
    // create the store object that holds etcd's directory-tree data
    st := store.New(StoreClusterPrefix, StoreKeysPrefix)
    var (
        // object that manages etcd's write-ahead log
        w *wal.WAL
        // a machine node in the raft algorithm; together with the peers it
        // is used to drive all kinds of transactions to commit through raft
        n raft.Node
        s *raft.MemoryStorage
        id types.ID
        cl *cluster
    )
    // ...
    // check whether write-ahead log files exist; if they do, the snapshot
    // and the WAL are loaded to restore the etcdserver to the state it was
    // in before the restart
    haveWAL := wal.Exist(cfg.WALDir())
    ss := snap.New(cfg.SnapDir())
    prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
    if err != nil {
        return nil, err
    }
    var remotes []*Member
    switch {
    // ...
    // only the branch where a WAL already exists is shown here
    case haveWAL:
        if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
            return nil, fmt.Errorf("cannot write to member directory: %v", err)
        }
        if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
            return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
        }
        if cfg.ShouldDiscover() {
            plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
        }
        var snapshot *raftpb.Snapshot
        var err error
        snapshot, err = ss.Load()
        if err != nil && err != snap.ErrNoSnapshot {
            return nil, err
        }
        if snapshot != nil {
            if err := st.Recovery(snapshot.Data); err != nil {
                plog.Panicf("recovered store from snapshot error: %v", err)
            }
            plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
        }
        cfg.Print()
        // use the WAL and the snapshot to restore the data held by this
        // etcdserver to its pre-restart state, then restart the raft node
        if !cfg.ForceNewCluster {
            id, cl, n, s, w = restartNode(cfg, snapshot)
        } else {
            id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
        }
        cl.SetStore(st)
        cl.Recover()
    default:
        return nil, fmt.Errorf("unsupported bootstrap config")
    }
    // ...
    srv := &EtcdServer{
        cfg:       cfg,
        snapCount: cfg.SnapCount,
        errorc:    make(chan error, 1),
        store:     st,
        // the server submits write requests mainly through raftNode.Propose;
        // the remaining interaction is completed by raft's internal logic
        r: raftNode{
            Node:        n,
            ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
            raftStorage: s,
            storage:     NewStorage(w, ss),
        },
        id:            id,
        attributes:    Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
        cluster:       cl,
        stats:         sstats,
        lstats:        lstats,
        SyncTicker:    time.Tick(500 * time.Millisecond),
        peerRt:        prt,
        reqIDGen:      idutil.NewGenerator(uint16(id), time.Now()),
        forceVersionC: make(chan struct{}),
        msgSnapC:      make(chan raftpb.Message, maxInFlightMsgSnap),
    }
    tr := &rafthttp.Transport{
        TLSInfo:     cfg.PeerTLSInfo,
        DialTimeout: cfg.peerDialTimeout(),
        ID:          id,
        URLs:        cfg.PeerURLs,
        ClusterID:   cl.ID(),
        Raft:        srv,
        Snapshotter: ss,
        ServerStats: sstats,
        LeaderStats: lstats,
        ErrorC:      srv.errorc,
        V3demo:      cfg.V3demo,
    }
    if err := tr.Start(); err != nil {
        return nil, err
    }
    // add all remotes into transport
    for _, m := range remotes {
        if m.ID != id {
            tr.AddRemote(m.ID, m.PeerURLs)
        }
    }
    for _, m := range cl.Members() {
        if m.ID != id {
            // For each single peer, AddPeer performs two key steps:
            // (1) start a streamReader that connects to the peer and stands ready
            //     to receive its messages; a message-handling goroutine then hands
            //     them to the server via EtcdServer.Process
            // (2) start a streamWriter used to send messages to the peer
            tr.AddPeer(m.ID, m.PeerURLs)
        }
    }
    srv.r.transport = tr
    return srv, nil
}
// EtcdServer.Start performs two key steps:
// (1) start the raftNode, the intermediary between the EtcdServer and raft.node
//     for data exchange and transaction submission
// (2) start the server's own run loop, which mainly receives, from the
//     raftNode.apply() channel, requests that have already been committed by a
//     majority of the cluster nodes, and then applies them locally
s.Start()
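To make this Propose path concrete, here is a minimal, hedged sketch of how a v2 write request enters raft. The helper name proposeRequest is hypothetical (in etcd this logic lives inside EtcdServer's request processing), but raft.Node.Propose, which raftNode embeds, is the real entry point.

// Hypothetical helper sketching the essence of submitting a write.
// r is a v2 pb.Request; ctx bounds how long we wait for the proposal.
func (s *EtcdServer) proposeRequest(ctx context.Context, r pb.Request) error {
    data, err := r.Marshal()
    if err != nil {
        return err
    }
    // Propose hands the encoded request to the raft state machine. Once a
    // quorum of the cluster has committed it, the entry comes back through
    // the raftNode.apply() channel and the run loop applies it to the store.
    return s.r.Propose(ctx, data)
}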
  4. Install the request handlers for clients and peers, and start the corresponding HTTP listening services.
ch := &cors.CORSHandler{
    // etcdhttp/client.go
    Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
    Info:    cfg.corsInfo,
}
// streamHandler listens for peer connections; when a connection conn arrives
// from a peer, peer.attachOutgoingConn associates conn with that peer's
// streamWriter, which is then used to send messages to the peer.
ph := etcdhttp.NewPeerHandler(s)
for _, l := range plns {
    go func(l net.Listener) {
        plog.Fatal(serveHTTP(l, ph, 5*time.Minute))
    }(l)
}
// Start a client server goroutine for each listen address
for _, l := range clns {
    go func(l net.Listener) {
        plog.Fatal(serveHTTP(l, ch, 0))
    }(l)
}
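serveHTTP is a small helper in etcd.go. A plausible sketch of it is shown below, assuming it simply wraps the handler in an http.Server with the given read timeout; the real function may differ in details such as error logging.

// Hedged sketch of serveHTTP: serve HTTP on the listener l with the given
// read timeout (0 means no timeout). The peer servers above pass
// readTimeout=5*time.Minute, the client servers pass 0.
func serveHTTP(l net.Listener, handler http.Handler, readTimeout time.Duration) error {
    srv := &http.Server{
        Handler:     handler,
        ReadTimeout: readTimeout,
    }
    return srv.Serve(l)
}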
  5. The core of NewClientHandler in etcdhttp/client.go looks like this:
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(healthPath, healthHandler(server))
mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
// handles key-value operations
mux.Handle(keysPrefix, kh)
mux.Handle(keysPrefix+"/", kh)
// ... other handlers
// handles membership-management operations
mux.Handle(membersPrefix, mh)
mux.Handle(membersPrefix+"/", mh)
mux.Handle(deprecatedMachinesPrefix, dmh)

// The route prefixes are listed below; once the etcdserver is up, the
// corresponding handler can be reached directly via http://ip:port/XXXPrefix
const (
    authPrefix               = "/v2/auth"
    keysPrefix               = "/v2/keys"
    deprecatedMachinesPrefix = "/v2/machines"
    membersPrefix            = "/v2/members"
    statsPrefix              = "/v2/stats"
    varsPath                 = "/debug/vars"
    metricsPath              = "/metrics"
    healthPath               = "/health"
    versionPath              = "/version"
    configPath               = "/config"
    pprofPrefix              = "/debug/pprof"
)
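To make the routing concrete, here is a small self-contained program that exercises the /v2/keys route after the server is up; the client URL 127.0.0.1:2379 is an assumption (etcd's default client address).

// Example: writing and reading a key through the v2 routes above.
// Assumption: etcd is serving clients at http://127.0.0.1:2379.
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "net/url"
    "strings"
)

func main() {
    base := "http://127.0.0.1:2379"

    // PUT /v2/keys/foo is routed to the keys handler registered on keysPrefix
    form := strings.NewReader(url.Values{"value": {"bar"}}.Encode())
    req, err := http.NewRequest("PUT", base+"/v2/keys/foo", form)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    putResp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    putResp.Body.Close()

    // GET /v2/keys/foo reads the value back through the same handler
    resp, err := http.Get(base + "/v2/keys/foo")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := ioutil.ReadAll(resp.Body)
    fmt.Println(string(body))
}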

NewPeerHandler ultimately corresponds to the following code in rafthttp/transport.go:

func (t *Transport) Handler() http.Handler {
    pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
    // here we mainly focus on the streamHandler
    streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
    snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
    mux := http.NewServeMux()
    mux.Handle(RaftPrefix, pipelineHandler)
    mux.Handle(RaftStreamPrefix+"/", streamHandler)
    mux.Handle(RaftSnapshotPrefix, snapHandler)
    mux.Handle(ProbingPrefix, probing.NewHandler())
    return mux
}

The key part of streamHandler in rafthttp/http.go, which handles these HTTP requests, is:

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // ...
    // extract the ID of the peer the request came from
    fromStr := path.Base(r.URL.Path)
    from, err := types.IDFromString(fromStr)
    // ...
    // look up the corresponding peer object by its ID
    p := h.peerGetter.Get(from)
    // ...
    w.WriteHeader(http.StatusOK)
    w.(http.Flusher).Flush()
    c := newCloseNotifier()
    conn := &outgoingConn{
        t:       t,
        Writer:  w,
        Flusher: w.(http.Flusher),
        Closer:  c,
    }
    // associate conn with the peer's streamWriter object
    p.attachOutgoingConn(conn)
    <-c.closeNotify()
}
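The final <-c.closeNotify() deliberately blocks ServeHTTP so the ResponseWriter stays usable by the peer's streamWriter after attachOutgoingConn. A minimal sketch of a closeNotifier with the shape implied by this usage (an assumption reconstructed from the code above, not necessarily etcd's exact implementation):

// Minimal closeNotifier sketch (assumption: reconstructed from its usage
// above). Close() is called when the outgoing connection is torn down,
// which unblocks ServeHTTP and lets the handler goroutine return.
type closeNotifier struct {
    done chan struct{}
}

func newCloseNotifier() *closeNotifier {
    return &closeNotifier{done: make(chan struct{})}
}

func (n *closeNotifier) Close() error {
    close(n.done)
    return nil
}

func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done }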
