Redis(5.0.3)事件驱动与连接管理


事件驱动原理和设计

A simple event-driven programming library. Originally I wrote this code for the Jim’s event-loop (Jim is a Tcl interpreter) but later translated it in form of a library for easy reuse.

— redis/src/ae.c Redis的事件驱动实现采用了实现与接口分离。针对不同系统平台,使用了不同的IO多路复用函数。共四种,文件分别是:

  1. ae_evport.c
  2. ae_epoll.c
  3. ae_kqueue.c
  4. ae_select.c

平台同时支持多种IO函数呢?按照性能Redis定了一个引入顺序,按照性能从高到低include。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

ae在Redis里是事件驱动器的名字,取自A simple event-driven?

事件驱动有个核心数据类型aeEventLoop。它就像一个管理器,所有ae对上层代码提供的接口,包括对不同IO复用函数封装的方法,都需要操作aeEventLoop。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ae.h
// 这些就是ae上层代码提供的所有接口
// 上层代码只管调用,不用关心底层用了哪种IO多路复用方案
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
// 文件事件相关
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 时间事件相关
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize)

事件驱动器在Redis全局只有一个,它被创建后,放在server.el里,server也是全局唯一对象。总得有个地方放置服务器相关的所有信息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// src/server.h
struct redisServer {
  // ...
  aeEventLoop *el
  // ...
}

// src/server.c
void initServer(void) {
  // ...
  // 传参是最大允许多少个client建连,默认是不限制
  server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
  // ...
}

不同的IO复用方案进行封装,对ae提供一套一致的接口。以ae_epoll.c举例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 类似构造函数
static int aeApiCreate(aeEventLoop *eventLoop) {}
// 调整大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {}
// 类似析构函数,释放资源
static void aeApiFree(aeEventLoop *eventLoop) {}
// 针对某个文件描述符添加关注的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {}
// 删除关心的文件描述符 或 删除文件描述符上某个事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {}
// 带timeout形式,阻塞等待获取可以读/写的文件描述符
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {}
// 获取IO复用函数名称
static char *aeApiName(void) {}

就是这样:

  1. (封装好的)IO复用函数 对 ae提供服务;
  2. 而(封装好的)ae 对 server提供服务;

常见的分层设计。

aeEventLoop

开始也说到这是个核心类型。可以看到无论封装的IO复用函数还是ae,第一参数都是aeEventLoop类型。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
typedef struct aeEventLoop {
    int maxfd;   // 当前注册的最大文件描述符
    int setsize; // 关注的文件描述符上限
    long long timeEventNextId; // 时间事件唯一ID递增器
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; // 监听读写事件IO事件的文件描述符列表
    aeFiredEvent *fired; // 事件就绪有可读写IO事件的文件描述符列表
    aeTimeEvent *timeEventHead; // 时间事件(定时任务)列表(链表)
    int stop; // 是否停止事件循环器
    void *apidata; // 不同的IO复用函数,poll方法需要参数类型不一样。apidata专门放置这些传参类型
    aeBeforeSleepProc *beforesleep; // 事件循环器 新一轮循环前的钩子函数
    aeBeforeSleepProc *aftersleep; // 事件循环器 一轮循环后的钩子函数
} aeEventLoop;

它实际也是一个事件循环器。Redis中支持两种事件类型,分别是IO事件和时间事件。时间事件其实就是定时任务。

这个事件循环器一旦通过aeMain()启动后,就只能调用aeStop()才能停下来

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// 启动事件循环器
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 停止后就退出循环
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 处理IO事件与时间事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

循环器就在一个forever的while里,执行aeProcessEvents()

