Redis 源码分析之 cluster meet

Redis cluster 是 redis 官方提出的分布式集群解决方案,在此之前,有一些第三方的可选方案,如 codis、Twemproxy等。cluster 内部使用了 gossip 协议进行通信,以达到数据的最终一致性。详细介绍可参考官网 Redis cluster tutorial
本文试图借着cluster meet 命令的实现来对其中的一些通信细节一探究竟。
我们都知道,当 redis server 以 cluster mode 启动时,节点 A 想加入节点 B 所在的集群,只需要执行 CLUSTER MEET ip port 这个命令即可,通过 gossip 通信,最终 B 所在集群的其他节点也都会认识到 A。大概流程图如下:

cluster 初始化

当 redis server 以 cluster mode 启动时,即配置文件中的 cluster-enabled 选项设置为 true,此时在服务启动时,会有一个 cluster 初始化的流程,这个在之前的文章 《Redis 启动流程》中有提到过,即执行函数 clusterInit。在 cluster 中有三个数据结构很重要, clusterStateclusterNodeclusterLink
每个节点都保存着一个 clusterState 结构,这个结构记录了在当前节点的视角下,集群目前所处的状态,即“我看到的世界是什么样子”。
每个节点都会使用一个 clusterNode 结构来记录自己的状态, 并为集群中的所有其他节点(包括主节点和从节点)都创建一个相应的 clusterNode 结构, 以此来记录其他节点的状态。
clusterNode 结构的 link 属性是一个 clusterLink 结构, 该结构保存了连接节点所需的有关信息, 比如套接字描述符, 输入缓冲区和输出缓冲区。
更多的细节可以通过网页 《redis 设计与实现 - 节点》进行了解。
该初始化很简单,首先是创建一个 clusterState 结构,并初始化一些成员,如下:

server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;     // 新节点的 currentEpoch = 0
server.cluster->state = CLUSTER_FAIL; // 初始状态置为 FAIL
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots

然后给 node.conf 文件加锁,确保每个节点使用自己的 cluster 配置文件。

if (clusterLockConfig(server.cluster_configfile) == C_ERR)
    exit(1);

借着这个机会学习下 redis 如何使用的文件锁。

int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
    serverLog(LL_WARNING,
              "Can't open %s in order to acquire a lock: %s",
              filename, strerror(errno));
    return C_ERR;
}

if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
    if (errno == EWOULDBLOCK) {
        serverLog(LL_WARNING,
                  "Sorry, the cluster configuration file %s is already used "
                  "by a different Redis Cluster node. Please make sure that "
                  "different nodes use different cluster configuration "
                  "files.", filename);
    } else {
        serverLog(LL_WARNING,
                  "Impossible to lock %s: %s", filename, strerror(errno));
    }
    close(fd);
    return C_ERR;
}

然后加载 node.conf 文件,这个过程还会检查这个文件是否合理。

如果加载失败(或者配置文件不存在),则以 REDIS_NODE_MYSELF|REDIS_NODE_MASTER 为标记,创建一个clusterNode 结构表示自己本身,置为主节点,并设置自己的名字为一个40字节的随机串;然后将该节点添加到server.cluster->nodes中,这说明这是个新启动的节点,生成的配置文件进行刷盘。

if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
    myself = server.cluster->myself =
        createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
    serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
              myself->name);
    clusterAddNode(myself);
    saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1); // 新节点,将配置刷到配置文件中,fsync

接下来,调用 listenToPort 函数,在集群 gossip 通信端口上创建 socket fd 进行监听。集群内 gossip 通信端口是在 Redis 监听端口基础上加 10000,比如如果Redis监听客户端的端口为 6379,则集群监听端口就是16379,该监听端口用于接收其他集群节点发送过来的 gossip 消息。

然后注册监听端口上的可读事件,事件回调函数为 clusterAcceptHandler

#define CLUSTER_PORT_INCR 10000

if (listenToPort(server.port+CLUSTER_PORT_INCR,
                 server.cfd,&server.cfd_count) == C_ERR)
{
    exit(1);
} else {
    int j;
    for (j = 0; j < server.cfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, 
                              clusterAcceptHandler, NULL) == AE_ERR)
            serverPanic("Unrecoverable error creating Redis Cluster "
                        "file event.");
    }
}

