Floyd&Raft的源码分析(三)

这篇是这个系列的最后一篇了,整个分析持续了两个月多,包括floyd和pink源码,差不多就每个周日会花几个小时分析一下,和查些资料,毕竟自己也是在学习过程中,可能会有些理解不正确的地方。
在学习raft的过程中,看了好些文章和别人的分析,理解起来似乎比较容易,但如果是自己从零实现一个可能要花些时间,况且不会灵活的运用到具体项目中,和结合现有的开源项目,比如leveldb和rocksdb等。
这篇结束后,准备分析下leveldb里的一些实现,可能网上已经有很多类型的文章,但看别人分析还不如自己深入源码,以前也做过类似的事情,比如redis和leveldb,但时间较久也没有沉淀下来,所以也陌生了。计划是后两个月分析下libco/pebble里的协程实现,后面大半年就分析单机版的kv存储如leveldb和消息中间件,比较倾向于rocketmq或消息队列phxqueue。

先列一下floyd框架,再分析下raft节点增加和减少的情况。
floyd是单进程多线程的框架项目,就直接上代码了,不画流程图和类图了。看了下example程序,在main中实例化一个floyd对象,如下:

368 Status Floyd::Open(const Options& options, Floyd** floyd) {
369   *floyd = NULL;
370   Status s;
371   FloydImpl *impl = new FloydImpl(options);
372   s = impl->Init();
373   if (s.ok()) {
374     *floyd = impl;
375   } else {
376     delete impl;
377   }
378   return s;
379 }

全部功能都放在Init中,部分代码如下:

274 Status FloydImpl::Init() {
275   slash::CreatePath(options_.path);
276   if (NewLogger(options_.path + "/LOG", &info_log_) != 0) {
277     return Status::Corruption("Open LOG failed, ", strerror(errno));
278   }
279 
280   // TODO(anan) set timeout and retry
281   worker_client_pool_ = new ClientPool(info_log_);
282 
283   // Create DB
284   rocksdb::Options options;
285   options.create_if_missing = true;
286   options.write_buffer_size = 1024 * 1024 * 1024;
287   options.max_background_flushes = 8;
288   rocksdb::Status s = rocksdb::DB::Open(options, options_.path + "/db/", &db_);
289   if (!s.ok()) {
291     return Status::Corruption("Open DB failed, " + s.ToString());
292   }
293 
294   s = rocksdb::DB::Open(options, options_.path + "/log/", &log_and_meta_);
295   if (!s.ok()) {  
297     return Status::Corruption("Open DB log_and_meta failed, " + s.ToString());
298   }
300   // Recover Context
301   raft_log_ = new RaftLog(log_and_meta_, info_log_);
302   raft_meta_ = new RaftMeta(log_and_meta_, info_log_);
303   raft_meta_->Init();
304   context_ = new FloydContext(options_);
305   context_->RecoverInit(raft_meta_);
306 
307   // Recover Members when exist
308   std::string mval;
309   Membership db_members;
310   s = db_->Get(rocksdb::ReadOptions(), kMemberConfigKey, &mval);
311   if (s.ok()
312       && db_members.ParseFromString(mval)) {
315     for (int i = 0; i < db_members.nodes_size(); i++) {
316       context_->members.insert(db_members.nodes(i));
317     }
318   } else {
319     BuildMembership(options_.members, &db_members);
320     if(!db_members.SerializeToString(&mval)) {
322       return Status::Corruption("Serialize Membership failed");
323     }
324     s = db_->Put(rocksdb::WriteOptions(), kMemberConfigKey, mval);
325     if (!s.ok()) {
327       return Status::Corruption("Record membership in db failed! error: " + s.ToString());
328     }
330     for (const auto& m : options_.members) {
331       context_->members.insert(m);
332     }
333   }
337   primary_ = new FloydPrimary(context_, &peers_, raft_meta_, options_, info_log_);
340   worker_ = new FloydWorker(options_.local_port, 1000, this);
341   int ret = 0;
342   if ((ret = worker_->Start()) != 0) {
344     return Status::Corruption("failed to start worker, return " + std::to_string(ret));
345   }
347   apply_ = new FloydApply(context_, db_, raft_meta_, raft_log_, this, info_log_);
348 
349   InitPeers();
354   if ((ret = primary_->Start()) != 0) {
356     return Status::Corruption("failed to start primary thread, return " + std::to_string(ret));
357   }
358   primary_->AddTask(kCheckLeader);
361   apply_->Start();
365   return Status::OK();
366 }