aeProcessEvents()干了几件事:(代码比较长就不贴了)

  1. 先检查有没有等待执行的时间事件(定时任务),离现在最近的一个时间事件还要多久才执行。
  2. 如果有这样的时间事件,记录下还要多久执行它。取为timeout。
  3. 带着这个timeout,通过aeApiPoll()阻塞等待可读写的IO事件。如果第一步找不到时间事件,这里就没有timeout了,一直阻塞直至有可读性的IO事件。
  4. aeApiPoll()返回后,如果有IO事件的话。就挨个处理。对该IO事件是读还是写,都有flag标志。并且读写的回调处理函数,也通过aeCreateFileEvent()注册进来了。
  5. 处理完所有IO事件后,就可以执行时间事件了。因为时间事件是定时任务,所有执行完毕后,还需要设置好下一次执行的时间点。
  6. 结束了。可以开始新一轮循环。

那么beforesleep和aftersleep这两个钩子在哪里被调用?beforesleep在aeMain()函数里。aftersleep在第三步,aeApiPoll()一结束就执行。

从执行逻辑可看出,定时任务不是准时执行的。可能会有一些延时。

连接的生命周期管理

一般连接生命周期,就是accept -> read/writer -> close。比较简单,redis也差不多。按这个顺序来看看redis的处理方式。

起初,redis服务器肯定监听在某个端口上。有新连接到来时,就会被accept掉。有socket编程经验不会陌生。

简单的socket编程例子一般是listen将socket绑定成功后,就有一个文件描述符。然后在这个文件描述符上阻塞地accept就行。

redis肯定不会阻塞等。而是把这个文件描述符也通过aeCreateFileEvent()放进事件循环器中。这样只要有新连接,就等于有可读事件。这时再去accept,就不用傻瓜地阻塞等待了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void initServer(void) {
  // ...
	for (j = 0; j < server.ipfd_count; j++) {
    // 将监听client请求的IO文件描述符加到事件循环器
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
      {
        serverPanic(
          "Unrecoverable error creating server.ipfd file event.");
      }
    }
}

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    // ...

    while(max--) {
        // 通过accept系统调用获取新连接的文件描述符
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

新连接的文件描述符拿到后,也将它放进事件循环器中。毕竟不能阻塞地傻等数据从client发过来。

为这个新连接创建一个对应的client时,顺便把它加到事件循环器中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
client *createClient(int fd) {
    // 为这个连接创建一个client
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        // 将文件描述符设置为noblocking
        anetNonBlock(NULL,fd);
        // 设置TCP_NODELAY,关闭Nagle算法
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            // 开启tcp keepalive
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // ok,可以将文件描述符加到事件循环器里了
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
  // ...
}

接下来,就正常等待client发命令到来,触发文件描述符的可读事件,在事件循环器里响应即可。

redis进行响应命令,执行对应的命令处理函数,把处理结果放在client的输出缓冲区里,同时通过aeCreateFileEvent()监听当前client对应文件描述符的写事件。只要可写了,就把输出缓冲区的内容刷给client。

在哪儿询问是否描述符可写呢?就在上面提到的钩子函数beforesleep里。

1
2
3
4
5
6
7
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    // 如果某client的输出缓冲区不为空,则监听client对应的文件描述符的可写状态
    // 等于将文件描述符注册到事件循环器中
    handleClientsWithPendingWrites();
    // ...
}

写完了也不能主动close掉,毕竟每次执行一个命令就开一个连接成本太大了。

常见策略就是定时把空闲的client连接断掉。这个工作主要放在定时任务中。

而client最近活跃的时间就是有命令到达,读取命令时。通过c->lastinteraction = server.unixtime记录。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 注册定时任务
void initServer(void) {
  // ...
  // serverCron就是定时任务
  if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
  }
  // ...
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
  // ...
  // 断开空闲过久的client
  clientsCron()
  // ...
}

void clientsCron(void) {
    // ...
    while(listLength(server.clients) && iterations--) {
        // ...
        // 超出server.idletime,free掉
        if (clientsCronHandleTimeout(c,now)) continue;
    }
}

int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
  time_t now = now_ms/1000;

  if (server.maxidletime &&
      !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
      !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
      !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
      !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
      (now - c->lastinteraction > server.maxidletime))
  {
    serverLog(LL_VERBOSE,"Closing idle client");
    // freeclient里调用close
    freeClient(c);
    return 1;
  }
  // ...
  return 0;
}