曹工说Redis源码(6)-- redis server 主循环大体流程解析

文章导航

Redis源码系列的初衷,是帮助我们更好地理解Redis,更懂Redis,而怎么才能懂,光看是不够的,建议跟着下面的这一篇,把环境搭建起来,后续可以自己阅读源码,或者跟着我这边一起阅读。由于我用c也是好几年以前了,些许错误在所难免,希望读者能不吝指出。

曹工说Redis源码(1)-- redis debug环境搭建,使用clion,达到和调试java一样的效果

曹工说Redis源码(2)-- redis server 启动过程解析及简单c语言基础知识补充

曹工说Redis源码(3)-- redis server 启动过程完整解析(中)

曹工说Redis源码(4)-- 通过redis server源码来理解 listen 函数中的 backlog 参数

曹工说Redis源码(5)-- redis server 启动过程解析,以及EventLoop每次处理事件前的前置工作解析(下)

本讲主题

先给大家复习下前面一讲的功课,大家知道,redis 基本是单线程,也就是说,假设我们启动main方法的,是线程A,那么,最终,去处理客户端socket连接、读取客户端请求、以及向客户端socket写数据,也还是线程A。

同时,大家想必也知道,redis 里还是有一些后台任务要做的,比如:

  1. 字典的rehash(rehash的意思是,redis 里,字典结构,其实是包含了两个hashtable,一般使用第一个;当需要扩充其size的时候,hashtable[1] 就会扩充内存到扩充后的size,然后,就需要把hashtable[0]里面的数据,全部迁移到 hashtable[1] 来,这个过程,即所谓的rehash),rehash的过程,还是比较耗时的;

  2. redis 里的键,如果设了过期时间,到了过期时间后,这个key,是不是就在redis里不存在了呢?不一定,但是你去访问的时候,肯定是看不到了。但这个怎么做到的呢?难道每次来一个这种key,就设置一个timer,在指定过期时间后执行清除任务吗?这个想来,开销太大了;

    所以,其实分了两种策略:

    • 一是redis 给自己开了个周期性的定时任务,就是那种,每隔30s执行一次之类的,在这个任务中,就会去主动检查:设置了过期时间的key的集合,如果发现某个key过期了,直接删除;但是,redis由于其单线程特性,如果遇到过期key特别多的话,就要一直忙着清理过期key了,正事就没法干了(比如处理客户端请求),所以,每次redis执行这种任务的时候,基本就是敷衍了事,得过且过,随机选几个键,删了就算完事。
    • 二是,redis在你真正去get 这个key的时候,才去检查是否过期,如果发现过期了,再删除。这是什么策略?就是懒。所以叫惰性删除。
  3. 检查当前的客户端集合,看看哪些是一直空闲,且超过了一定时间的,这部分客户端,被筛选出来,直接干掉,关掉与该客户端之间的长连接。

  4. 还有其他一些任务,下边再说。

所以,从上面可知,redis 主要要干两类活,一种是客户端要它干的,比如,我执行个get/set命令,这个优先级比较高;另一类就是例行工作,每隔多久就得干一次。

前面一讲,我们已经讲到了下面这个代码:

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            // 1
            eventLoop->beforesleep(eventLoop);

        // 2 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

1处,我们已经讲完了;本讲,主要讲2处,这个主循环。ok,扯了一堆,let‘s go!

主循环大体流程

获取:当前还有多长时间,到达周期任务的时间点

获取有没有周期任务要执行,如果有,则计算一下,要过多久,才到周期任务的执行时间;把过多久这个时间,算出来后,定义为 timeLeftToScheduledJobTime;如果没有周期任务,这个时间可以定义为null;

如果发现时间已经到了,则表示现在就可以执行这个周期任务了,把timeLeftToScheduledJobTime 设为0

这部分代码,如下所示:

