redis aof 总结

简介

服务初始化时会为每种后台任务类型各启动一个后台线程(共BIO_NUM_OPS个)来运行任务
appendfsync有3种配置
1 no 不主动调用fsync,由操作系统决定何时把数据刷到磁盘(仍然会写aof文件)
2 always  每次beforesleep执行时由主线程直接同步调用fsync
3 everysec beforesleep执行时把fsync任务提交给后台线程,最多每秒提交一次


优点:
1 丢数据少
2 文件易读易修复
缺点:
1 设置为always时可能会影响性能
2 文件较大(其实有rewrite机制)
3 恢复速度较慢

后台线程相关代码

void initServer(void) {
...
  bioInit();
...
}

创建后台线程
void bioInit(void) {
...
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
...
}

执行后台任务
void *bioProcessBackgroundJobs(void *arg) {
...

    while(1) {
        listNode *ln;

如果任务为空,则等待唤醒
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
...
执行redis_fsync
else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
...
}

/* Pick the call behind redis_fsync at compile time: fdatasync when
 * __linux__ is defined, fsync otherwise. */
#if defined(__linux__)
#  define redis_fsync fdatasync
#else
#  define redis_fsync fsync
#endif

主线程相关代码

int main(int argc, char **argv) {
...
    aeSetBeforeSleepProc(server.el,beforeSleep);

...
}

void beforeSleep(struct aeEventLoop *eventLoop) {
...
    flushAppendOnlyFile(0);


...
}
void flushAppendOnlyFile(int force) {
...
  if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
...
}

/* Submit a BIO_AOF_FSYNC background job for `fd`.
 *
 * The descriptor is carried in the job's first argument, widened to long
 * and cast to a pointer; the second and third arguments are NULL. */
void aof_background_fsync(int fd) {
    void *fd_as_arg = (void*)(long)fd;

    bioCreateBackgroundJob(BIO_AOF_FSYNC,fd_as_arg,NULL,NULL);
}


void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
唤醒后台任务工作线程
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

bgrewriteaof命令

struct redisCommand redisCommandTable[] = {
...
    {"bgrewriteaof",bgrewriteaofCommand,1,"as",0,NULL,0,0,0,0,0},
...
}

/* Handler for the "bgrewriteaof" command.
 *
 * Three outcomes, checked in order:
 *  - server.aof_child_pid is set: reply with an "already in progress" error.
 *  - server.rdb_child_pid is set: set server.aof_rewrite_scheduled and
 *    reply "scheduled".
 *  - otherwise call rewriteAppendOnlyFileBackground() and reply "started"
 *    on C_OK, or the shared error reply on failure. */
void bgrewriteaofCommand(client *c) {
    if (server.aof_child_pid != -1) {
        addReplyError(c,"Background append only file rewriting already in progress");
        return;
    }

    if (server.rdb_child_pid != -1) {
        /* Only record the request here; nothing is started in this branch. */
        server.aof_rewrite_scheduled = 1;
        addReplyStatus(c,"Background append only file rewriting scheduled");
        return;
    }

    if (rewriteAppendOnlyFileBackground() != C_OK) {
        addReply(c,shared.err);
        return;
    }

    addReplyStatus(c,"Background append only file rewriting started");
}

rewrite 子进程相关代码

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
如果开启了aof,且当前的aof文件大小超过设置的server.aof_rewrite_min_size,并且相对上次rewrite后大小的增长百分比达到了设置的aof_rewrite_perc,则开启子进程进行rewrite
 if (server.aof_state == AOF_ON &&
            server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
...
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
      ldbPendingChildren())
    {
...
else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
                if (!bysignal && exitcode == 0) receiveChildInfo();
...
}
...
}

int rewriteAppendOnlyFileBackground(void) {
...
创建ipc,用于父子进程通信
    if (aofCreatePipes() != C_OK) return C_ERR;
...
if ((childpid = fork()) == 0) {
...
   重写     
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty; 
            把子进程信息(包含上面记录的copy-on-write内存大小)发送给父进程
            sendChildInfo(CHILD_INFO_TYPE_AOF);
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
}
int rewriteAppendOnlyFile(char *filename) {
rdb,aof混合模式
  if (server.aof_use_rdb_preamble) {
        int error;
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
单aof模式
    } else {
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }
...
读取子进程写文件期间父进程数据的变化
    aofReadDiffFromParent();
...

}


rewrite父进程相关代码

void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
...
打开子进程创建的aof文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
            (int)server.aof_child_pid);
        newfd = open(tmpfile,O_WRONLY|O_APPEND);
        if (newfd == -1) {
            serverLog(LL_WARNING,
                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
            goto cleanup;
        }
...
写入子进程写aof文件期间父进程缓存的diff
       if (aofRewriteBufferWrite(newfd) == -1) {
            serverLog(LL_WARNING,
                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
            close(newfd);
            goto cleanup;
        }
...
覆盖之前的aof文件
if (rename(tmpfile,server.aof_filename) == -1) {
...
}

/* Write every non-empty block of server.aof_rewrite_buf_blocks to `fd`.
 *
 * Returns the total number of bytes written, or -1 as soon as one write()
 * does not write the whole block. A short write is treated as a failure
 * and is not retried; errno is forced to EIO only when write() returned 0. */
ssize_t aofRewriteBufferWrite(int fd) {
    listIter iter;
    listNode *node;
    ssize_t total = 0;

    listRewind(server.aof_rewrite_buf_blocks,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        aofrwblock *blk = listNodeValue(node);

        /* Blocks with no used bytes are skipped. */
        if (!blk->used) continue;

        ssize_t n = write(fd,blk->buf,blk->used);
        if (n == (ssize_t)blk->used) {
            total += n;
            continue;
        }

        /* Incomplete or failed write. */
        if (n == 0) errno = EIO;
        return -1;
    }
    return total;
}

int processCommand(client *c) {
...
        call(c,CMD_CALL_FULL);
...
}
void call(client *c, int flags) {
...
     if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
...
}


/* Forward a command to the AOF and/or the slaves, according to `flags`.
 *
 * PROPAGATE_AOF feeds the command to feedAppendOnlyFile(), but only while
 * server.aof_state is not AOF_OFF. PROPAGATE_REPL feeds it to
 * replicationFeedSlaves() with server.slaves. */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    const int to_aof = (flags & PROPAGATE_AOF) != 0;
    const int to_repl = (flags & PROPAGATE_REPL) != 0;

    if (to_aof && server.aof_state != AOF_OFF) {
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    }
    if (to_repl) {
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
    }
}

写入aof的缓存区;如果此时有rewrite子进程在运行,还会把内容追加到rewrite缓存区(下面只摘录了这一部分)
/* Excerpt: when server.aof_child_pid is set (!= -1), the bytes held in `buf`
 * are appended to the rewrite buffer through aofRewriteBufferAppend().
 * NOTE(review): `buf` is not declared in this excerpt -- the code that
 * builds it appears to have been elided; confirm against the full source. */
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。