当前节点收到其他集群节点发来的TCP建链请求之后,就会调用 clusterAcceptHandler 函数 accept 连接。在 clusterAcceptHandler函数中,对于每个已经 accept 的链接,都会创建一个clusterLink 结构表示该链接,并注册 socket fd上的可读事件,事件回调函数为 clusterReadHandler

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    ... ...
    // 如果服务器正在启动,不要接受其他节点的连接, 因为 UPDATE 消息可能会干扰数据库内容
    if (server.masterhost == NULL && server.loading) return;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        ... ...
        // 创建一个 link 结构来处理连接
        // 刚开始的时候, link->node 被设置成 null,因为现在我们不知道是哪个节点
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

最后是 reset mf 相关的参数。

CLUSTER MEET

A 节点接收 CLUSTER MEET 命令

A 节点在cluster.c -> clusterCommand 函数中,接收到 cluster meet 命令,即

if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
    long long port;

    // CLUSTER MEET <ip> <port>
    if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
        addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr);
        return;
    }
    if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL)
    {
        addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
    } else {
        addReply(c,shared.ok);
    }
}

可以看到重点在 clusterStartHandshake 这个函数。

int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[NET_IP_STR_LEN];
    struct sockaddr_storage sa;
    /* IP and Port sanity check */
    ... ...
        
    // 检查节点(flag) norm_ip:port 是否正在握手
    if (clusterHandshakeInProgress(norm_ip,port)) { 
        errno = EAGAIN;
        return 0;
    }
    // 创建一个含随机名字的 node,type 为 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
    // 相关信息会在 handshake 过程中被修复
    n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;
    clusterAddNode(n);
    return 1;
}
clusterNode *createClusterNode(char *nodename, int flags) {
    clusterNode *node = zmalloc(sizeof(*node));
    if (nodename)
        memcpy(node->name, nodename, CLUSTER_NAMELEN);
    else
        // 在本地新建一个 nodename 节点,节点名字随机,跟它通信时它会告诉我真实名字
        getRandomHexChars(node->name, CLUSTER_NAMELEN);
    node->ctime = mstime(); // mstime
    node->configEpoch = 0;
    node->flags = flags;
    memset(node->slots,0,sizeof(node->slots));
    node->slaveof = NULL;
    ... ...
    node->link = NULL; // link 为空, 在 clusterCron 中能检查的到
    memset(node->ip,0,sizeof(node->ip));
    node->port = 0;
    node->fail_reports = listCreate();
    ... ...
    listSetFreeMethod(node->fail_reports,zfree);
    return node;
}

这个函数会首先进行一些 ip 和 port 的合理性检查,然后去遍历所看到的 nodes,这个 ip:port 对应的 node 是不是正处于 CLUSTER_NODE_HANDSHAKE 状态,是的话,就说明这是重复 meet,没必要往下走。之后,通过 createClusterNode 函数创建一个带有 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET 标记的节点,名字为一个随机的 40 字节字符串(因为此时对 A 来说,B 是一个陌生的节点,信息除了 ip 和 port,其他都不知道),通过 clusterAddNode 函数加到自己的 nodes 中。
这个过程成功后,就返回给客户端 OK 了,其他事情需要通过 gossip 通信去做。

A 节点发送 MEET gossip 消息给 B 节点

A 节点在定时任务 clusterCron 中,会做一些事情。

handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

// 检查是否有 disconnected nodes 并且重新建立连接
di = dictGetSafeIterator(server.cluster->nodes); // 遍历所有节点
while((de = dictNext(di)) != NULL) {
    clusterNode *node = dictGetVal(de);
    
     // 忽略掉 myself 和 noaddr 状态的节点
    if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; 
    
    // 节点处于 handshake 状态,且状态维持时间超过 handshake_timeout,那么从 nodes中删掉它
    if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
        clusterDelNode(node);
        continue;
    }

    // 刚刚收到 cluster meet 命令创建的新 node ,或是 server 刚启动,或是由于某种原因断开了
    if (node->link == NULL) { 
        int fd;
        mstime_t old_ping_sent;
        clusterLink *link;

        // 对端 gossip 通信端口为 node 端口 + 10000,创建 tcp 连接, 本节点相当于 client
        fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
        ... ...
        link = createClusterLink(node);
        link->fd = fd;
        node->link = link;

        // 注册 link->fd 上的可读事件,事件回调函数为 clusterReadHandler
        aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link);
        ... ...

        // 如果 node 带有 MEET flag,我们发送一个 MEET 包而不是 PING,
        // 这是为了强制让接收者把我们加到它的 nodes 中
        clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
        ... ...
        node->flags &= ~CLUSTER_NODE_MEET;
        ... ...
    }
}
dictReleaseIterator(di);