if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        // 1
        struct timeval tv, *tvp;

        //  获取最近的时间事件
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 2
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // 如果时间事件存在的话
            // 那么根据最近可执行时间事件和现在时间的时间差来决定文件事件的阻塞时间
            long now_sec, now_ms;

            // 计算距今最近的时间事件还要多久才能达到
            // 并将该时间距保存在 tv 结构中
            /**
             * 3 获取当前时间,这里把两个long 局部变量的地址传进去了,在里面会去修改它
             */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            // 4
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms + 1000) - now_ms) * 1000;
                tvp->tv_sec--;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms) * 1000;
            }

            // 5 时间差小于 0 ,说明事件已经可以执行了,将秒和毫秒设为 0 (不阻塞)
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {

            // 执行到这一步,说明没有时间事件
            if (flags & AE_DONT_WAIT) {
                // 6
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 7
                tvp = NULL; /* wait forever */
            }
        }
  • 1处,定义了一个变量tvp,基本用来存储前面我们说的那个timeLeftToScheduledJobTime
  • 2处,会获取最近的周期任务的时间
  • 3处,获取当前时间,保存到 long now_sec, now_ms
  • 4处,最近的周期任务的时间,减去当前时间,差值保存到tvp->tv_sec
  • 5处,如果最终算出来,时间差为负数,则设为0,表示,这个周期任务现在就可以运行
  • 6处和7处,这是另外一个else分支,从2处分出来的,如果没找到最近的周期任务,则进入这里;根据参数flags中是否设置了AE_DONT_WAIT选项,分出2个分支,一个设为0,一个设为null。

select函数简介

说到网络编程中的多路复用,select几乎是绕不开的话题,在没有epoll之前,基本就是使用select。当然,select有它的缺点,那就是:

  1. select总是去线性扫描所有的文件描述符,看看哪个文件描述符是ready的;怎么算作ready,读或者写,不用阻塞,就算是ready;
  2. select最大支持的文件描述符数量有限制,默认为1024.

下面,大家看看select的api,大家也可以自行在linux机器上执行:man select 查看。

select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing
    
int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct timeval *timeout);

上面的第一行,是select的简单说明,其中一个词是,synchronous,同步的意思,说明select是同步的,不是异步的,只是进行了io多路复用。

下面那个是其api,简单解释三个参数:

  • 参数fd_set *readfds

    Those listed in readfds will be watched to see if characters become available for reading (more
    precisely, to see if a read will not block

    也就是说,这个集合中的fd,会被监测,看看哪些fd可以无阻塞地读取;怎么才能无阻塞地读取,那肯定是这个fd的输入缓冲区有内容啊,比如,客户端发了数据过来

  • 参数fd_set *writefds

    those in writefds will be watched to see if a write will not block

    这个集合,会被监测,看看是否可以对这个fd,进行无阻塞地写;什么时候,不能无阻塞地写呢?肯定是缓冲区满了的时候。这种应该常见于:给对端发数据时,对方一直不ack这些数据,所以我方的缓冲区里,一直不能删这些数据,导致缓冲区满。

  • struct timeval *timeout

    The timeout argument specifies the minimum interval that select() should block waiting for a file descriptor to become ready. If both fields of the timeval structure are zero, then select() returns immediately. (This is useful for polling.) If timeout is NULL (no timeout), select() can block
    indefinitely.

    这个timeout参数,指定了select()操作,等待文件描述符变成ready过程中,需要等待多长时间。如果这个timeout的两个字段,都被设为了0,则select()会马上返回。如果timeout是null,这个操作会无限阻塞。

所以,select我就算大家了解了,其中的timeout参数,简单来说,就是调用select时,最大阻塞多久就要返回。

如果设为0,则马上返回;如果为null,则无限阻塞;如果为正常的大于0的值,则阻塞对应的时长。

和前面的部分,联系起来,就是说:

  • 假设没有周期任务,则,无限阻塞;
  • 如果有周期任务,且时间已经到达,则马上返回;
  • 如果有周期任务,且时间未到,则阻塞对应时长后返回。

linux下不是用epoll吗,为啥还讲select

有的函数,天生适合拿来讲课。epoll,kqueue等,会单独拿来讲。

获取到ready的文件描述符后,处理该文件描述符

// 1 处理文件事件,阻塞时间由 tvp 决定,tvp:timevalue pointer
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            // 2 从已就绪数组中获取事件
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            // 读事件
            if (fe->mask & mask & AE_READABLE) {
                // rfired 确保读/写事件只能执行其中一个
                rfired = 1;
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
            }
            // 写事件
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
            }

            processed++;
        }
  • 1处,这里就会根据当前的操作系统,决定调用select或是epoll,或是其他的实现。(通过条件编译实现)。

    假设这里的底层实现,就是前面讲的select函数,那么,select函数执行完后,eventLoop->fired 属性,就会存放这次select筛选出来的那些,ready的文件描述符集合。

    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        aeApiState *state = eventLoop->apidata;
        int retval, j, numevents = 0;
    
        /**
         * 拷贝到带_的变量中
         */
        memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
        memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
    	// 1
        retval = select(eventLoop->maxfd+1,
                    &state->_rfds,&state->_wfds,NULL,tvp);
        if (retval > 0) {
            for (j = 0; j <= eventLoop->maxfd; j++) {
                int mask = 0;
                aeFileEvent *fe = &eventLoop->events[j];
    
                if (fe->mask == AE_NONE) continue;
                if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
                    mask |= AE_READABLE;
                if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
                    mask |= AE_WRITABLE;
                // 2
                eventLoop->fired[numevents].fd = j;
                eventLoop->fired[numevents].mask = mask;
                numevents++;
            }
        }
        return numevents;
    }

    如上所示,1处,调用select;2处,赋值给fired。

  • 2处,从fired中取出对应的文件描述符

  • 3处,如果fired中的文件描述符,可读,则执行对应的函数指针rfileProc指向的函数

  • 4处,如果fired中的文件描述符,可写,则执行对应的函数指针wfileProc指向的函数

