Fabric PBFT可插拔共识详解

下面将从下面几个部分介绍PBFT共识机制

  1. Fabric事件机制
  2. Fabric可插拔共识机制
  3. Fabric共识引擎初始化
  4. Fabric共识接口

部分代码将以python缩进格式对源代码进行简化。

1. Fabric事件机制


fabric中几乎所有共识的驱动都是通过event来进行分发的。想要了解fabric共识时如何运作的,必须先了解fabric的分发机制。

fabric通过事件管理器用于来管理事件,一般需要管理多个事件并且按事件接收的先后顺序来处理。

事件管理器

type Manager interface {##
        Inject(Event)         // 允许事件管理器线程跳过队列的临时接口
        Queue() chan<- Event  // 返回一个类型为Event的channel,用于存储事件
        SetReceiver(Receiver) // 设置事件处理对象
        Start()               // 启动Manager线程
        Halt()                // 停止Manager线程
}
  1. Queue()接口返回一个类型为Event的channel,用于存储事件。Fabric用一个队列来存储事件,Queue()返回该channel对象。
  2. Start()方法会启动一个goroutine循环处理接收到的事件,通过channel能够保证只有接收到事件才会处理,不用每时每刻循环检查队列去执行事件,浪费CPU性能。
  3. SetReceiver(Recevier)需要设置事件管理器的实际处理者,Receiver接口需要实现ProcessEvent(Event) Event方法。
  4. Inject(Event)会在Queue()收到事件之后将事件发放给Receiver进行处理
  5. Halt()停止Manager线程

总结:Start() 检测 Queue() 中是否收到event; 收到event后Inject(Event)将event交给Receiver进行处理;不同的consensus实现不同的Receiver功能。


事件来源

这部分比较繁琐,可以跳过,直接进入总结

1.1 客户端通过调用fabric的RESTful接口/chaincode调用链代码或者部署链代码,fabric在处理请求的时候(fabric/core/rest/rest_api.go.ProcessChaincode)再通过JSON RPC向peer节点发起执行事务请求,hyperledger/fabric/core/devops.go的Deplopy、invokeOrQuery方法,会调用peer.ImplExecuteTransaction方法,如下面代码所示:

// hyperledger/fabric/core/peer/peer.go
func (p *Impl) ExecuteTransaction(transaction *pb.Transaction) (response *pb.Response) {
        if p.isValidator {
                response = p.sendTransactionsToLocalEngine(transaction)
        } else {
                peerAddresses := p.discHelper.GetRandomNodes(1)
                response = p.SendTransactionsToPeer(peerAddresses[0], transaction)
        }
        return response
}

1.2 peer节点在启动时,读取配置文件core.yaml的文件配置项peer.validator.enabled的值,peer根据这个值将自身设置为validator或者非validator。validator与非validator的区别在于:前者能够直接执行事务,而后者不直接执行事务而是通过gRPC的方式调用validator节点来执行事务(相当于转发事务),详细请参见SendTransactionsToPeer的实现,最终请求会定向到sendTransactionsToLocalEngine

1.3 sendTransactionsToLocalEngin方法会调用p.engine.ProcessTransactionMsgp.engine为结构体EngineImpl,这是Engine接口实例。Engine在启动peer时候创建,后面会详细说明。Engine这个接口用于管理peer网络的通讯和处理事务。EngineImpl的结构如下:

// hyperledger/fabric/consensus/helper/engine.go
type EngineImpl struct {
        consenter    consensus.Consenter // 每个共识插件都需要实现Consenter接口,包括RecvMsg方法和ExecutionConsumer接口的里函数(可以直接返回)
        helper       *Helper // 包含一些工具类方法,可以调用外部接口,比如获取网络信息,消息签名、验证,持久化一些对象等
        peerEndpoint *pb.PeerEndpoint
        consensusFan *util.MessageFan
}

1.4 ProcessTransactionMsg的代码如下,可以看见链代码查询事务直接执行不需要进行共识,因为读取某个peer节点的账本不会影响自身以及其他peer节点账本,所以不需要共识来同步。而链代码调用和部署事务会影响到单个peer节点账本和状态,所以会调用共识插件的RecvMsg函数来保证各个peer节点的账本和状态一致。

