Redis(5.0.3)事件驱动与连接管理
事件驱动原理和设计
A simple event-driven programming library. Originally I wrote this code for the Jim’s event-loop (Jim is a Tcl interpreter) but later translated it in form of a library for easy reuse.
— redis/src/ae.c
Redis的事件驱动实现采用了实现与接口分离。针对不同系统平台,使用了不同的IO多路复用函数。共四种,文件分别是:
- ae_evport.c
- ae_epoll.c
- ae_kqueue.c
- ae_select.c
平台同时支持多种IO函数呢?按照性能Redis定了一个引入顺序,按照性能从高到低include。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
|
ae在Redis里是事件驱动器的名字,取自A simple event-driven?
事件驱动有个核心数据类型aeEventLoop。它就像一个管理器,所有ae对上层代码提供的接口,包括对不同IO复用函数封装的方法,都需要操作aeEventLoop。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// ae.h
// 这些就是ae上层代码提供的所有接口
// 上层代码只管调用,不用关心底层用了哪种IO多路复用方案
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
// 文件事件相关
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 时间事件相关
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize)
|
事件驱动器在Redis全局只有一个,它被创建后,放在server.el里,server也是全局唯一对象。总得有个地方放置服务器相关的所有信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// src/server.h
struct redisServer {
// ...
aeEventLoop *el
// ...
}
// src/server.c
void initServer(void) {
// ...
// 传参是最大允许多少个client建连,默认是不限制
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
// ...
}
|
不同的IO复用方案进行封装,对ae提供一套一致的接口。以ae_epoll.c举例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// 类似构造函数
static int aeApiCreate(aeEventLoop *eventLoop) {}
// 调整大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {}
// 类似析构函数,释放资源
static void aeApiFree(aeEventLoop *eventLoop) {}
// 针对某个文件描述符添加关注的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {}
// 删除关心的文件描述符 或 删除文件描述符上某个事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {}
// 带timeout形式,阻塞等待获取可以读/写的文件描述符
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {}
// 获取IO复用函数名称
static char *aeApiName(void) {}
|
就是这样:
- (封装好的)IO复用函数 对 ae提供服务;
- 而(封装好的)ae 对 server提供服务;
常见的分层设计。
aeEventLoop
开始也说到这是个核心类型。可以看到无论封装的IO复用函数还是ae,第一参数都是aeEventLoop类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
typedef struct aeEventLoop {
int maxfd; // 当前注册的最大文件描述符
int setsize; // 关注的文件描述符上限
long long timeEventNextId; // 时间事件唯一ID递增器
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; // 监听读写事件IO事件的文件描述符列表
aeFiredEvent *fired; // 事件就绪有可读写IO事件的文件描述符列表
aeTimeEvent *timeEventHead; // 时间事件(定时任务)列表(链表)
int stop; // 是否停止事件循环器
void *apidata; // 不同的IO复用函数,poll方法需要参数类型不一样。apidata专门放置这些传参类型
aeBeforeSleepProc *beforesleep; // 事件循环器 新一轮循环前的钩子函数
aeBeforeSleepProc *aftersleep; // 事件循环器 一轮循环后的钩子函数
} aeEventLoop;
|
它实际也是一个事件循环器。Redis中支持两种事件类型,分别是IO事件和时间事件。时间事件其实就是定时任务。
这个事件循环器一旦通过aeMain()
启动后,就只能调用aeStop()
才能停下来
1
2
3
4
5
6
7
8
9
10
11
|
// 启动事件循环器
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 停止后就退出循环
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 处理IO事件与时间事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
|
循环器就在一个forever的while里,执行aeProcessEvents()
。
aeProcessEvents()
干了几件事:(代码比较长就不贴了)
- 先检查有没有等待执行的时间事件(定时任务),离现在最近的一个时间事件还要多久才执行。
- 如果有这样的时间事件,记录下还要多久执行它。取为timeout。
- 带着这个timeout,通过
aeApiPoll()
阻塞等待可读写的IO事件。如果第一步找不到时间事件,这里就没有timeout了,一直阻塞直至有可读性的IO事件。
- 从
aeApiPoll()
返回后,如果有IO事件的话。就挨个处理。对该IO事件是读还是写,都有flag标志。并且读写的回调处理函数,也通过aeCreateFileEvent()
注册进来了。
- 处理完所有IO事件后,就可以执行时间事件了。因为时间事件是定时任务,所有执行完毕后,还需要设置好下一次执行的时间点。
- 结束了。可以开始新一轮循环。
那么beforesleep和aftersleep这两个钩子在哪里被调用?beforesleep在aeMain()
函数里。aftersleep在第三步,aeApiPoll()
一结束就执行。
从执行逻辑可看出,定时任务不是准时执行的。可能会有一些延时。
连接的生命周期管理
一般连接生命周期,就是accept -> read/writer -> close。比较简单,redis也差不多。按这个顺序来看看redis的处理方式。
起初,redis服务器肯定监听在某个端口上。有新连接到来时,就会被accept掉。有socket编程经验不会陌生。
简单的socket编程例子一般是listen将socket绑定成功后,就有一个文件描述符。然后在这个文件描述符上阻塞地accept就行。
redis肯定不会阻塞等。而是把这个文件描述符也通过aeCreateFileEvent()
放进事件循环器中。这样只要有新连接,就等于有可读事件。这时再去accept,就不用傻瓜地阻塞等待了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
void initServer(void) {
// ...
for (j = 0; j < server.ipfd_count; j++) {
// 将监听client请求的IO文件描述符加到事件循环器
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
// ...
while(max--) {
// 通过accept系统调用获取新连接的文件描述符
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0,cip);
}
}
|
新连接的文件描述符拿到后,也将它放进事件循环器中。毕竟不能阻塞地傻等数据从client发过来。
为这个新连接创建一个对应的client时,顺便把它加到事件循环器中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
client *createClient(int fd) {
// 为这个连接创建一个client
client *c = zmalloc(sizeof(client));
if (fd != -1) {
// 将文件描述符设置为noblocking
anetNonBlock(NULL,fd);
// 设置TCP_NODELAY,关闭Nagle算法
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
// 开启tcp keepalive
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// ok,可以将文件描述符加到事件循环器里了
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// ...
}
|
接下来,就正常等待client发命令到来,触发文件描述符的可读事件,在事件循环器里响应即可。
redis进行响应命令,执行对应的命令处理函数,把处理结果放在client的输出缓冲区里,同时通过aeCreateFileEvent()
监听当前client对应文件描述符的写事件。只要可写了,就把输出缓冲区的内容刷给client。
在哪儿询问是否描述符可写呢?就在上面提到的钩子函数beforesleep里。
1
2
3
4
5
6
7
|
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
// 如果某client的输出缓冲区不为空,则监听client对应的文件描述符的可写状态
// 等于将文件描述符注册到事件循环器中
handleClientsWithPendingWrites();
// ...
}
|
写完了也不能主动close掉,毕竟每次执行一个命令就开一个连接成本太大了。
常见策略就是定时把空闲的client连接断掉。这个工作主要放在定时任务中。
而client最近活跃的时间就是有命令到达,读取命令时。通过c->lastinteraction = server.unixtime
记录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
// 注册定时任务
void initServer(void) {
// ...
// serverCron就是定时任务
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
// ...
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
// 断开空闲过久的client
clientsCron()
// ...
}
void clientsCron(void) {
// ...
while(listLength(server.clients) && iterations--) {
// ...
// 超出server.idletime,free掉
if (clientsCronHandleTimeout(c,now)) continue;
}
}
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
time_t now = now_ms/1000;
if (server.maxidletime &&
!(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
!(c->flags & CLIENT_MASTER) && /* no timeout for masters */
!(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
(now - c->lastinteraction > server.maxidletime))
{
serverLog(LL_VERBOSE,"Closing idle client");
// freeclient里调用close
freeClient(c);
return 1;
}
// ...
return 0;
}
|