如果有周期任务,则执行周期任务

/* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

这里会调用processTimeEvents,其实现如下,其中涉及到复杂的时间计算,我们可以只看核心流程:

/* Process time events
 *
 * 处理所有已到达的时间事件
 */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

    // 更新最后一次处理时间事件的时间
    eventLoop->lastTime = now;

    // 遍历链表
    // 执行那些已经到达的事件
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId - 1;
    while (te) {
        long now_sec, now_ms;
        long long id;

        // 获取当前时间
        aeGetTime(&now_sec, &now_ms);

        // 如果当前时间等于或等于事件的执行时间,那么说明事件已到达,执行这个事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms)) {
            int retval;

            id = te->id;
            //1 执行事件处理器,并获取返回值
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;

            // 记录是否有需要循环执行这个事件时间
            if (retval != AE_NOMORE) {
                // 2 是的, retval 毫秒之后继续执行这个时间事件
                aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms);
            } else {
                // 不,将这个事件删除
                aeDeleteTimeEvent(eventLoop, id);
            }

            // 因为执行事件之后,事件列表可能已经被改变了
            // 因此需要将 te 放回表头,继续开始执行事件
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}
  • 1处,执行timeProc这个函数指针,执行的函数,在初始化的时候,这个指针,被赋值为serverCron;

    初始化时,会调用一下代码:

    // 为 serverCron() 创建时间事件
        if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
            redisPanic("Can‘t create the serverCron time event.");
            exit(1);
        }

    这里的serverCron,是一个函数指针。

    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
                                aeTimeProc *proc, void *clientData,
                                aeEventFinalizerProc *finalizerProc) {
        // 更新时间计数器
        long long id = eventLoop->timeEventNextId++;
    
        // 创建时间事件结构
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
    
        // 设置 ID
        te->id = id;
    
        // 设定处理事件的时间
        aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);
        // 1 设置事件处理器
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        // 设置私有数据
        te->clientData = clientData;
    
        // 将新事件放入表头
        te->next = eventLoop->timeEventHead;
        eventLoop->timeEventHead = te;
    
        return id;
    }

    上面的1处,将传入的serverCron,赋值给了te->timeProc。

  • 2处,注册下一次的周期任务

总结

本讲主要讲解了主循环的最外层结构,如果有什么不清楚的,可以留言。