Codis源码解析 :codis-server添加到集群

Codis源码解析 :codis-server添加到集群

上一篇,我们成功在集群中添加了proxy。这一篇来讲讲codis-server添加到集群的过程中发生了什么。

第一步,先别急着添加server,而应该是创建分组。创建分组的过程很简单,主要就是校验group的id在不在1~9999这个范围内,如果在的话(以group1为例),就调用zkClient创建路径/codis3/codis-wujiang/group/group-0001,初识创建好的时候,这个group下面是空的。

{

"id": 1,

"servers": [],

"promoting": {},

"out_of_sync": false

}

接下来,向group中添加codis-server。这个过程中对redis的操作使用了/github/garyburd/redigo这个第三方redis服务。

首先,根据redis的地址新建一个redis客户端

func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) {

//调用了/github/garyburd/redigo/redis/conn.go,返回的redisClient中包含了connection,redis地址以及上次的使用时间等信息

c, err := redigo.Dial("tcp", addr, []redigo.DialOption{

redigo.DialConnectTimeout(math2.MinDuration(time.Second, timeout)),

redigo.DialPassword(auth),

redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout),

}...)

if err != nil {

return nil, errors.Trace(err)

}

return &Client{

conn: c, Addr: addr, Auth: auth,

LastUse: time.Now(), Timeout: timeout,

}, nil

}

//RedisClient结构,对于每台redis服务器,都会有多个连接,过期的连接将会被清除

type Client struct {

conn redigo.Conn

Addr string

Auth string

Database int

LastUse time.Time

Timeout time.Duration

}

返回的redisClient结构如下所示

Codis源码解析 :codis-server添加到集群

下一步,获取槽的信息,并且将server挂到当前group下

func (c *Client) SlotsInfo() (map[int]int, error) {

//获取当前client的所有slot的信息,由于我们这一步还没有给group分配槽,所以这里得到的reply是一个len和cap都为0的interface切片

if reply, err := c.Do("SLOTSINFO"); err != nil {

return nil, errors.Trace(err)

} else {

//这里的infos也是len和cap都为0的interface切片

infos, err := redigo.Values(reply, nil)

if err != nil {

return nil, errors.Trace(err)

}

slots := make(map[int]int)

for i, info := range infos {

p, err := redigo.Ints(info, nil)

if err != nil || len(p) != 2 {

return nil, errors.Errorf("invalid response[%d] = %v", i, info)

}

slots[p[0]] = p[1]

}

//slots也只相当于初始化了结构,但是没有填充数据

return slots, nil

}

}

//将server挂在group下,传入的gid就是1,addr是codis-server的地址,datacenter由于没有填写,所以dc是空字符串

func (s *Topom) GroupAddServer(gid int, dc, addr string) error {

s.mu.Lock()

defer s.mu.Unlock()

//这个newContext方法我们之前说过很多遍了,就是重新填充上下文中的cache,如果cache为空,就从store中读出填入cache

ctx, err := s.newContext()

if err != nil {

return err

}

if addr == "" {

return errors.Errorf("invalid server address")

}

//同一group下重复添加server的校验

for _, g := range ctx.group {

for _, x := range g.Servers {

if x.Addr == addr {

return errors.Errorf("server-[%s] already exists", addr)

}

}

}

//从上下文中根据gid取出group的详细信息

g, err := ctx.getGroup(gid)

if err != nil {

return err

}

if g.Promoting.State != models.ActionNothing {

return errors.Errorf("group-[%d] is promoting", g.Id)

}

//此时集群中并没有sentinel

if p := ctx.sentinel; len(p.Servers) != 0 {

defer s.dirtySentinelCache()

p.OutOfSync = true

if err := s.storeUpdateSentinel(p); err != nil {

return err

}

}

defer s.dirtyGroupCache(g.Id)

//将新增的codis-server地址追加到当前group的Servers属性后面

g.Servers = append(g.Servers, &models.GroupServer{Addr: addr, DataCenter: dc})

//更新zk路径,/codis3/codis-wujiang/group/group-0001下面的文件内容

return s.storeUpdateGroup(g)

}

//Group结构

type Group struct {

Id int `json:"id"`

Servers []*GroupServer `json:"servers"`

Promoting struct {

Index int `json:"index,omitempty"`

State string `json:"state,omitempty"`

} `json:"promoting"`

OutOfSync bool `json:"out_of_sync"`

}

下图是ctx, err := s.newContext()这一步取出的ctx,可以看到,此时上下文中group的Servers属性还是空

Codis源码解析 :codis-server添加到集群

下图是根据id从上下文取出的group添加了codis-server之后的情况,可以看到,此时在当前Group下已经多了刚添加的codis-server地址

Codis源码解析 :codis-server添加到集群

下一步,我们要给刚刚添加的主codis-server添加一个从codis-server,流程和刚才是一样的,第一次添加的时候,两个都是主,要点击下面那个绿色的类似于设置的按钮,将第二个服务器变成第一个的从服务器

Codis源码解析 :codis-server添加到集群

//这里传入的addr就是第二台codis-server服务器的地址

func (s *Topom) SyncCreateAction(addr string) error {

s.mu.Lock()

defer s.mu.Unlock()

//此时的上下文的group属性中的Servers已经初始化了两个/pkg/models.GroupServer结构

ctx, err := s.newContext()

if err != nil {

return err

}

//遍历上下文的group属性的Server,如果地址与传入的addr匹配,就可以找到对应的Group g

g, index, err := ctx.getGroupByServer(addr)

if err != nil {

return err

}

if g.Promoting.State != models.ActionNothing {

return errors.Errorf("group-[%d] is promoting", g.Id)

}

if g.Servers[index].Action.State == models.ActionPending {

return errors.Errorf("server-[%s] action already exist", addr)

}

defer s.dirtyGroupCache(g.Id)

//遍历g中的每个Server的Action.Index,取最大值加一赋值给新增的这个Server

//因为此时所有Server的Action.Index都是0,所以这里将新增Server的Action.Index设置为1

g.Servers[index].Action.Index = ctx.maxSyncActionIndex() + 1

//因为我们这里是把新增的这个6380的机器作为从服务器,所以这里要把它的Action.State设置为pending

g.Servers[index].Action.State = models.ActionPending

//更新zk路径

return s.storeUpdateGroup(g)

}

完成上述动作之后,Group的状态变为

Codis源码解析 :codis-server添加到集群

并且zk下已经在Group1中注册了两台服务器的信息

Codis源码解析 :codis-server添加到集群

再看界面,同属于Group1的两台codis-server的主从关系已经很明显

Codis源码解析 :codis-server添加到集群

启动dashboard时有刷新redis状态的goroutine,在成功添加两台codis-server后,这个goroutine的作用也比较明显了,详见笔者之前的一篇博客Codis源码解析——dashboard的启动(2)

相关推荐