etcd启动流程分析
etcd总共有两种运行模式:一种是proxy,一种是作为kvstore。这里主要记录etcd作为kvstore时的启动流程。etcd的启动入口在etcd.go中的startEtcd函数。本章先粗略地描述启动流程中的重要环节,后面再详细描述每个重要环节中的关键步骤。
- 首先为各个peer建立net.Listener,用于后续监听各个peer的连接。
// Create one net.Listener per peer URL (cfg.lpurls) and collect them in
// plns; they are served with the peer handler later.
plns := make([]net.Listener, 0)
for _, u := range cfg.lpurls {
	// Peer key/cert files are ignored for plain-http peer URLs; warn about it.
	if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
		plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
	}
	var l net.Listener
	// Assigned with = (not :=) so the deferred cleanup below observes the
	// enclosing function's err rather than a loop-local copy.
	l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
	if err != nil {
		return nil, err
	}
	urlStr := u.String()
	plog.Info("listening for peers on ", urlStr)
	// The defer deliberately sits inside the loop. It runs when the
	// enclosing function returns, and closes this listener only if that
	// function is returning with a non-nil err (e.g. a later step failed).
	defer func() {
		if err != nil {
			l.Close()
			plog.Info("stopping listening for peers on ", urlStr)
		}
	}()
	// Keep the listener so it can be served later.
	plns = append(plns, l)
}
- 为客户端建立net.Listener,用于监听客户端的连接。
// Create one net.Listener per client URL (cfg.lcurls) and collect them in
// clns; they are served with the client handler later.
clns := make([]net.Listener, 0)
for _, u := range cfg.lcurls {
	// Client key/cert files are ignored for plain-http client URLs; warn about it.
	if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
		plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
	}
	var l net.Listener
	// Assigned with = (not :=) so the deferred cleanup below observes the
	// enclosing function's err.
	l, err = net.Listen("tcp", u.Host)
	if err != nil {
		return nil, err
	}
	// If the process FD limit can be read, wrap l with transport.LimitListener,
	// keeping reservedInternalFDNum descriptors out of the limit for internal
	// use. The err declared here is scoped to this if statement, so failing to
	// read the limit leaves the outer err untouched.
	if fdLimit, err := runtimeutil.FDLimit(); err == nil {
		if fdLimit <= reservedInternalFDNum {
			plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
		}
		l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
	}
	// Wrap with the keep-alive listener for u.Scheme and the client TLS info.
	// Do not wrap around this listener if TLS Info is set: the HTTPS server
	// expects the TLS Conn created by the TLS listener.
	var kl net.Listener
	kl, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
	if err != nil {
		// The deferred cleanup for this listener is registered only below,
		// so close it here; otherwise it would leak on this error path.
		l.Close()
		return nil, err
	}
	l = kl
	urlStr := u.String()
	plog.Info("listening for client requests on ", urlStr)
	// Runs when the enclosing function returns. It closes this listener only
	// if that function is returning with a non-nil err.
	defer func() {
		if err != nil {
			l.Close()
			plog.Info("stopping listening for client requests on ", urlStr)
		}
	}()
	clns = append(clns, l)
}
- 新建一个etcdserver对象,并启动EtcdServer。
// NewServer builds an EtcdServer from cfg. This is an abridged excerpt
// ("// ..." marks elided code): only the restart-from-existing-WAL path of
// the bootstrap switch is shown, and sstats/lstats are defined in elided code.
// It returns an error when the peer round tripper cannot be created, when the
// member or WAL directory is not writable, when loading the snapshot fails
// with anything other than snap.ErrNoSnapshot, when the transport fails to
// start, or for an unsupported bootstrap configuration.
func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
	// Create the store that holds etcd's directory-structured key space.
	st := store.New(StoreClusterPrefix, StoreKeysPrefix)
	var (
		// w manages etcd's write-ahead log (WAL).
		w *wal.WAL
		// n is this member's raft node; per this walkthrough, requests are
		// committed through raft together with the peers.
		n raft.Node
		// s is the raft storage handed to raftNode as raftStorage below.
		s  *raft.MemoryStorage
		id types.ID
		cl *cluster
	)
	// ...
	// If a WAL already exists in cfg.WALDir(), the snapshot and WAL are
	// loaded below to restore the state the server had before it restarted.
	haveWAL := wal.Exist(cfg.WALDir())
	ss := snap.New(cfg.SnapDir())
	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
	if err != nil {
		return nil, err
	}
	// NOTE(review): remotes is not assigned anywhere in the visible path, so
	// the AddRemote loop below does nothing here; presumably the elided
	// bootstrap cases populate it.
	var remotes []*Member
	switch {
	// ...
	// Only the "WAL already exists" (restart) case is shown here.
	case haveWAL:
		if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
			return nil, fmt.Errorf("cannot write to member directory: %v", err)
		}
		if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
		}
		// A discovery token is only warned about (and ignored) once a valid
		// WAL exists.
		if cfg.ShouldDiscover() {
			plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
		}
		var snapshot *raftpb.Snapshot
		// Declared in the case scope; shadows the function-level err here.
		var err error
		snapshot, err = ss.Load()
		// snap.ErrNoSnapshot is tolerated; store recovery is then skipped
		// when snapshot is nil.
		if err != nil && err != snap.ErrNoSnapshot {
			return nil, err
		}
		// Restore the store from the snapshot's data.
		if snapshot != nil {
			if err := st.Recovery(snapshot.Data); err != nil {
				plog.Panicf("recovered store from snapshot error: %v", err)
			}
			plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
		}
		cfg.Print()
		// Restore this member from the WAL and snapshot. restartNode is used
		// normally; restartAsStandaloneNode when cfg.ForceNewCluster is set.
		// Both yield the member id, cluster, raft node, raft storage and WAL.
		if !cfg.ForceNewCluster {
			id, cl, n, s, w = restartNode(cfg, snapshot)
		} else {
			id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
		}
		cl.SetStore(st)
		cl.Recover()
	default:
		return nil, fmt.Errorf("unsupported bootstrap config")
	}
	// ...
	srv := &EtcdServer{
		cfg:       cfg,
		snapCount: cfg.SnapCount,
		errorc:    make(chan error, 1),
		store:     st,
		// Per this walkthrough, the server submits writes mainly through
		// raftNode's Propose; raft's internal logic handles the rest.
		r: raftNode{
			Node:        n,
			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
			raftStorage: s,
			storage:     NewStorage(w, ss),
		},
		id:            id,
		attributes:    Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
		cluster:       cl,
		stats:         sstats,
		lstats:        lstats,
		SyncTicker:    time.Tick(500 * time.Millisecond),
		peerRt:        prt,
		reqIDGen:      idutil.NewGenerator(uint16(id), time.Now()),
		forceVersionC: make(chan struct{}),
		msgSnapC:      make(chan raftpb.Message, maxInFlightMsgSnap),
	}
	// rafthttp transport for peer communication, configured with this
	// member's id and peer URLs, with srv as its Raft handler and srv.errorc
	// as its error channel.
	tr := &rafthttp.Transport{
		TLSInfo:     cfg.PeerTLSInfo,
		DialTimeout: cfg.peerDialTimeout(),
		ID:          id,
		URLs:        cfg.PeerURLs,
		ClusterID:   cl.ID(),
		Raft:        srv,
		Snapshotter: ss,
		ServerStats: sstats,
		LeaderStats: lstats,
		ErrorC:      srv.errorc,
		V3demo:      cfg.V3demo,
	}
	if err := tr.Start(); err != nil {
		return nil, err
	}
	// add all remotes into transport
	for _, m := range remotes {
		if m.ID != id {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range cl.Members() {
		if m.ID != id {
			// Per this walkthrough, for each peer AddPeer mainly:
			// (1) starts a StreamReader that connects to the peer, stands by to
			//     receive its messages, and starts a goroutine that hands them
			//     to the server via EtcdServer.Process;
			// (2) starts a StreamWriter used to send messages to the peer.
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	srv.r.transport = tr
	return srv, nil
}
// Start the server. Per this walkthrough, EtcdServer.Start performs two key steps:
// (1) it starts raftNode, the intermediary between EtcdServer and raft.node
//     for data exchange and transaction commits;
// (2) it starts the server's own run loop, which receives from raftNode's
//     apply() channel the requests already committed by a majority of the
//     cluster members, and then applies them locally.
s.Start()
- 设置客户端以及peer的请求处理handler,并启动相关的HTTP监听服务。
// Client handler: the etcdhttp client handler (etcdhttp/client.go), wrapped
// in a CORS handler configured from cfg.corsInfo.
ch := &cors.CORSHandler{
	Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
	Info:    cfg.corsInfo,
}
// Peer handler. Its stream handler accepts peer connections; for each one,
// peer.attachOutgoingConn associates the conn with that peer's streamWriter,
// which is later used to send messages to the peer.
ph := etcdhttp.NewPeerHandler(s)
// Start a peer server goroutine for each peer listener. Any error returned by
// serveHTTP is passed to plog.Fatal.
// NOTE(review): the third serveHTTP argument looks like a read timeout
// (5 minutes for peers, 0 for clients) — confirm against serveHTTP.
for _, l := range plns {
	// l is passed as an argument so each goroutine gets its own listener
	// (required before Go 1.22's per-iteration loop variables).
	go func(l net.Listener) {
		plog.Fatal(serveHTTP(l, ph, 5*time.Minute))
	}(l)
}
// Start a client server goroutine for each listen address
for _, l := range clns {
	go func(l net.Listener) {
		plog.Fatal(serveHTTP(l, ch, 0))
	}(l)
}
- 在etcdhttp/client.go文件中,NewClientHandler的主要代码如下:
// Core of NewClientHandler (etcdhttp/client.go), abridged: build the client
// ServeMux and register the per-prefix handlers. kh, mh, dmh and server are
// defined in the elided surrounding code.
mux := http.NewServeMux()
// Catch-all: any path not matched by a more specific pattern gets a 404.
mux.HandleFunc("/", http.NotFound)
mux.Handle(healthPath, healthHandler(server))
mux.HandleFunc(versionPath, versionHandler(server.Cluster(), serveVersion))
// Key-value operations. ServeMux treats keysPrefix ("/v2/keys") as an exact
// match and keysPrefix+"/" as a subtree, so both are registered for kh.
mux.Handle(keysPrefix, kh)
mux.Handle(keysPrefix+"/", kh)
// ... other handlers elided
// Cluster membership management (same exact-path + subtree registration).
mux.Handle(membersPrefix, mh)
mux.Handle(membersPrefix+"/", mh)
mux.Handle(deprecatedMachinesPrefix, dmh)
// Route prefixes and paths served by the client handler. Once etcd is
// running, each one can be requested directly at http://<ip>:<port><path>.

// Versioned (v2) API prefixes.
const (
	authPrefix               = "/v2/auth"
	keysPrefix               = "/v2/keys"
	deprecatedMachinesPrefix = "/v2/machines"
	membersPrefix            = "/v2/members"
	statsPrefix              = "/v2/stats"
)

// Operational, monitoring and debugging endpoints.
const (
	healthPath  = "/health"
	versionPath = "/version"
	configPath  = "/config"
	metricsPath = "/metrics"
	varsPath    = "/debug/vars"
	pprofPrefix = "/debug/pprof"
)
NewPeerHandler最终对应文件rafthttp/transport.go中的如下代码:
// Handler returns the HTTP handler serving peer (raft) traffic for t.
// Routes registered on the returned mux:
//   - RaftPrefix: pipeline handler
//   - RaftStreamPrefix + "/": stream handler (subtree; the one this
//     walkthrough follows)
//   - RaftSnapshotPrefix: snapshot handler
//   - ProbingPrefix: probing.NewHandler()
func (t *Transport) Handler() http.Handler {
	pipeline := newPipelineHandler(t, t.Raft, t.ClusterID)
	stream := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
	snapshot := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)

	mux := http.NewServeMux()
	routes := []struct {
		pattern string
		handler http.Handler
	}{
		{RaftPrefix, pipeline},
		{RaftStreamPrefix + "/", stream},
		{RaftSnapshotPrefix, snapshot},
		{ProbingPrefix, probing.NewHandler()},
	}
	for _, rt := range routes {
		mux.Handle(rt.pattern, rt.handler)
	}
	return mux
}
在文件rafthttp/http.go中,streamHandler处理HTTP请求的关键代码如下:
// ServeHTTP handles an incoming stream request from a peer (abridged excerpt;
// "// ..." marks elided code). It identifies the requesting peer from the last
// element of the URL path, writes a 200 status and flushes it, attaches the
// response writer to that peer as an outgoing connection, and then blocks
// until the close notifier created here fires.
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// ...
	// The requesting peer's ID is the last element of the URL path.
	fromStr := path.Base(r.URL.Path)
	from, err := types.IDFromString(fromStr)
	// ... (handling of err elided)
	// Look up the peer object for that ID.
	p := h.peerGetter.Get(from)
	// ...
	// Write the 200 status and flush it right away, before the conn is attached.
	w.WriteHeader(http.StatusOK)
	// NOTE(review): this and the Flusher field below use an unchecked type
	// assertion, which panics if w does not implement http.Flusher.
	w.(http.Flusher).Flush()
	c := newCloseNotifier()
	// NOTE(review): t is not defined in this excerpt; presumably the stream
	// type determined in the elided code — confirm against rafthttp/http.go.
	conn := &outgoingConn{
		t:       t,
		Writer:  w,
		Flusher: w.(http.Flusher),
		Closer:  c,
	}
	// Associate conn with the peer's StreamWriter.
	p.attachOutgoingConn(conn)
	// Block until c's closeNotify channel delivers; returning from ServeHTTP
	// would end the HTTP response.
	<-c.closeNotify()
}
相关推荐
CurrentJ 2020-08-18
JustHaveTry 2020-07-17
Dannyvon 2020-07-13
Dannyvon 2020-07-04
###host字段指定授权使用该证书的etcd节点IP或子网列表,需要将etcd集群的3个节点都添加其中。cp etcd-v3.3.13-linux-amd64/etcd* /opt/k8s/bin/
xiunai 2020-07-04
breezegao 2020-07-02
微微一笑 2020-06-14
微微一笑 2020-06-12
CurrentJ 2020-06-06
lenchio 2020-06-04
微微一笑 2020-06-03
工作中的点点滴滴 2020-06-01
Rcvisual 2020-05-28
Dannyvon 2020-05-28
微微一笑 2020-05-26
wishli 2020-05-19
工作中的点点滴滴 2020-05-14