以上代码主要工作做了:创始日志,创建客户端对象池(里面会对每个server addr创建一条连接,源码在pink项目中)用于后续发送请求,创建两个db,用于state machine和log entry;从log中恢复状态数据比如term/voteip/voteport/commit_index/last_applied等;接着从db中或参数中恢复节点信息;创建FloydPrimary线程,此时还没启动,它的主要工作是心跳,定时检查leader,执行command,向其他节点发送vote rpc或append log entry rpc,体现在这三个函数中,由AddTask分发:

 62   static void LaunchHeartBeatWrapper(void *arg);
 63   void LaunchHeartBeat();
 64   static void LaunchCheckLeaderWrapper(void *arg);
 65   void LaunchCheckLeader();
 66   static void LaunchNewCommandWrapper(void *arg);
 67   void LaunchNewCommand();

接着创建FloydWorker线程并启动它,它的工作是处理请求,比如给节点线程发vote rpc/append log entry rpc任务,由DealMessage分发,然后根据request_type具体走不同的逻辑,调用FloydImpl类中的函数;
接着创建FloydApply线程,它的工作是执行command,即把log entry中的command apply到 state machine中,处理成员变更的情况;
接着根据节点个数,创建对应个数的节点线程并启动,主要工作是向对应server发送vote rpc/append log entry rpc请求和处理响应,维护状态等;
最后是启动FloydPrimary并立即发一个选举leader的任务(节点刚启动时都为follower),接着启动FloydApply;

以上线程,有些创建便启动,有些等其他线程启动完后再启动,这里有个顺序依赖,主要看谁驱动谁,比如FloydPrimary线程里会用到节点线程,就不能以相反的顺序start否则可能引起coredump;

大致整个框架差不多分析完了,pink中的相关源码没有在这里列出来,跳过了。

在成员变更的同时,需要保证安全必一,即“在任何时候,都不会出现双主。”
主要有两种方法:
One-Server变更:一阶段变更,要求每次成员组从G1变成G2时,G2相比G1加一个成员或者减一个成员。
Joint Consensus:支持任意的变更,即从成员组G1变成G2,不要求G1和G2有什么关联,比如可以完全没有交集。

第一种比较容易理解,实现起来也简单,floyd中也使用的第一种,这边大概说一下基本流程吧,至于为什么这种方法可行,可以参考下面链接的分析;

以增加成员为例,raft在收到增加server成员请求时,每次只增加一台server,后台程序的Leader执行流程如下:
-->AddServer-->BuildAddServerRequest-->DoCommand-->ExecuteCommand-->BuildLogEntry-->Append-->AddTask-->NoticePeerTask-->AddAppendEntriesTask

假设经过大多数的返回后,leader 把command apply到状态机中后,后续follower也推进apply id,把这条日志apply 状态机中去,此时流程如下:
-->Apply-->MembershipChange-->AddNewPeer

219 void FloydImpl::AddNewPeer(const std::string& server) {
220   if (IsSelf(server)) {
221     return;
222   } 
223   // Add Peer
224   auto peers_iter = peers_.find(server);
225   if (peers_iter == peers_.end()) {
226     LOGV(INFO_LEVEL, info_log_, "FloydImpl::ApplyAddMember server %s:%d add new peer thread %s",
227         options_.local_ip.c_str(), options_.local_port, server.c_str());
228     Peer* pt = new Peer(server, &peers_, context_, primary_, raft_meta_, raft_log_,
229         worker_client_pool_, apply_, options_, info_log_);
230     peers_.insert(std::pair<std::string, Peer*>(server, pt));
231     pt->Start();
232   }   
233 }

http://loopjump.com/raft_paper_note/
http://loopjump.com/raft_one_server_reconfiguration/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容