Redis(5.0.3)里一个简单请求如何被处理
set text “hello world” 从进入服务器到输出结果,整个流程是怎样的?带着这个问题来看看源码。
在命令进入服务之前,服务器需要先初始化好自己
与这个场景相关的,两件事比较重要:
- 先注册好所有支持的Redis命令
- 初始化并启动事件循环器
所有的Redis命令先是被组织成一个table,里面包含每个命令的名称、对应处理函数、flag、调用次数等信息。
1
2
3
4
5
6
7
8
|
// src/server.c
// 太多了只给出小部分
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
// ...
}
|
这个redisCommandTable被映射到哈希结构里,就是Redis内部定义的Dict结构。O(1)就能取到对应命令的处理函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// src/server.c
void initServerConfig(void) {
// ...
// 设置命令映射表
populateCommandTable();
// ...
}
void populateCommandTable(void) {
// ...
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
// ...
// 把每个命令放到哈希结构中
retval1 = dictAdd(server.commands, sdsnew(c->name), c);
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
}
}
|
然后启动事件循环器,监听默认的6379端口,并设置socket为no_blocking。
将监听6379端口的socket包装为一个aeFileEvent对象,通过aeCreateFileEvent()
注册到事件循环器里。注册时,会还会注册一个回调函数acceptTcpHandler()
。即有新连接要到来时,就调用回调函数进行accept。
accept到的就是client连接。accept返回一个文件描述符,也将它注册进事件循环器里。这样之后client发起一个set text "hello world"
请求到达server时,文件描述符变得可读,事件循环器会捕获到此事件并调用对应的回调函数readQueryFromClient()
。
每个连接进来,都会创建一个对应的client对象,里面存储client发起的命令,输入/出缓冲区等信息。
所有client对象也会被挂到server.clients链表上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
// src/server.c
void initServer(void) {
// ...
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1)
// ...
// 将监听client请求的IO文件事件加到事件循环器中
for (j = 0; j < server.ipfd_count; j++) {
// acceptTcpHandler 就是回调函数
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// ...
}
// src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// ...
while(max--) {
// 通过accept系统调用获取client连接的文件描述符,及client ip/port
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// ...
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(cfd,0,cip);
}
}
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 为新连接绑定一个client对象,并且将连接对应的文件描述符放进事件循环器里
if ((c = createClient(fd)) == NULL) {
// ...
close(fd); /* May be already closed, just ignore errors */
return;
}
// ...
}
client *createClient(int fd) {
// 为这个连接创建一个client
client *c = zmalloc(sizeof(client));
if (fd != -1) {
// 将文件描述符设置为noblocking
anetNonBlock(NULL,fd);
// 设置TCP_NODELAY,关闭Nagle算法
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive) // 开启tcp keepalive
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// ok,可以将文件描述符加到事件循环器里了
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
{
// ...
}
}
// 默认客户端操作redis第一个db
selectDb(c,0);
// ... 这里是一系列client对象的初始化操作
// 新client追加到server.clients链表后
if (fd != -1) linkClient(c);
initClientMultiState(c);
return c;
}
|
回调函数就是在事件循环器中被触发的。拿到有就绪事件的文件描述符后,判断是读还是写,再调用对应的回调函数(fe->rfileProc()
和fe->wfileProc()
)。回调函数的类型为void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// src/server.c
// aeMain() -> aeProcessEvents()
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
// ...
numevents = aeApiPoll(eventLoop, tvp)
// ...
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd
// ...
if (!invert && fe->mask & mask & AE_READABLE) {
// 读事件处理函数
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
// 写事件处理函数
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
}
// ...
}
|
现在可以发送命令了
127.0.0.1:6379> set text “hello world”
敲下回车键,命令通过TCP协议到了server后,accept到新的套接字,并且是可读状态。
这时注册的回调函数readQueryFromClient()
就被触发调用。它是所有命令的入口。
上面的源码也能看到,注册readQueryFromClient()
之前,是为新连接创建一个client对象,命令的内容,client的属性,输入/输出缓冲区等都是与这个client绑定的。
readQueryFromClient()
通过系统调用read()
从套接字里读取命令,放在client.querybuf。读取的字节数是有限制的,读取到的内容也有长度长限,超过上限就会拒连释放client对象。
对于流入流出redis的字节数,自然也是在read和write这两个环节中被记录。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
// src/server.h
#define PROTO_IOBUF_LEN (1024*16)
// src/networking.c
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
readlen = PROTO_IOBUF_LEN;
// ...
// 读取client发送的命令
nread = read(fd, c->querybuf+qblen, readlen);
// ...
// 设置buf的长度字段
sdsIncrLen(c->querybuf,nread);
// ...
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
// 记录入流量的字节数
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
// 当前client的query缓冲区超出最大限制,拒绝命令的进一步处理,并打log
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
// 在processInputBufferAndReplicate做分发处理命令
processInputBufferAndReplicate(c);
}
|
processInputBufferAndReplicate()
会区分client是不是master节点来响应命令,两种处理方式当然有些差别。不过这里我们先不关心replicate。
读取的命令放在client.querybuf后,是需要按照redis的通信协议进行解析的。解析完做一些常规的检查,例如命令是否存在,命名参数是否合法等。
检查是在processCommand()
里进行的。通过后,就可以调用注册好的命令回调函数来处理了。逻辑入口就是processInputBuffer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
// src/networking.c
void processInputBuffer(client *c) {
server.current_client = c;
while(c->qb_pos < sdslen(c->querybuf)) {
// 这里有一些检查,主要用于判断命令的处理是否有必要进行
// 用户在此之前执行了client pause命令
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
// https://redis.io/topics/protocol redis使用的通信文本协议
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
// 数组
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
// 解释client命令文本
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
// 正式响应client命令
if (processCommand(c) == C_OK) {
// ...
// 如果client是正在执行着阻塞式命令,就先不resetclient
// 否则,执行的其他非阻塞命令就resetclient,这样可以接着处理此client接下来发送的命令
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
// ...
}
}
// ...
}
|
processCommand()
方法可长了,会有一系列的不同模式下的处理方式与检查方式,例如quit命令的特殊处理、权限的鉴权、redis cluster模式下对命令的响应逻辑,设置了min-slaves-to-write后的检查拦截逻辑等待。最后前面拦截检查都通过后,最后就调用call()
来执行命令。
而call()
是redis里执行命令的核心,所以前后肯定又是一系列的检查。关键的就是c->cmd->proc(c);
,它就是调用命令注册的回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// src/server.c
int processCommand(client *c) {
// ...
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
void call(client *c, int flags) {
// ...
start = ustime();
c->cmd->proc(c);
// 命令执行耗时会被记录,这样才会有慢查询日志
duration = ustime()-start
// ...
}
|
而本文的set命令对应回调函数就是setCommand()
,它位于src/t_string.c。执行成功后一般给client返回OK,这个OK字符串就是通过addReply()
方法写到client的输出缓冲区的。
setCommand()
里的一顿操作我们先不关注,重点来看看addReply()
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// src/networking.c
void addReply(client *c, robj *obj) {
// 检查该client能不能回复, 可以回复的话,加到server.client_pending_write队列上
// 例如对client为master节点默认就是不回复命令执行结果
if (prepareClientToWrite(c) != C_OK) return;
// 判断返回内容的类型,字符串还是数字
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyStringToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
|
看完addReply()
的整个处理过程,也看不到怎样给client发送回复,都是把回复内容写到输出缓冲区里。怎么返回client结果呢?
返回命令执行结果
其实在redis里,事件的处理顺序是:
- client输出缓存有内容则返回执行结果
- 读取client发送的命令,执行命令,缓存执行结果
- 执行定时任务
就是这样周而复始。
这个循环在哪儿呢?就是aeMain()
啦。而第一步给client返回执行结果的逻辑被放置在eventLoop->beforesleep()
这个钩子里。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 处理IO事件与时间事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
// 这是注册到钩子的回调函数
void beforeSleep(struct aeEventLoop *eventLoop) {
// ...
// 如果有client的输出缓冲区不为空,则监听client对应的文件描述符的可写状态
// 等于将文件描述符注册到事件循环器中
handleClientsWithPendingWrites();
// ...
}
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
// 所有输出缓存区有内容的client都在clients_pending_write队列上
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
// ...
// 先写一次
if (writeToClient(c->fd,c,0) == C_ERR) continue;
// 之前那一次没写完,那就监听可写事件,注册回调函数,直至写完为止
// 如果输出内容多,tcp协议栈的写缓存有限,不可能一次性全写
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
// 回调函数sendReplyToClient其实本质也是调用writeToClient。
// 只不过包装一层,使其可以作为ae的回调函数
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
|
以上就是一个简单命令的处理流程。