引言
使用operator做资源控制,在用户将crd资源删除然后通过informer机制做资源清理时,队列中的消息还没处理完,但是运行实例发生了重启,队列中的消息还在不在,会不会有资源残留
informer流程图
运行流程
初始化 informer时,启动reflector,reflector 从 kube-apiserver list 所有 crd 资源,创建crd对象的create对象到 Deltafifo队列中,根据Deltafilfo对象crd资源会同步进本地缓存中,要是配置了对应的handler函数,也会触发handler的Add函数(若是需要对账就在这时做)。reflector后续监听crd事件创建对应的Delta对象进入Deltafifo 队列中。根据Delta对象更新本地缓存,并触发对应handler函数
DeltaFIFO 队列中的元素是 Delta 结构体,它封装了 DeltaType 和 Object,其中 DeltaType 表示针对资源对象的操作类型,如 Added(添加)、Updated(更新)、Deleted(删除)、Replaced(替换)、Sync(同步)等,Object 则是具体的资源对象。
DeltaFIFO 结构体实现了 client-go 中的 Queue 接口,该接口在 Store 接口的基础上扩展了 Pop、AddIfNotPresent、HasSynced、Close 等方法。
informer对DeltaFIFO队列进行了封装,暴露出不同类型事件的处理函数,用户可以进行配置。
接下来一般的操作可以创建一个FIFO全局队列,在处理函数中对资源进行过滤将Deltafifo 队列中的事件消费进入FIFO全局队列中。用户的流程根据业务逻辑对FIFO全局队列进行一致性消费即可。
reflector 会周期性将 indexer 数据同步到 Deltafifo,防止一些事件处理失败,重新处理。
由上文的运行流程可知,对于没有来的及处理Add和Update事件,在informer重启时,reflector会从kube-apiserver list所有的事件资源,sync到Deltafifo,并进一步触发Add事件。业务可以根据实际资源的状况进行对账,是进行Add还是Update。
对于没有处理的Delete事件如何处理呢?如: k8s资源可能已经清理了,但是实际的资源还有残留。这样Deltafifo中也将同步不到对应残留数据的事件。这时,我们就要借助k8s另一个特性:延迟删除finallize。这时, 在启动informer时,没来的及删除的资源还在,我们会在create事件里看到状态为deleting,deletetimestep不为空的对象,我们需要将对应的对象本地资源进行清理,然后清理掉延迟删除标识即可。或者想要做的更优雅一点,可以将deleting状态的create对象,再创建一个delete的Delta对象扔到Deltafifo队列中,即可再一次触发delete的handler函数。
若是除了k8s,本地数据库也记录了对象的操作,例如业务中删除某一个资源,数据库表状态已置位删除中,也可在初始化时直接创建对应的k8s资源对象赛道本地缓存中,然后依赖informer的定期同步对象会进入到Deltafifo中,进一步触发资源对账。
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
// 本地数据重入index
indexer.Add(&v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "mypod",
Namespace: v1.NamespaceDefault,
},
})
如上即是,informer启动过程的对账逻辑,可谓精妙。
在删除时,还需要注意:要是k8s资源被清理了,本地资源没有直接的对应关系(无法通过id唯一对应),cr和本地资源的对应关系维护在cr的spec中,这时需要在创建cr时添加finallize进行延迟删除,只有进行了实际的清理才去移除finallize里的资源项。这样在infromer重启时,还会触发对应资源的add事件(状态为deleting), 调协函数判断是deletetimestep不为空,继续执行删除逻辑。
上文因为要向Index塞入对象,涉及逆向操作,要是不懂其中的同步逻辑会很困惑。informer重启后,针对需要清理的本地对象还有一个处理思路:根据本地资源(入数据库记录),主动检查k8s资源是否存在或者是否删除中(由finallize),不存在或者删除中,即做清理操作。看起来没啥问题,但是这样操作则利用不到队列基础设施的优势,在清理出错时如何处理?(有调协器的话可以重新入队)只能不断重试。
参考:
client-go 架构:https://www.sfernetes.com/client-go-arch/#workqueue
workqueue示例:https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go