引言
记得一次面试,被问到通过informer做垃圾回收时,队列中的消息还没处理完,informer的实例发生了重启,如何将丢失的任务处理完
informer流程图
运行流程
例如现在创建一个 pods,kubelet 中的 controller 是如何运行的(K8s 中源码中也大量使用 client-go,主要是大量的 controller)
初始化并启动 informer,informer 启动会初始化并启动 reflector,reflector 从 kube-apiserver list 所有 pod 资源,并 sync 到 Deltafifo 中。
Deltafifo 存有全部 pod 资源,informer 通过 pop 函数消费 deltafifo 事件并存储到 indexer 中。
如果需要调用 pod 资源,那么可以直接从 indexer 中获取
informer 初始化完成后,Reflector 开始 Watch Pod 相关的事件
如果创建一个 pod,1. 那么 Reflector 会监听到这个事件,然后将这个事件发送到 DeltaFIFO 中
informer pop 消费改 ADD 事件,并将该 pod 存储到 indexer
informer 处理器函数同样拿到该 ADD 事件去处理该事件,通过workqueue获取到事件的key,再通过indexer获取到真正操作的对象
reflector 会周期性将 indexer 数据同步到 Deltafifo,防止一些事件处理失败,重新处理。
由上文的运行流程可知,对于没有来的及处理Add和Update事件,在informer重启时,reflector会从kube-apiserver list所有的事件资源,sync到Deltafifo,并进一步触发Add事件。业务可以根据实际资源的状况进行对账,是进行Add还是Update。
对于没有处理的Delete事件如何处理呢?实际上,k8s资源可能已经清理了,但是实际的资源还有残留,这时候需要在启动informer时,将本地全量资源入到indexer,进一步依赖reflector的周期同步,将资源入到Deltafifio, informer检查到k8s实际上该资源已经不存在了会触发Delete事件。
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
// 本地数据重入index
indexer.Add(&v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "mypod",
Namespace: v1.NamespaceDefault,
},
})
如上即是,informer启动过程的对账逻辑,可谓精妙。
在删除时,还需要注意:要是k8s资源被清理了,本地资源没有直接的对应关系(无法通过id唯一对应),cr和本地资源的对应关系维护在cr的spec中,这时需要在创建cr时添加finallize进行延迟延迟,只有进行了实际的清理才去移除finallize里的资源项。这样在infromer重启时,还会触发对应资源的add事件(状态为deleting), 调协函数判断是deletetimestep不为空,继续执行删除逻辑。
上文因为要向Index塞入对象,涉及逆向操作,要是不懂其中的同步逻辑会很困惑。informer重启后,针对需要清理的本地对象还有一个处理思路:根据本地资源(入数据库记录),主动检查k8s资源是否存在或者是否删除中(由finallize),不存在或者删除中,即做清理操作。看起来没啥问题,但是这样操作则利用不到队列基础设施的优势,在清理出错时如何处理?(有调协器的话可以重新入队)只能不断重试。
参考:
client-go 架构:https://www.sfernetes.com/client-go-arch/#workqueue
workqueue示例:https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go