Redis Threaded IO
Redis 6.0开始支持多线程,Redis分主线程和IO线程,IO线程只用于读取客户端命令和发送回复数据给客户端,客户端命令依旧是由主线程来执行。
Redis多线程的实现
为什么要使用多线程呢?
Redis将所有数据放在内存中,内存的响应速度在纳秒级别,对于小数据包,Redis服务器可以处理80,000到100,000 QPS,所以对于大部分公司,单线程的Redis足够了。
但是Redis从自身的角度来说,因为读写网络的read/write系统调用占用了Redis执行期间大部分CPU时间,瓶颈主要在于网络IO消耗,网络IO主要延时由服务器响应延时+带宽限制+网络延时+跳转路由延时,一般在毫秒级别。
多线程Redis主要为了利用多核CPU,目前主线程只能利用一个核,多线程任务可以分摊Redis同步IO读写的负荷。
Redis6.0多线程开启方法
要开启Redis的IO线程功能,需要修改redis.conf配置文件:
io-threads-do-reads yes # 开启IO线程
io-threads 6 # 设置IO线程数
线程数的设置,官方建议:4核的设置2或3个线程,8核的建议设置6个线程,线程数一定要小于机器核数,线程数并不是越大越好,官方认为超过8个基本没什么意义了。
initThreadedIO
initThreadedIO会根据redis.conf里配置的io-threads设置的IO线程数初始化,代码如下:
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
//用户没有开启多线程,使用主线程来处理
if (server.io_threads_num == 1) return;
//线程数超过128
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. index[0]为主线程*/
/* Things we do only for the additional threads. 非主线程*/
pthread_t tid;
//为线程初始化对应的锁
pthread_mutex_init(&io_threads_mutex[i],NULL);
//线程等待状态初始化为 0
io_threads_pending[i] = 0;
//初始化后将线程暂时锁住
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
// 将index 和对应线程 ID 加以映射
io_threads[i] = tid;
}
}
io_threads_num == 1 按单线程处理
io_threads_num > IO_THREADS_MAX_NUM 超过上限128,按异常处理
initThreadedIO()函数的主要工作是:
- 为每个IO线程创建一个链表,用于放置要进行IO操作的客户端连接。
- 为每个IO线程创建一个锁,用于主线程与IO线程的通信。
- 调用pthread_create来创建IO线程,IO线程的主体函数是IOThreadMain()。
IOThreadMain
下面来分析一下IO线程的主体函数主要完成的工作:
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
IO线程的主体函数主要完成以下几个操作:
- 等待主线程分配客户端连接(对应IO线程的io_threads_list链表不为空)。
- 判断当前是进行读操作还是写操作( io_threads_op 等于 IO_THREADS_OP_WRITE 表示要进行写操作,而 io_threads_op 等于 IO_THREADS_OP_READ 表示要进行读操作)。 如果是进行写操作,那么就调用 writeToClient() 函数向客户端连接进行发送数据。 如果是读操作,那么 就 调用 readQueryFromClient() 函数读取客户端连接的请求。
- 完成对客户端连接的读写操作后,需要清空对应IO线程的 io_threads_list 链表和计数器 io_threads_pending ,用于通知主线程已经完成读写操作。
postponeClientRead
主线程是怎样分配客户端连接给各个IO线程的呢?
主线程在接收到客户端连接后,会把客户端连接添加到事件驱动库中监听读事件,读事件的回调函数为readQueryFromClient()。也就是客户端连接可读时会触发调用readQueryFromClient()函数,而readQueryFromClient()函数会调用postponeClientRead()函数判断当前Redis是否开启了IO线程功能
/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (server.io_threads_active && //线程是否在不断等待IO
server.io_threads_do_reads && //是否多线程IO读取
!ProcessingEventsWhileBlocked &&
!(c->flags &
(CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{//不能是主从,且未处于等待读取的状态
c->flags |= CLIENT_PENDING_READ;//将client设置为等待读取的状态flag
listAddNodeHead(server.clients_pending_read,c);// 将这个client加入到等待读取队列
return 1;
} else {
return 0;
}
}
postponeClientRead在判断开启IO线程功能后,会调用listAddNodeHead把客户端连接添加到clients_pending_read链表中,并且设置标志位CLIENT_PENDING_READ,避免二次添加。
server里维护了一个clients_pending_read,包含所有处于读事件pending的客户端列表。
handleClientsWithPendingReadsUsingThreads
如何分配客户端给thread
handleClientsWithPendingReadsUsingThreads函数中把客户端连接分配给各个IO线程。
/* When threaded I/O is also enabled for the reading + parsing side, the
* readable handler will just put normal clients into a queue of clients to
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
//redis检查有多少等待读的client
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
//分配给各个IO线程
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
//如果长度不为0,进行while循环,将每个等待的client分配给线程,当等待长度超过线程数时,每个线程分配到的client可能会超过1个:
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
//设置各个IO线程负责的客户端连接数
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
//主线程也要负责一部分客户端连接的读写操作
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
//等待所有IO线程完成
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
//执行各个客户端连接的命令
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
//最后清空client_pendign_read
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
processInputBuffer(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}
- 分配客户端连接给各个IO线程(添加到对应IO线程的io_threads_list链表中),分配策略为轮询。
- 设置各个IO线程赋值的客户端连接数io_threads_pending。
- 处理主线程赋值那部分客户端连接的读写操作。
- 等待所有IO线程完成读取客户端连接请求的命令。
- 执行各个客户端连接请求的命令。
IO线程在完成读取客户端连接的请求后,会把io_threads_pending计数器清零,主线程就是通过检测io_threads_pending计数器来判断是否所有IO线程都完成了对客户端连接的读取命令操作。
startThreadedIO
如何处理读请求
当任务分发完毕后,每个线程按照正常流程将自己负责的Client的读取缓冲区的内容进行处理
每轮处理中,需要将各个线程的锁开启,并打相应的标志位
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
serverAssert(io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
// 解开线程的锁定状态
pthread_mutex_unlock(&io_threads_mutex[j]);
// 现在可以开始多线程 IO 执行对应读 /写任务
io_threads_active = 1;
}
结束时,需要检查是否有剩余待读的IO,如果没有,将线程锁定,标志关闭
void stopThreadedIO(void) {
// 需要停止的时候可能还有等待读的 Client 在停止前进行处理
handleClientsWithPendingReadsUsingThreads();
if (tio_debug) { printf("E"); fflush(stdout); }
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
// 本轮 IO 结束 将所有线程上锁
pthread_mutex_lock(&io_threads_mutex[j]);
// IO 状态设置为关闭
io_threads_active = 0;
}
Redis的Threaded IO模型中,每次所有的线程都只能进行读或者写操作,通过io_threads_op控制,同时每个线程中负责的client依次执行:
// 每个 thread 有可能需要负责多个 client
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
// 当前全局处于写事件时,向输出缓冲区写入响应内容
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// 当前全局处于读事件时,从输入缓冲区读取请求内容
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
每个线程执行readQueryFromClient,将对应的请求放入一个队列中,单线程执行,最后类似地由多线程将结果写入客户端的buffer中。
总结
Redis Threaded IO将服务读Client的输入缓冲区和将执行结果写入输出缓冲区的过程改为了多线程的模型,同时保持同一时间全部线程均处于读或者写的状态。但是命令的具体执行仍是以单线程(队列)的形式,因为Redis希望保持坚定的结构避免处理锁和竞争的问题,并且读写缓冲区的时间(IO)时间远远大于命令执行时间。
原文:https://www.v2ex.com/t/646669
https://www.codercto.com/a/114464.html