Redis(5.0.3)里一个简单请求如何被处理


set text “hello world” 从进入服务器到输出结果,整个流程是怎样的?带着这个问题来看看源码。

在命令进入服务之前,服务器需要先初始化好自己

与这个场景相关的,两件事比较重要:

  1. 先注册好所有支持的Redis命令
  2. 初始化并启动事件循环器

所有的Redis命令先是被组织成一个table,里面包含每个命令的名称、对应处理函数、flag、调用次数等信息。

1
2
3
4
5
6
7
8
// src/server.c
// 太多了只给出小部分
struct redisCommand redisCommandTable[] = {
  {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
  {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
  {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
  // ...
}

这个redisCommandTable被映射到哈希结构里,就是Redis内部定义的Dict结构。O(1)就能取到对应命令的处理函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/server.c
void initServerConfig(void) {
  // ...
  // 设置命令映射表
  populateCommandTable();
  // ...
}

void populateCommandTable(void) {
  // ...
  int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

  for (j = 0; j < numcommands; j++) {
    struct redisCommand *c = redisCommandTable+j;
    // ...

  	// 把每个命令放到哈希结构中
  	retval1 = dictAdd(server.commands, sdsnew(c->name), c);
  	retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);

  }
}

然后启动事件循环器,监听默认的6379端口,并设置socket为no_blocking。

将监听6379端口的socket包装为一个aeFileEvent对象,通过aeCreateFileEvent()注册到事件循环器里。注册时,会还会注册一个回调函数acceptTcpHandler()。即有新连接要到来时,就调用回调函数进行accept。

accept到的就是client连接。accept返回一个文件描述符,也将它注册进事件循环器里。这样之后client发起一个set text "hello world"请求到达server时,文件描述符变得可读,事件循环器会捕获到此事件并调用对应的回调函数readQueryFromClient()

每个连接进来,都会创建一个对应的client对象,里面存储client发起的命令,输入/出缓冲区等信息。

所有client对象也会被挂到server.clients链表上。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// src/server.c
void initServer(void) {
  // ...
  server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

  if (server.port != 0 &&
      listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
    exit(1)

  // ...
  // 将监听client请求的IO文件事件加到事件循环器中
  for (j = 0; j < server.ipfd_count; j++) {
      // acceptTcpHandler 就是回调函数
      if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
      {
        serverPanic(
          "Unrecoverable error creating server.ipfd file event.");
      }
    }
  // ...
}

// src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    // ...
    while(max--) {
      // 通过accept系统调用获取client连接的文件描述符,及client ip/port
      cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
      // ...
      serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
      acceptCommonHandler(cfd,0,cip);
    }
}

static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 为新连接绑定一个client对象,并且将连接对应的文件描述符放进事件循环器里
    if ((c = createClient(fd)) == NULL) {
        // ...
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
  // ...
}

client *createClient(int fd) {
    // 为这个连接创建一个client
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        // 将文件描述符设置为noblocking
        anetNonBlock(NULL,fd);
        // 设置TCP_NODELAY,关闭Nagle算法
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive) // 开启tcp keepalive
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // ok,可以将文件描述符加到事件循环器里了
        if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
        {
          // ...
        }
    }

    // 默认客户端操作redis第一个db
    selectDb(c,0);
    // ... 这里是一系列client对象的初始化操作
    // 新client追加到server.clients链表后
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}

回调函数就是在事件循环器中被触发的。拿到有就绪事件的文件描述符后,判断是读还是写,再调用对应的回调函数(fe->rfileProc()fe->wfileProc())。回调函数的类型为void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// src/server.c
// aeMain() -> aeProcessEvents()
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
  // ...
  numevents = aeApiPoll(eventLoop, tvp)

  // ...
  for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd
    // ...
    if (!invert && fe->mask & mask & AE_READABLE) {
      // 读事件处理函数
      fe->rfileProc(eventLoop,fd,fe->clientData,mask);
      fired++;
    }

    if (fe->mask & mask & AE_WRITABLE) {
      if (!fired || fe->wfileProc != fe->rfileProc) {
        // 写事件处理函数
        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        fired++;
      }
    }
  }
  // ...
}

现在可以发送命令了

127.0.0.1:6379> set text “hello world”

敲下回车键,命令通过TCP协议到了server后,accept到新的套接字,并且是可读状态。

这时注册的回调函数readQueryFromClient()就被触发调用。它是所有命令的入口。

上面的源码也能看到,注册readQueryFromClient()之前,是为新连接创建一个client对象,命令的内容,client的属性,输入/输出缓冲区等都是与这个client绑定的。

readQueryFromClient()通过系统调用read()从套接字里读取命令,放在client.querybuf。读取的字节数是有限制的,读取到的内容也有长度长限,超过上限就会拒连释放client对象。

