Redis(5.0.3)定时任务serverCron


redis里很多非响应命令的功能,例如持久化、内存淘汰、复制、cluster等等,是需要设置一个定时任务来完成的。并且这里部分的定时任务的执与响应命令的执行是混在一个线程中。意味着定时任务执行过慢会影响redis的性能。

本文主要介绍下redis是如何实现并包含大概哪些定时任务。

定时任务(时间事件)

redis定时任务被当成时间事件,与IO事件一同放到事件循环器中被处理。

关于事件循环器可以参考Redis事件驱动与连接管理

定时任务是在IO事件处理完之后才被执行的,所以redis的定时任务并不一定准点执行的。

在等待IO事件之前,会先获取下次定时任务应该被执行的时间点,减去当前时间,得出来的值就是阻塞等待IO事件的timeout。总不能无限等待下去。要是timeout前IO事件已经到来了也没关系,最后真正在执行定时任务前会做检查,没到时间点是不会被执行的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
static int processTimeEvents(aeEventLoop *eventLoop) {
  // ...
  // 获取当前时间
  aeGetTime(&now_sec, &now_ms);
  // 到达时间事件设置的执行时间
  if (now_sec > te->when_sec ||
      (now_sec == te->when_sec && now_ms >= te->when_ms))
  {
    // ...
    // te-timeProc 就是定时任务
    retval = te->timeProc(eventLoop, id, te->clientData);
    processed++;
    if (retval != AE_NOMORE) {
      aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
    }
    // ...
  }
  // ...
}

大任务包含小任务

开篇也说到,有多种redis功能是需要定时任务来协助的。它们就是小任务,全被写在一个大任务里(serverCron)。

而大任务就是时间事件。也即全局其实只有一个定时任务,每次执行它的时候,里面再执行各种小任务。

1
2
3
4
5
6
7
8
void initServer(void) {
  // ...
	if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
  }
  // ...
}

大任务的执行频率由参数hz控制。默认值是10,表示每秒执行10次定时任务。反映在redis内部,就是每隔100ms执行一次大任务。

hz是一个重要的参数,它的不同取值对redis集群性能有所影响。暂不在本文展开。

但不是每个小任务都各100ms执行一次。例如同样是记录信息,命令执行个数,流入流出的流量可以每100ms记录一次。而db的过期key与非过期key的数量,就只需要5000ms记录一次即可。

但大任务是每(1000/hz)ms被调度执行一次的。里面的小任务如何控制调度的间隔时间呢?

每次大任务(serverCron)被执行完后,server.cronloops会自增一。表示大任务被调度的次数

就是#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz)))这个宏了。

它分两部分,任一满足即可执行。用一个例子看看

1
2
3
4
// 每100ms执行一次
run_with_period(100) { function1()}
// 每5000ms执行一次
run_with_period(100) { function2()}

就是这样,在大任务serverCron()里,个别小任务还可以通过run_with_period宏来控制调度的周期,其他小任务就统一按照大任务的调度频率在执行。

小任务列举

小任务很多,只做部分注释列举。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    // ...

    // 看门狗,用于调试,当执行命令耗时过长会发送SIGALRM信号,这时redis会响应信息并记录当前堆栈
    // src/debug.c/enableWatchdog 里注册信号处理函数
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    // 将当前时间戳缓存起来,这个时间会在多个地方被使用
    // 使用缓存起来的时间戳比每次调用time系统调用更快
    updateCachedTime();

    server.hz = server.config_hz;
    // 5.0版本引入了动态hz,默认开启
    // 如果client很多,那定时任务执行的频率会加快
    if (server.dynamic_hz) {
        while (listLength(server.clients) / server.hz >
               MAX_CLIENTS_PER_CLOCK_TICK)
        {
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) {
                server.hz = CONFIG_MAX_HZ;
                break;
            }
        }
    }

    // 每100ms执行一次
    run_with_period(100) {
        // 记录执行的命令个数
        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        // 记录读流量
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,server.stat_net_input_bytes);
        // 记录写流量
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,server.stat_net_output_bytes);
    }

    // LRU相关
    unsigned long lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    // 记录内存使用峰值
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    // 记录内存的使用信息,例如zmalloc的RSS信息,lua的内存信息
    run_with_period(100) {
        server.cron_malloc_stats.process_rss = zmalloc_get_rss();
        server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
        zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
                                   &server.cron_malloc_stats.allocator_active,
                                   &server.cron_malloc_stats.allocator_resident);
        if (!server.cron_malloc_stats.allocator_resident) {
            size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
            server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
        }
      	// ...
    }

    // initserver()中注册了信号处理函数,接受到SIGTERM信号后,会设置server.shutdown_asap为1
    // 而对SIGTERM信号的shutdown响应处理推迟到这里执行
    if (server.shutdown_asap) {
        if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
        serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    // 记录每个db里的基本信息,设置了过期时间key的数量,没有过期时间key的数量和db(即dict)的总大小,
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
            }
        }
    }

    // 记录client和slave个数及内存使用量
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            serverLog(LL_VERBOSE,
                "%lu clients connected (%lu replicas), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    // 专门针对client的定时任务
    // 1. 清理超过空闲上限的client
    // 2. 检查出输入缓冲区过大的client,收缩缓冲区,释放内存
    // 3. 记录输入/出缓冲区的内存使用峰值
    clientsCron();

    // 专门针对db的定时任务
    // 1. 清理过期的key
    // 2. 对db进行resize
    // 3. 对db进行rehash
    databasesCron();

    // ...
    // AOF相关
    // ...

    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }

    freeClientsInAsyncFreeQueue();

    // 复制相关
    run_with_period(1000) replicationCron();

    // 集群相关
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    // 哨兵模式相关
    if (server.sentinel_mode) sentinelTimer();

    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    // RDB相关
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }

    // 大任务被调度次数自增一
    server.cronloops++;
    return 1000/server.hz;
}

大任务下次被调度时间

每次大任务被执行完,需要对它设置好下一次应该调度的时间点。

serverCron()最后会return 1000/server.hz;。server.hz=10时,就是返回100ms。

回到本文的第一段代码,注意在processTimeEvents()里两行代码:

1
2
3
4
retval = te->timeProc(eventLoop, id, te->clientData);
if (retval != AE_NOMORE) {
  aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
}

时间事件处理完之后,retval=1000/server.hz。再用aeAddMillisecondsToNow()设置好下次调度的时间点。等待事件循环器下次调度即可。