// hyperledger/fabric/consensus/helper/engine.go
func (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx *pb.Transaction) (response *pb.Response) {
        //TODO: Do we always verify security, or can we supply a flag on the invoke ot this functions so to bypass check for locally generated transactions?
        if tx.Type == pb.Transaction_CHAINCODE_QUERY {
           // ... 
           result, _, err := chaincode.Execute(cxt, chaincode.GetChain(chaincode.DefaultChain), tx) // 直接执行查询事务,不需要共识
           // ...
        } else {
           // ...
           err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID)  // 使用共识插件保证各个peer节点账本和状态保持一致
           if err != nil {
                    response = &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(err.Error())}
           }
           // ...

1.5 RecvMsg函数在Fabric中有两个版本,一个是PBFT版本,一个是Noops版本。

  • NOOPS:用于开发和测试使用的插件,当一个validator节点收到一个事务消息时,会把消息转为共识消息,并会向所有节点广播共识消息。一般情况下,所有节点都会接收到这条共识消息,并执行消息里的事务。这是一种比较朴素的共识方式,一旦因为网络或者其他原因,有些节点没收到广播消息,就会存在状态不一致问题,所以不只用于开发和测试。
  • PBFT:PBFT算法实现。简单地说当网络里的错误失效节点数量f与总的节点数量N满足关系N>3f时,PBFT算法也能保证各个节点的状态保持一致。但是实现PBFT算法的需要满足以下的约束条件,所以在选择共识算法时要对系统进行全面评估,基于系统自身情况选择,不能盲目选择。

这里我们对PBFT版本进行分析,查看一个event是如何进入事件管理器如何进行处理:

// consensus/pbft/external.go
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) error {
    eer.manager.Queue() <- batchMessageEvent{
        msg:    ocMsg,
        sender: senderHandle,
    }
    return nil
}

// consensus/util/events/events.go
func (em *managerImpl) Queue() chan<- Event {
    return em.events
}

可以看出PBFT收到消息之后调用共识插件的RecvMsg函数将event通过Queue()加入到消息管理器队列(em.events)中,加入em.events的事件会有一个专门的线程对其进行处理,下面将会对这个过程进行讨论。


总结:

  1. 客户端调用或部署链代码时,调用peer.ImplExecuteTransaction方法;
  2. ExecuteTransaction进入到sendTransactionsToLocalEngine函数进而执行p.engine.ProcessTransactionMsg
  3. ProcessTransactionMsg调用共识插件的RecvMsg函数将event通过Queue()加入到消息管理器队列(em.events)中。

消息处理

Fabric的共识消息是通过eventLoop注射给对应处理函数的。

// consensus/util/events/events.go
func (em *managerImpl) Start() {
        go em.eventLoop()
}

// eventLoop is where the event thread loops, delivering events
func (em *managerImpl) eventLoop() {
    for {
        select {
            case next := <-em.events:
            em.Inject(next)
            case <-em.exit:
            logger.Debug("eventLoop told to exit")
            return
        }
    }
}

// Inject can only safely be called by the managerImpl thread itself
func (em *managerImpl) Inject(event Event) {
    if em.receiver != nil 
        SendEvent(em.receiver, event)
}

// SendEvent performs the event loop on a receiver to completion
func SendEvent(receiver Receiver, event Event) {
    next := event
    for {
        next = receiver.ProcessEvent(next)
        if next == nil 
            break
    }
}

// SetReceiver sets the destination for events
func (em *managerImpl) SetReceiver(receiver Receiver) {
    em.receiver = receiver
}
  1. RecvMsg函数将event通过Queue()加入到em.events
  2. eventLoop函数不断的从em.events里取出事件,通过Inject注射给对应的Receiver,注意,通过SendEvent注射给接收者的ProcessEvent方法
  3. SendEvent循环获取receiver.ProcessEvent对象,如果不为nil,则不断的调用receiver.ProcessEvent直到找到对应的消息处理函数
  4. SetReceiver函数修改receiver对象

关于ProcessEvent和Receiver的细节在可插拔共识中详解。


总结:RecvMsg函数通过Inject函数将event分发给receiver.ProcessEvent


2、可插拔共识


fabric/consensus/consensus.go对外提供共识模块的方法调用。
其中最核心也是每个共识必须实现的接口是Consenter

// fabric/consensus/consensus.go
type ExecutionConsumer interface {
    Executed(tag interface{})                                
    Committed(tag interface{}, target *pb.BlockchainInfo)    
    RolledBack(tag interface{})                              
    StateUpdated(tag interface{}, target *pb.BlockchainInfo)
}