对于流入流出redis的字节数,自然也是在read和write这两个环节中被记录。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// src/server.h
#define PROTO_IOBUF_LEN         (1024*16)

// src/networking.c
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;

    readlen = PROTO_IOBUF_LEN;

    // ...

    // 读取client发送的命令
    nread = read(fd, c->querybuf+qblen, readlen);

    // ...

    // 设置buf的长度字段
    sdsIncrLen(c->querybuf,nread);
    // ...
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    // 记录入流量的字节数
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // 当前client的query缓冲区超出最大限制,拒绝命令的进一步处理,并打log
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    // 在processInputBufferAndReplicate做分发处理命令
    processInputBufferAndReplicate(c);
}

processInputBufferAndReplicate()会区分client是不是master节点来响应命令,两种处理方式当然有些差别。不过这里我们先不关心replicate。

读取的命令放在client.querybuf后,是需要按照redis的通信协议进行解析的。解析完做一些常规的检查,例如命令是否存在,命名参数是否合法等。

检查是在processCommand()里进行的。通过后,就可以调用注册好的命令回调函数来处理了。逻辑入口就是processInputBuffer()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// src/networking.c
void processInputBuffer(client *c) {
    server.current_client = c;

    while(c->qb_pos < sdslen(c->querybuf)) {
        // 这里有一些检查,主要用于判断命令的处理是否有必要进行
        // 用户在此之前执行了client pause命令
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        if (c->flags & CLIENT_BLOCKED) break;
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        // https://redis.io/topics/protocol redis使用的通信文本协议
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                // 数组
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        // 解释client命令文本
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 正式响应client命令
            if (processCommand(c) == C_OK) {
                // ...

                // 如果client是正在执行着阻塞式命令,就先不resetclient
                // 否则,执行的其他非阻塞命令就resetclient,这样可以接着处理此client接下来发送的命令
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            // ...
        }
    }
    // ...
}

processCommand()方法可长了,会有一系列的不同模式下的处理方式与检查方式,例如quit命令的特殊处理、权限的鉴权、redis cluster模式下对命令的响应逻辑,设置了min-slaves-to-write后的检查拦截逻辑等待。最后前面拦截检查都通过后,最后就调用call()来执行命令。

call()是redis里执行命令的核心,所以前后肯定又是一系列的检查。关键的就是c->cmd->proc(c);,它就是调用命令注册的回调函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// src/server.c
int processCommand(client *c) {
  // ...

  /* Exec the command */
  if (c->flags & CLIENT_MULTI &&
      c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
      c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
  {
    queueMultiCommand(c);
    addReply(c,shared.queued);
  } else {
    call(c,CMD_CALL_FULL);
    c->woff = server.master_repl_offset;
    if (listLength(server.ready_keys))
      handleClientsBlockedOnKeys();
  }
  return C_OK;
}

void call(client *c, int flags) {
    // ...
    start = ustime();
    c->cmd->proc(c);
    // 命令执行耗时会被记录,这样才会有慢查询日志
    duration = ustime()-start
    // ...
}

而本文的set命令对应回调函数就是setCommand(),它位于src/t_string.c。执行成功后一般给client返回OK,这个OK字符串就是通过addReply()方法写到client的输出缓冲区的。

setCommand()里的一顿操作我们先不关注,重点来看看addReply()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// src/networking.c
void addReply(client *c, robj *obj) {
    // 检查该client能不能回复, 可以回复的话,加到server.client_pending_write队列上
    // 例如对client为master节点默认就是不回复命令执行结果
    if (prepareClientToWrite(c) != C_OK) return;

    // 判断返回内容的类型,字符串还是数字
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

看完addReply()的整个处理过程,也看不到怎样给client发送回复,都是把回复内容写到输出缓冲区里。怎么返回client结果呢?

返回命令执行结果

其实在redis里,事件的处理顺序是:

  1. client输出缓存有内容则返回执行结果
  2. 读取client发送的命令,执行命令,缓存执行结果
  3. 执行定时任务

就是这样周而复始。

这个循环在哪儿呢?就是aeMain()啦。而第一步给client返回执行结果的逻辑被放置在eventLoop->beforesleep()这个钩子里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 处理IO事件与时间事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

// 这是注册到钩子的回调函数
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...

    // 如果有client的输出缓冲区不为空,则监听client对应的文件描述符的可写状态
    // 等于将文件描述符注册到事件循环器中
    handleClientsWithPendingWrites();
  	// ...
}

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    // 所有输出缓存区有内容的client都在clients_pending_write队列上
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        // ...
        // 先写一次
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // 之前那一次没写完,那就监听可写事件,注册回调函数,直至写完为止
        // 如果输出内容多,tcp协议栈的写缓存有限,不可能一次性全写
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // 回调函数sendReplyToClient其实本质也是调用writeToClient。
            // 只不过包装一层,使其可以作为ae的回调函数
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

以上就是一个简单命令的处理流程。