Codis源码解析 :codis-server添加到集群
上一篇,我们成功在集群中添加了proxy。这一篇来讲讲codis-server添加到集群的过程中发生了什么。
第一步,先别急着添加server,而应该是创建分组。创建分组的过程很简单,主要就是校验group的id在不在1~9999这个范围内,如果在的话(以group1为例),就调用zkClient创建路径/codis3/codis-wujiang/group/group-0001,初识创建好的时候,这个group下面是空的。
{
"id": 1,
"servers": [],
"promoting": {},
"out_of_sync": false
}
接下来,向group中添加codis-server。这个过程中对redis的操作使用了/github/garyburd/redigo这个第三方redis服务。
首先,根据redis的地址新建一个redis客户端
func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) {
//调用了/github/garyburd/redigo/redis/conn.go,返回的redisClient中包含了connection,redis地址以及上次的使用时间等信息
c, err := redigo.Dial("tcp", addr, []redigo.DialOption{
redigo.DialConnectTimeout(math2.MinDuration(time.Second, timeout)),
redigo.DialPassword(auth),
redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout),
}...)
if err != nil {
return nil, errors.Trace(err)
}
return &Client{
conn: c, Addr: addr, Auth: auth,
LastUse: time.Now(), Timeout: timeout,
}, nil
}
//RedisClient结构,对于每台redis服务器,都会有多个连接,过期的连接将会被清除
type Client struct {
conn redigo.Conn
Addr string
Auth string
Database int
LastUse time.Time
Timeout time.Duration
}
返回的redisClient结构如下所示
下一步,获取槽的信息,并且将server挂到当前group下
func (c *Client) SlotsInfo() (map[int]int, error) {
//获取当前client的所有slot的信息,由于我们这一步还没有给group分配槽,所以这里得到的reply是一个len和cap都为0的interface切片
if reply, err := c.Do("SLOTSINFO"); err != nil {
return nil, errors.Trace(err)
} else {
//这里的infos也是len和cap都为0的interface切片
infos, err := redigo.Values(reply, nil)
if err != nil {
return nil, errors.Trace(err)
}
slots := make(map[int]int)
for i, info := range infos {
p, err := redigo.Ints(info, nil)
if err != nil || len(p) != 2 {
return nil, errors.Errorf("invalid response[%d] = %v", i, info)
}
slots[p[0]] = p[1]
}
//slots也只相当于初始化了结构,但是没有填充数据
return slots, nil
}
}
//将server挂在group下,传入的gid就是1,addr是codis-server的地址,datacenter由于没有填写,所以dc是空字符串
func (s *Topom) GroupAddServer(gid int, dc, addr string) error {
s.mu.Lock()
defer s.mu.Unlock()
//这个newContext方法我们之前说过很多遍了,就是重新填充上下文中的cache,如果cache为空,就从store中读出填入cache
ctx, err := s.newContext()
if err != nil {
return err
}
if addr == "" {
return errors.Errorf("invalid server address")
}
//同一group下重复添加server的校验
for _, g := range ctx.group {
for _, x := range g.Servers {
if x.Addr == addr {
return errors.Errorf("server-[%s] already exists", addr)
}
}
}
//从上下文中根据gid取出group的详细信息
g, err := ctx.getGroup(gid)
if err != nil {
return err
}
if g.Promoting.State != models.ActionNothing {
return errors.Errorf("group-[%d] is promoting", g.Id)
}
//此时集群中并没有sentinel
if p := ctx.sentinel; len(p.Servers) != 0 {
defer s.dirtySentinelCache()
p.OutOfSync = true
if err := s.storeUpdateSentinel(p); err != nil {
return err
}
}
defer s.dirtyGroupCache(g.Id)
//将新增的codis-server地址追加到当前group的Servers属性后面
g.Servers = append(g.Servers, &models.GroupServer{Addr: addr, DataCenter: dc})
//更新zk路径,/codis3/codis-wujiang/group/group-0001下面的文件内容
return s.storeUpdateGroup(g)
}
//Group结构
type Group struct {
Id int `json:"id"`
Servers []*GroupServer `json:"servers"`
Promoting struct {
Index int `json:"index,omitempty"`
State string `json:"state,omitempty"`
} `json:"promoting"`
OutOfSync bool `json:"out_of_sync"`
}
下图是ctx, err := s.newContext()这一步取出的ctx,可以看到,此时上下文中group的Servers属性还是空
下图是根据id从上下文取出的group添加了codis-server之后的情况,可以看到,此时在当前Group下已经多了刚添加的codis-server地址
下一步,我们要给刚刚添加的主codis-server添加一个从codis-server,流程和刚才是一样的,第一次添加的时候,两个都是主,要点击下面那个绿色的类似于设置的按钮,将第二个服务器变成第一个的从服务器
//这里传入的addr就是第二台codis-server服务器的地址
func (s *Topom) SyncCreateAction(addr string) error {
s.mu.Lock()
defer s.mu.Unlock()
//此时的上下文的group属性中的Servers已经初始化了两个/pkg/models.GroupServer结构
ctx, err := s.newContext()
if err != nil {
return err
}
//遍历上下文的group属性的Server,如果地址与传入的addr匹配,就可以找到对应的Group g
g, index, err := ctx.getGroupByServer(addr)
if err != nil {
return err
}
if g.Promoting.State != models.ActionNothing {
return errors.Errorf("group-[%d] is promoting", g.Id)
}
if g.Servers[index].Action.State == models.ActionPending {
return errors.Errorf("server-[%s] action already exist", addr)
}
defer s.dirtyGroupCache(g.Id)
//遍历g中的每个Server的Action.Index,取最大值加一赋值给新增的这个Server
//因为此时所有Server的Action.Index都是0,所以这里将新增Server的Action.Index设置为1
g.Servers[index].Action.Index = ctx.maxSyncActionIndex() + 1
//因为我们这里是把新增的这个6380的机器作为从服务器,所以这里要把它的Action.State设置为pending
g.Servers[index].Action.State = models.ActionPending
//更新zk路径
return s.storeUpdateGroup(g)
}
完成上述动作之后,Group的状态变为
并且zk下已经在Group1中注册了两台服务器的信息
再看界面,同属于Group1的两台codis-server的主从关系已经很明显
启动dashboard时有刷新redis状态的goroutine,在成功添加两台codis-server后,这个goroutine的作用也比较明显了,详见笔者之前的一篇博客Codis源码解析——dashboard的启动(2)