type Consenter interface {
    RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error
    ExecutionConsumer
}
  1. RecvMsg处理共识引擎在收到客户端发来的event事件
  2. ExecutedCommittedRolledBackStateUpdated等方法来监听异步交易中的各个步骤执行情况,并进行处理。

Fabric的PBFT都是通过加入em.events之后通过eventLoop捕获事件之后再进行处理。所有event都是通过Receiver.ProcessEvent等待事件处理。

// fabric/consensus/pbft/external.go   
// RecvMsg is called by the stack when a new message is received
func (eer *externalEventReceiver) RecvMsg(ocMsg *pb.Message, senderHandle *pb.PeerID) 
    eer.manager.Queue() <- batchMessageEvent{msg:    ocMsg, sender: senderHandle,}

// Executed is called whenever Execute completes
func (eer *externalEventReceiver) Executed(tag interface{}) 
    eer.manager.Queue() <- executedEvent{tag}

// Committed is called whenever Commit completes
func (eer *externalEventReceiver) Committed(tag interface{}, target *pb.BlockchainInfo) 
    eer.manager.Queue() <- committedEvent{tag, target}

// RolledBack is called whenever a Rollback completes
func (eer *externalEventReceiver) RolledBack(tag interface{}) 
    eer.manager.Queue() <- rolledBackEvent{}

// StateUpdated is a signal from the stack that it has fast-forwarded its state
func (eer *...) StateUpdated(tag interface{}, target *pb.BlockchainInfo) 
    eer.manager.Queue() <- stateUpdatedEvent{ chkpt:  tag.(*...),target: target}

PBFT在ProcessEvent中会对各个事件进行处理

// allow the primary to send a batch when the timer expires
func (op *obcBatch) ProcessEvent(event events.Event) events.Event {
        logger.Debugf("Replica %d batch main thread looping", op.pbft.id)
        switch et := event.(type) {  
        case batchMessageEvent:
                ocMsg := et
                return op.processMessage(ocMsg.msg, ocMsg.sender)  // ocMsg的消息类型仍为链代码事务类型
        case executedEvent:
                op.stack.Commit(nil, et.tag.([]byte))
        case committedEvent:
                logger.Debugf("Replica %d received committedEvent", op.pbft.id)
                return execDoneEvent{}
        // ...       
        case stateUpdatedEvent:
                op.reqStore = newRequestStore()
                return op.pbft.ProcessEvent(event)
        default:
                return op.pbft.ProcessEvent(event)
}

pbft.ProcessEvent(event)定义了PBFT共识三个阶段的函数实现

func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {
    // ...
    case *RequestBatch:
          err = instance.recvRequestBatch(et)
    case *PrePrepare:
          err = instance.recvPrePrepare(et)
    case *Prepare:
          err = instance.recvPrepare(et)
    case *Commit:
          err = instance.recvCommit(et)
    case *Checkpoint:
          return instance.recvCheckpoint(et)
    case *ViewChange:
          return instance.recvViewChange(et)
    case *NewView:
          return instance.recvNewView(et)
    case *FetchRequestBatch:
          err = instance.recvFetchRequestBatch(et)
    case returnRequestBatchEvent:
          return instance.recvReturnRequestBatch(et)
    // ...
}

总结:

  1. Fabric所有共识必须实现Consenter接口
  2. 修改共识需要修改Receiver来对事件进行处理
  3. PBFT通过事件管理器将event交给receiver.ProcessEvent进行处理

这个时候肯定会有人问了,说好的可插拔共识机制呢,怎么全部变成PBFT实现了。别急,这不准备开始了:

  1. 自定义共识时,首先实现Consenter接口
  2. 其次调用SetReceiver修改Receiver
  3. 重写receiver.ProcessEvent
  4. 所有处理可以通过事件管理器将event交给receiver.ProcessEvent进行处理然后放回事件管理器。

没看明白?为什么实现Consenter接口就可以自定义共识了,为什么就可插拔了?这就对了,因为还没讲到共识引擎初始化呢!上面介绍了一部分可插拔的事件处理机制。后面介绍一下自定义可插拔共识,介绍如何Fabric实现可插拔。

3、Fabric共识引擎初始化


初始化共识引擎

PBFT的共识引擎在启动peer时自动初始化。具体调用过程为

// peer/node/start.go
func serve(args []string) error
    peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)

// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
    engineOnce.Do(func() 
        engine = new(EngineImpl)
        engine.helper = NewHelper(coord)
        engine.consenter = controller.NewConsenter(engine.helper) 
        engine.helper.setConsenter(engine.consenter)
        engine.peerEndpoint, err = coord.GetPeerEndpoint()
        engine.consensusFan = util.NewMessageFan()

        go func() 
            for msg := range engine.consensusFan.GetOutChannel() 
                engine.consenter.RecvMsg(msg.Msg, msg.Sender)
    )
    return engine, err
}
  1. GetEngine的作用是进行共识模块的初始化,同时启动一个goroutine等待消息进入。
  2. GetEngine初始化一个consenter和一个helper,并互相把一个句柄赋值给了对方。这样做的目的,就是为了可以让外部调用内部,内部可以调用外部。

注意:这里的consenter即为第二节的共识接口。

选择Consenter

engine.consenter是在consensus/controller/controller.go里选择

// consensus/helper/engine.go
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error)
    engine.consenter = controller.NewConsenter(engine.helper)

// consensus/controller/controller.go
func NewConsenter(stack consensus.Stack) consensus.Consenter {
    plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
    if plugin == "pbft" 
        return pbft.GetPlugin(stack)
    return noops.GetNoops(stack)
}

默认选择的是noops,如果需要添加自己编写的共识模块需要在这里自行添加判断。

NOOPS:用于开发和测试使用的插件,当一个validator节点收到一个事务消息时,会把消息转为共识消息,并会向所有节点广播共识消息。一般情况下,所有节点都会接收到这条共识消息,并执行消息里的事务。

初始化PBFT

如果选择了PBFT则会调用consensus/pbft/pbft.go进行初始化。
使用PBFTbatch模式启动时会调用newObcBatch进行PBFT算法初始化

// consensus/pbft/batch.go
func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {
    ...
    op.manager = events.NewManagerImpl()    
    op.manager.SetReceiver(op)
    etf := events.NewTimerFactoryImpl(op.manager)
    op.pbft = newPbftCore(id, config, op, etf)
    op.manager.Start()
    blockchainInfoBlob := stack.GetBlockchainInfoBlob()
    op.externalEventReceiver.manager = op.manager
    ...
    return op
}

newObcBatch主要做了这几项工作

  • 初始化了eventLoop的消息队列
  • 设置了消息的接收者,用来处理对应的消息
  • 创建监听消息超时的定时器
  • 初始化pbft算法
  • 启动消息队列,不断监听事件的到来并且分发给接收者处理

总结:可以看出共识引擎初始化时的几个步骤:

  1. 初始化eventLoop
  2. 设置Receiver
  3. 初始化共识算法
  4. 启动事件管理器

至此,之前所讲的所有东西全部启动了。回忆一下整个流程:

  1. 事件由客户端发起,经过一系列步骤执行到RecvMsg函数
  2. RecvMsg函数由Consenter定义,ConsenterNewConsenter内初始化,NewConsenter可插播
  3. Consenter提供共识模块的方法调用ExecutionConsumer接口, Consenter可插拔
  4. receiver.ProcessEvent负责处理事件管理器队列内的事件,Receiver可插播

总结: 所有与共识有关的接口全部可插拔,并且提供了相应的接口,自定义实现ConsenterReceiver,在初始化的时候 SetReceiver,即可实现自定义共识。

4. Fabric共识接口


consensus.go主要包含了共识算法插件内部对外暴露的接口和外部对内暴露的接口。总结如下:

Stack
Consenter
Manager
目录结构:

可以看到共识模块目录如下。

consensus
├── controller
├── executor
├── helper
│ └── persist
├── noops
├── pbft
└── util
└── events
目录含义如下

controller 用来控制Fabric选择什么样的共识算法,默认是noops。
executor 封装了消息队列中对交易的处理。
helper 对外提供接口调用和数据持久化接口。
noops 提供了如何编写Fabric共识算法的Demo。
pbft PBFT算法的具体实现。
util 实现了一个peer节点到共识算法的一个消息通道,和一个消息队列。

推荐阅读:

[1] Hyperledger Fabric中PBFT算法详解:
https://zhuanlan.zhihu.com/p/48899458
[2] Fabric源码分析-共识模块:
https://zhuanlan.zhihu.com/p/35255567

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,194评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,058评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,780评论 0 346
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,388评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,430评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,764评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,907评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,679评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,122评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,459评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,605评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,270评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,867评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,734评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,961评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,297评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,472评论 2 348

推荐阅读更多精彩内容