本篇来看一下Redis的请求处理过程。
监听过程
void initServer(void) {
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL)
exit(1);
if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
if (server.tls_port != 0 && listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
exit(1);
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR)
exit(1);
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
for (j = 0; j < server.tlsfd_count; j++) {
if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE, acceptTLSHandler,NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.tlsfd file event.");
}
}
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
}
监听端口后得到文件描述符,调用aeCreateFileEvent将文件描述符注册到事件循环中,注册监听可读事件。
看一下listenToPort:
int listenToPort(int port, int *fds, int *count) {
int j;
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
if (server.bindaddr[j] == NULL) {
int unsupported = 0;
fds[*count] = anetTcp6Server(server.neterr,port,NULL,server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
}
if (*count == 1 || unsupported) {
fds[*count] = anetTcpServer(server.neterr,port,NULL,server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
}
}
if (*count + unsupported == 2) break;
} else if (strchr(server.bindaddr[j],':')) {
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
} else {
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
}
if (fds[*count] == ANET_ERR) {
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
continue;
return C_ERR;
}
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return C_OK;
}
调用anetTcpServer或anetTcp6Server监听对应的地址,并将文件描述符设置为非阻塞的。
接受连接
在监听的socket对应的fd可读时(有新的连接请求),在事件循环中,会调用我们注册的处理函数acceptTcpHandler(不考虑TLS)。
我们来看一下acceptTcpHandler的过程:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
调用anetTcpAccept接受连接请求拿到客户端连接对应的文件描述符,connCreateAcceptedSocket创建对应的connection结构体,然后将其作为参数调用acceptCommonHandler。参数fd是产生就绪事件的文件描述符,对我们来说,就是监听tcp地址得到的fd。
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
char conninfo[100];
if (connGetState(conn) != CONN_STATE_ACCEPTING) {
connClose(conn);
return;
}
if ((c = createClient(conn)) == NULL) {
connClose(conn);
return;
}
c->flags |= flags;
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
freeClient(connGetPrivateData(conn));
return;
}
}
调用createClient创建对应的client结构体,然后调用connAccept。
来看一下createClient的主要逻辑:
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
selectDb(c,0);
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (conn) linkClient(c);
initClientMultiState(c);
return c;
}
- 设置连接为非阻塞读写。
- 开启TCP_NODELAY。
- 调用connSetReadHandler,读数据回调方法为readQueryFromClient,connSetReadHandler最终会调用connSocketSetReadHandler方法。
看一下connSocketSetReadHandler的主要逻辑:
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else if (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
- 设置读数据的回调方法。
- 调用aeCreateFileEvent将客户端连接对应的文件描述符注册到事件循环中,注册监听可读事件。
connAccept最终会调用connSocketAccept,我们来看一下它的逻辑:
static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
int ret = C_OK;
if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;
conn->state = CONN_STATE_CONNECTED;
connIncrRefs(conn);
if (!callHandler(conn, accept_handler)) ret = C_ERR;
connDecrRefs(conn);
return ret;
}
参数accept_handler就是前面调用connAccept传递的clientAcceptHandler。
客户端命令处理
接受连接后,我们已经将客户端连接对应的文件描述符注册到了事件循环中,注册的事件处理方法是connSocketEventHandler。在客户端连接对应的文件描述符有就绪事件时,事件循环会调用connSocketEventHandler方法。
来看一下connSocketEventHandler:
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
connection *conn = clientData;
if (conn->state == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {
int conn_error = connGetSocketError(conn);
if (conn_error) {
conn->last_errno = conn_error;
conn->state = CONN_STATE_ERROR;
} else {
conn->state = CONN_STATE_CONNECTED;
}
if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
if (!callHandler(conn, conn->conn_handler)) return;
conn->conn_handler = NULL;
}
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}
根据事件类型调用具体的处理函数,客户端发来命令时,读事件产生,调用conn->read_handler,前面我们将其设置为了readQueryFromClient,也就是读事件会调用readQueryFromClient。通常在可读可写时,我们先处理读事件,再处理写事件,但是在conn->flags设置了CONN_FLAG_WRITE_BARRIER时,先处理写事件,再处理读事件。
看一下readQueryFromClient的主要逻辑:
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
if (postponeClientRead(c)) return;
readlen = PROTO_IOBUF_LEN;
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
freeClientAsync(c);
return;
}
} else if (nread == 0) {
freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
return;
}
processInputBuffer(c);
}
-
postponeClientRead主要是判断是否开启了ThreadIO,如果开启了且readQueryFromClient不是由beforeSleep或者processEventsWhileBlocked调用的,则设置client的CLIENT_PENDING_READ标志位,并将client放到server.clients_pending_read链表中,在下一次主循环时,会调用beforeSleep,beforeSleep会调用handleClientsWithPendingReadsUsingThreads用多个线程并行的从socket读取数据并解析命令。
int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) { c->flags |= CLIENT_PENDING_READ; listAddNodeHead(server.clients_pending_read,c); return 1; } else { return 0; } }
ProcessingEventsWhileBlocked标志在进入processEventsWhileBlocked方法时设置,在走出方法时清空。processEventsWhileBlocked在执行长耗时的操作时会被调用,如加载aof日志或者rdb日志时,每加载一定数量时,调用一次processEventsWhileBlocked方法,lua脚本超时也会调用processEventsWhileBlocked。lua脚本超时会调用processEventsWhileBlocked的原因是,本质上Redis处理命令是单线程的,如果lua脚本一直占用CPU,会导致无法处理客户端的其它命令,包括来自客户端的SCRIPT KILL或者SHUTDOWN NOSAVE命令。
若是从beforeSleep第二次进入方法,或者未开启ThreadIO,或者从processEventsWhileBlocked进入方法时,也就是postponeClientRead返回0时,继续执行下面的流程,否则直接返回。
以set a aaa请求为例,对应的RESP(REdis Serialization Protocol)协议表示为*3\r\n$3\r\nset\r\n$1\r\na\r\n$3\r\naaa\r\n,对应的INLINE请求表示为set a aaa\r\n。PROTO_REQ_MULTIBULK表明这是一个符合RESP协议的请求,PROTO_REQ_INLINE表示这是一个符合INLINE协议的请求。如果是一个RESP请求,且正在读一个很大的参数,则增大读取的长度以便提前分配内存。
读取请求放到client的querybuf里。
调用processInputBuffer尝试处理整个命令,processInputBuffer在命令遇到没有完整读取时,会直接返回,等待更多数据的到来。
接着看一下processInputBuffer的主要逻辑:
void processInputBuffer(client *c) {
while(c->qb_pos < sdslen(c->querybuf)) {
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
if (c->flags & CLIENT_BLOCKED) break;
if (c->flags & CLIENT_PENDING_COMMAND) break;
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
if (!c->reqtype) {
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
if (server.gopher_enabled && !server.io_threads_do_reads &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
if (processCommandAndResetClient(c) == C_ERR) {
return;
}
}
}
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}
- 首先判断是否有客户端执行了CLIENT PAUSE且还没有到达指定时间。如果有,暂停接收所有客户端(除了来自slave的连接)的数据。
- 判断客户端是否被block了,一些命令如BLPOP在没有数据时会阻塞当前客户端知道超时或者有数据时。如果当前客户端被阻塞了,暂停接收该客户端的数据。
- CLIENT_PENDING_COMMAND会在开启了ThreadIO的情况下出现,如果该标志位被置位,说明该客户端已经有一个未处理的命令,不再继续解析下一个命令。
- 如果当前调用是因为lua脚本超时导致processEventsWhileBlocked调用的,且请求是由master发来的(数据同步请求),则不处理,避免出现错误数据。
- 如果当前客户端被设置了CLIENT_CLOSE_AFTER_REPLY或CLIENT_CLOSE_ASAP标志位,此时已不需要从客户端读取数据,该连接会在稍后被断开。
- 判断请求的格式是RESP格式还是INLINE格式。
- 如果是INLINE请求,按照INLINE格式解析请求,如果无法读取完整的命令则直接返回,等待完整的数据到来后再次解析。如果这个INLINE请求是一个Gopher请求,则提前处理它,Gopher请求是一类特殊的请求,以/开头或者为空,如/foo\r\n代表获取键为/foo的值,\r\n代表获取键为/的值。如果不是Gopher请求则继续执行,执行解析出的命令。
- 如果是RESP请求,则按照RESP协议格式解析,如果无法读取完整的命令则直接返回,等待完整的数据到来后再次解析。
- 调用processCommandAndResetClient(c)执行解析出来的命令。
下一篇文章我们分析解析出的命令的执行过程。