
    技术2024-12-02  27



    这一步表示Redis服务器基本数据结构和各种参数的初始化。在Redis源码中,Redis服务器是用一个叫做redisServer的struct来表达的,里面定义了Redis服务器赖以运行的各种参数,比如监听的端口号和文件描述符、当前连接的各个client端、Redis命令表(command table)配置、持久化相关的各种参数,以及事件循环结构。

    void initServerConfig(void) { int j; ... /* Command table -- we initiialize it here as it is part of the * initial configuration, since command names may be changed via * redis.conf using the rename-command directive. */ server.commands = dictCreate(&commandTableDictType,NULL); server.orig_commands = dictCreate(&commandTableDictType,NULL); populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); server.zpopminCommand = lookupCommandByCString("zpopmin"); server.zpopmaxCommand = lookupCommandByCString("zpopmax"); server.sremCommand = lookupCommandByCString("srem"); server.execCommand = lookupCommandByCString("exec"); server.expireCommand = lookupCommandByCString("expire"); server.pexpireCommand = lookupCommandByCString("pexpire"); server.xclaimCommand = lookupCommandByCString("xclaim"); server.xgroupCommand = lookupCommandByCString("xgroup"); server.rpoplpushCommand = lookupCommandByCString("rpoplpush"); /* Debugging */ server.assert_failed = "<no assertion failed>"; server.assert_file = "<no file>"; server.assert_line = 0; server.bug_report_start = 0; server.watchdog_period = 0; /* By default we want scripts to be always replicated by effects * (single commands executed by the script), and not by sending the * script to the slave / AOF. This is the new way starting from * Redis 5. However it is possible to revert it via redis.conf. */ server.lua_always_replicate_commands = 1; initConfigValues(); }



    void populateCommandTable(void) { int j; int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand); for (j = 0; j < numcommands; j++) { struct redisCommand *c = redisCommandTable+j; int retval1, retval2; /* Translate the command string flags description into an actual * set of flags. */ if (populateCommandTableParseFlags(c,c->sflags) == C_ERR) serverPanic("Unsupported command flag"); c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */ retval1 = dictAdd(server.commands, sdsnew(c->name), c); /* Populate an additional dictionary that will be unaffected * by rename-command statements in redis.conf. */ retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c); serverAssert(retval1 == DICT_OK && retval2 == DICT_OK); } }


    struct redisCommand redisCommandTable[] = { {"module",moduleCommand,-2, "admin no-script", 0,NULL,0,0,0,0,0,0}, {"get",getCommand,2, "read-only fast @string", 0,NULL,1,1,1,0,0,0}, /* Note that we can't flag set as fast, since it may perform an * implicit DEL of a large key. */ {"set",setCommand,-3, "write use-memory @string", 0,NULL,1,1,1,0,0,0}, ... }


    struct redisCommand { char *name; redisCommandProc *proc; int arity; char *sflags; /* Flags as string representation, one char per flag. */ uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */ /* Use a function to determine keys arguments in a command line. * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ int lastkey; /* The last argument that's a key */ int keystep; /* The step between first and last key */ long long microseconds, calls; int id; /* Command ID. This is a progressive ID starting from 0 that is assigned at runtime, and is used in order to check ACLs. A connection is able to execute a given command if the user associated to the connection has this command bit set in the bitmap of allowed commands. */ };



    void initConfigValues() { for (standardConfig *config = configs; config->name != NULL; config++) { config->interface.init(config->data); } }


    在Redis中,事件循环是用一个叫aeEventLoop的struct来表示的。「创建事件循环」这一步主要就是创建一个aeEventLoop结构,并存储到server全局变量(即前面提到的redisServer类型的结构)中。另外,事件循环的执行依赖系统底层的I/O多路复用机制(I/O multiplexing),比如Linux系统上的epoll机制。因此,这一步也包含对于底层I/O多路复用机制的初始化(调用系统API)。

    aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }


    服务器程序需要监听才能收到请求。根据配置,这一步可能会打开两种监听:对于TCP连接的监听和对于Unix domain socket的监听。Unix domain socket是一种高效的进程间通信(IPC)机制,在POSIX规范中也有明确的定义,用于在同一台主机上的两个不同进程之间进行通信,比使用TCP协议性能更高(因为省去了协议栈的开销)。当使用Redis客户端连接同一台机器上的Redis服务器时,可以选择使用「Unix domain socket」进行连接。但不管是哪一种监听,程序都会获得文件描述符,并存储到server全局变量中。对于TCP的监听来说,由于监听的IP地址和端口可以绑定多个,因此获得的用于监听TCP连接的文件描述符也可以包含多个。后面,程序就可以拿这一步获得的文件描述符去注册I/O事件回调了。

    int listenToPort(int port, int *fds, int *count) { int j; /* Force binding of if no bind address is specified, always * entering the loop if j == 0. */ if (server.bindaddr_count == 0) server.bindaddr[0] = NULL; for (j = 0; j < server.bindaddr_count || j == 0; j++) { if (server.bindaddr[j] == NULL) { int unsupported = 0; /* Bind * for both IPv6 and IPv4, we enter here only if * server.bindaddr_count == 0. */ fds[*count] = anetTcp6Server(server.neterr,port,NULL, server.tcp_backlog); if (fds[*count] != ANET_ERR) { anetNonBlock(NULL,fds[*count]); (*count)++; } else if (errno == EAFNOSUPPORT) { unsupported++; serverLog(LL_WARNING,"Not listening to IPv6: unsupported"); } if (*count == 1 || unsupported) { /* Bind the IPv4 address as well. */ fds[*count] = anetTcpServer(server.neterr,port,NULL, server.tcp_backlog); if (fds[*count] != ANET_ERR) { anetNonBlock(NULL,fds[*count]); (*count)++; } else if (errno == EAFNOSUPPORT) { unsupported++; serverLog(LL_WARNING,"Not listening to IPv4: unsupported"); } } /* Exit the loop if we were able to bind * on IPv4 and IPv6, * otherwise fds[*count] will be ANET_ERR and we'll print an * error and return to the caller with an error. */ if (*count + unsupported == 2) break; } else if (strchr(server.bindaddr[j],':')) { /* Bind IPv6 address. */ fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog); } else { /* Bind IPv4 address. */ fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog); } if (fds[*count] == ANET_ERR) { serverLog(LL_WARNING, "Could not create server TCP listening socket %s:%d: %s", server.bindaddr[j] ? server.bindaddr[j] : "*", port, server.neterr); if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL) continue; return C_ERR; } anetNonBlock(NULL,fds[*count]); (*count)++; } return C_OK; }


    Redis作为一个单线程(single-threaded)的程序,它如果想调度一些异步执行的任务,比如周期性地执行过期key的回收动作,除了依赖事件循环机制,没有其它的办法。这一步就是向前面刚刚创建好的事件循环中注册一个timer事件,并配置成可以周期性地执行一个回调函数:serverCron。由于Redis只有一个主线程,因此这个函数周期性的执行也是在这个线程内,它由事件循环来驱动(即在合适的时机调用),但不影响同一个线程上其它逻辑的执行(相当于按时间分片了)。serverCron函数到底做了什么呢?实际上,它除了周期性地执行过期key的回收动作,还执行了很多其它任务,比如主从重连、Cluster节点间的重连、BGSAVE和AOF rewrite的触发执行,等等

    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; }


    Redis服务端最主要的工作就是监听I/O事件,从中分析出来自客户端的命令请求,执行命令,然后返回响应结果。对于I/O事件的监听,自然也是依赖事件循环。前面提到过,Redis可以打开两种监听:对于TCP连接的监听和对于Unix domain socket的监听。因此,这里就包含对于这两种I/O事件的回调的注册,两个回调函数分别是acceptTcpHandler和acceptUnixHandler。对于来自Redis客户端的请求的处理,就会走到这两个函数中去。另外,其实Redis在这里还会注册一个I/O事件,用于通过管道(pipe)机制与module进行双向通信。

    /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } for (j = 0; j < server.tlsfd_count; j++) { if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, acceptTLSHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.tlsfd file event."); } } if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */ if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR) { serverPanic( "Error registering the readable event for the module " "blocked clients subsystem."); }


    void InitServerLast() { bioInit(); initThreadedIO(); set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); }


    Redis会创建一些额外的线程,在后台运行,专门用于处理一些耗时的并且可以被延迟执行的任务(一般是一些清理工作)。在Redis里面这些后台线程被称为bio(Background I/O service)。它们负责的任务包括:可以延迟执行的文件关闭操作(比如unlink命令的执行),AOF的持久化写库操作(即fsync调用,但注意只有可以被延迟执行的fsync操作才在后台线程执行),还有一些大key的清除操作(比如flushdb async命令的执行)。可见bio这个名字有点名不副实,它做的事情不一定跟I/O有关。对于这些后台线程,我们可能还会产生一个疑问:前面的初始化过程,已经注册了一个timer事件回调,即serverCron函数,按说后台线程执行的这些任务似乎也可以放在serverCron中去执行。因为serverCron函数也是可以用来执行后台任务的。实际上这样做是不行的。前面我们已经提到过,serverCron由事件循环来驱动,执行还是在Redis主线程上,相当于和主线程上执行的其它操作(主要是对于命令请求的执行)按时间进行分片了。这样的话,serverCron里面就不能执行过于耗时的操作,否则它就会影响Redis执行命令的响应时间。因此,对于耗时的、并且可以被延迟执行的任务,就只能放到单独的线程中去执行了。

    void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } }



    void initThreadedIO(void) { io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); } /* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */ /* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); io_threads_pending[i] = 0; pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }



    void loadDataFromDisk(void) { long long start = ustime(); if (server.aof_state == AOF_ON) { if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself))) && rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db * of -1 inside the RDB file in a wrong way, see more * information in function rdbPopulateSaveInfo. */ rsi.repl_stream_db != -1) { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; /* If we are a slave, create a cached master from this * information, in order to allow partial resynchronizations * with masters. */ replicationCacheMasterUsingMyself(); selectDb(server.cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); exit(1); } } }



    aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el);
    Processed: 0.013, SQL: 9