可以看到,遍历自己看到的 nodes,当遍历到 B 节点时,由于 node->link == NULL,因此会监听 B 的启动端口号+10000,即 gossip 通信端口,然后注册可读事件,处理函数为 clusterReadHandler。接着会发送 CLUSTER_NODE_MEET 消息给 B 节点,消除掉 B 节点的 meet 状态。

B 节点处理 A 发来的 MEET gossip 消息

当 B 节点接收到 A 节点发送 gossip 时,回调函数 clusterAcceptHandler 进行处理,然后会 accept 对端的 connect(B 作为 server,对端作为 client),注册可读事件,回调函数为 clusterReadHandler,基本逻辑如下,

void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    clusterLink *link;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    // 如果服务器正在启动,不要接受其他节点的链接,因为 UPDATE 消息可能会干扰数据库内容
    if (server.masterhost == NULL && server.loading) return;
    while(max--) { // 1000 个请求
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_VERBOSE,
                    "Error accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);
        serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
   
        // 创建一个 link 结构来处理连接
        // 刚开始的时候, link->node 被设置成 null,因为现在我们不知道是哪个节点
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}

可以看到每次 accept 对端connect时,都会创建一个 clusterLink 结构用来接收数据,

typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    int fd;                     /* TCP socket file descriptor */
    sds sndbuf;                 /* Packet send buffer */
    sds rcvbuf;                 /* Packet reception buffer */
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;

clusterLink 有一个指针是指向 node 自身的。
B 节点接收到 A 节点发送过来的信息,放到 clusterLinkrcvbuf 字段,然后使用 clusterProcessPacket 函数来处理(接收数据过程很简单,不做分析)。
所以 clusterProcessPacket 函数的作用是处理别人发过来的 gossip 包。

if (!sender && type == CLUSTERMSG_TYPE_MEET) {
    clusterNode *node;

    // 创建一个带有 CLUSTER_NODE_HANDSHAKE 标记的 cluster node,名字随机
    node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
    nodeIp2String(node->ip,link); // ip 和 port 信息均从 link 中获得
    node->port = ntohs(hdr->port);

    clusterAddNode(node);
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
.....
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);

由于这时 B 节点还不认识 A 节点,因此 B 节点从自己的 nodes 中找 A 节点是找不到的,所以 sender 是空,因此会走进如上的这段逻辑。同样以随机的名字,CLUSTER_NODE_HANDSHAKE 为 flag 创建一个 node,加入自己的 nodes 中。
在这个逻辑末尾会给 A 节点回复一个 PONG 消息。

A 节点处理 B 节点回复的 PONG gossip 消息

同样是在 clusterProcessPacket 中处理 gossip 消息。

if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
    ... ...
    if (link->node) {
        if (nodeInHandshake(link->node)) { // node 处于握手状态
            ... ...
            clusterRenameNode(link->node, hdr->sender); // 修正节点名
            link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // 消除 handshake 状态
            link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }
}

这个时候 A 节点会根据 B 节点发来的消息,更正 A 节点 nodes 中关于 B 节点的名字,以及消除 handshake 状态。

B 节点发送 PING gossip 消息给 A 节点

当 B 节点在做 clusterCron 时,发现自己看到的 A 节点中的 link 为空,即 node->link == NULL,这与上面讲的 A 节点给 B 节点发 MEET 消息类似,不过在 B 节点看了 A 节点没有 meet flag,因此发送的是 PING 消息。

A 节点处理 B 节点发来的 PING 消息

做一些逻辑,不过跟这次要讨论的事情无关,后面会详写。

对于 PING 和 MEET 消息,无论如何都是会回复一个 PONG 消息的

B 节点处理 A 节点回复的 PONG 消息

逻辑同上,将 B 节点的 nodes 中 A 节点的名字进行更正,然后去掉 A 节点的 handshake flag。

小结

至此,一个 cluster meet 命令执行的完整过程就解释清楚了,画了一个流程图可以帮助更好的理解这个流程。

Redis 源码分析之 cluster meet

相关推荐