redis服务器是一个事件驱动程序,服务器主要处理以下两类事件:
文件事件:redis服务器通过套接字与客户端进行连接,通信时会产生相应的文件事件,而服务器则通过监听并处理这些事件来完成一系列网络通信操作。简言之文件事件就是服务器对套接字操作的抽象;时间事件:redis服务器中的一些操作需要在给定的时间点执行,而时间事件就是服务器对这类定时操作的抽象。本文只借文件事件来研究redis中事件,时间事件以后再介绍。这里还是从initServer()方法中的部分代码看起:
... server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); ... // Create an event handler for accepting new connections in TCP and Unix domain sockets. for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.ipfd file event."); } } ... if (server.sofd > 0 && aeCreateFileEvent(server.el, server.sofd, AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) { serverPanic("Unrecoverable error creating server.sofd file event."); } ...先是调用了aeCreateEventLoop()方法创建以及初始化aeEventLoop结构体并返回复制给server.el。方法及结构体如下:
/* State of an event based program */ typedef struct aeEventLoop { int maxfd; /* highest file descriptor currently registered */ int setsize; /* max number of file descriptors tracked */ long long timeEventNextId; time_t lastTime; /* Used to detect system clock skew */ aeFileEvent *events; /* Registered events */ aeFiredEvent *fired; /* Fired events */ aeTimeEvent *timeEventHead; int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; int flags; } aeEventLoop; aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; ... }结构体中定义了文件事件、时间事件、已注册事件的最大描述符等信息,初始化时maxfd=-1表明当前还没有注册任何事件。回到上文的initServer()方法中,下面两部分分别是将tcp套接字和unix域套接字通过aeCreateFileEvent()方法创建文件事件,并针对不同的事件类型,事件处理程序也不同。如这里tcp套接字的事件处理程序是acceptTcpHandler()方法;而unix域套接字的事件处理程序是acceptUnixHandler()方法。当服务端接收到应答不同套接字的信息,就会调用对应的事件处理程序进行处理。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }如上可以看到,在aeCreateFileEvent()方法中实际是调用aeApiAddEvent()方法将套接字注册到之前定义的server.el中,这里如果选择的是epoll系统调用,那么aeApiAddEvent()方法体如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; }可以看到实际上调用的还是epoll_ctl()方法。再回到aeCreateFileEvent()方法中,同时也将事件处理程序也赋值给server.el结构体中对应套接字的文件事件结构体中。
/* File event structure */ typedef struct aeFileEvent { int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;这里在写法上它并不是直接将事件处理程序赋值,而是声明了一个指向该套接字对应的文件事件结构体的指针*fe,我猜测是这样后面赋值语句写起来会简洁些。
当有客户端连接到服务器中监听的套接字时,文件事件分发器就会调用对应的事件处理程序进行处理。假设这里是tcp连接,调用的也就是acceptTcpHandler()方法。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { ... while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); ... acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); } } int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) { ... if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1) return ANET_ERR; ... return fd; } static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { int fd; while(1) { fd = accept(s,sa,len); ... } return fd; }上述三个方法从上往下依次调用,包括创建socket、通过该socket与客户端建立连接然后进行处理等等。
关于文件事件上文只介绍了其中少数几个方法,如aeCreateFileEvent()方法是接受一个套接字描述符、一个事件类型以及一个事件处理程序作为参数,将给定的套接字事件加入到I/O多路复用程序监听的范围之内,并将事件和事件处理程序关联起来。实际上还有其他几个api:
aeEventLoop *aeCreateEventLoop(int setsize); void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); int aeGetFileEvents(aeEventLoop *eventLoop, int fd); ...根据方法名基本就可大概猜出方法作用,此处就不多做介绍啦!
