简介
服务初始化时会启动一个后台线程运行任务
有3种配置
1 no 不会生成aof文件
2 always 等到beforesleep执行时写入
3 everysec 等到beforesleep执行时提交任务给后台线程
优点:
1 丢数据少
2文件易读易修复
缺点:
1 设置比always可能会影响性能
2 文件较大(其实有rewrite机制)
3 恢复速度较慢
后台线程相关代码
void initServer(void) {
...
bioInit();
...
}
创建后台线程
void bioInit(void) {
...
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
...
}
执行后台任务
void *bioProcessBackgroundJobs(void *arg) {
...
while(1) {
listNode *ln;
如果任务为空,则等待唤醒
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
...
执行redis_fsync
else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
...
}
#ifdef __linux__
#define redis_fsync fdatasync
#else
#define redis_fsync fsync
#endif
主线程相关代码
int main(int argc, char **argv) {
...
aeSetBeforeSleepProc(server.el,beforeSleep);
...
}
void beforeSleep(struct aeEventLoop *eventLoop) {
...
flushAppendOnlyFile(0);
...
}
void flushAppendOnlyFile(int force) {
...
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
...
}
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
唤醒后台任务工作线程
pthread_cond_signal(&bio_newjob_cond[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
bgrewrite命令
struct redisCommand redisCommandTable[] = {
...
{"bgrewriteaof",bgrewriteaofCommand,1,"as",0,NULL,0,0,0,0,0},
...
}
void bgrewriteaofCommand(client *c) {
if (server.aof_child_pid != -1) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (server.rdb_child_pid != -1) {
server.aof_rewrite_scheduled = 1;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
} else {
addReply(c,shared.err);
}
}
rewrite 子进程相关代码
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
如果开启aof,切当前的aof文件大小超过设置的server.aof_rewrite_min_size,并且变化超过了设置的aof_rewrite_perc则开启进程rewrite
if (server.aof_state == AOF_ON &&
server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
ldbPendingChildren())
{
...
else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
...
}
...
}
int rewriteAppendOnlyFileBackground(void) {
...
创建ipc,用于父子进程通信
if (aofCreatePipes() != C_OK) return C_ERR;
...
if ((childpid = fork()) == 0) {
...
重写
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
server.child_info_data.cow_size = private_dirty;
通知父进程aof重写结束
sendChildInfo(CHILD_INFO_TYPE_AOF);
exitFromChild(0);
} else {
exitFromChild(1);
}
}
int rewriteAppendOnlyFile(char *filename) {
rdb,aof混合模式
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
单aof模式
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
...
读取子进程写文件期间父进程数据的变化
aofReadDiffFromParent();
...
}
rewrite父进程相关代码
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
...
打开子进程创建的aof文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
...
写入子进程写aof文件期间父进程缓存的diff
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
...
覆盖之前的aof文件
if (rename(tmpfile,server.aof_filename) == -1) {
...
}
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;
listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;
if (block->used) {
nwritten = write(fd,block->buf,block->used);
if (nwritten != (ssize_t)block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}
int processCommand(client *c) {
...
call(c,CMD_CALL_FULL);
...
}
void call(client *c, int flags) {
...
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
...
}
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
写入aof的缓存区
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
}