1. Introduction
Please credit the original source when reposting; respect the author's work!
This article analyzes the files under kubernetes/pkg/scheduler/internal, specifically node_tree.go and cache.go.
Source code: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/scheduler/internal/cache
Branch: tming-v1.13 (based on v1.13)
2. node_tree
2.1 nodeArray
type nodeArray struct {
    // all node names in this zone
    nodes []string
    // index of the next node to visit when iterating over nodes
    lastIndex int
}
nodeArray is simply an array of node names, plus a cursor (lastIndex) whose use shows up in next().
// next returns the next node in this nodeArray.
func (na *nodeArray) next() (nodeName string, exhausted bool) {
    if len(na.nodes) == 0 {
        klog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
        return "", false
    }
    if na.lastIndex >= len(na.nodes) {
        return "", true
    }
    nodeName = na.nodes[na.lastIndex]
    na.lastIndex++
    return nodeName, false
}
Once the cursor has moved past the last element, next() returns an empty string and reports that the array is exhausted. This is exactly why lastIndex exists: it records the position of the next node to return.
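For example, assuming the nodeArray defined above, draining it looks like this (a small in-package sketch; the node names are made up):
// Sketch: drain a nodeArray until it reports exhausted.
na := &nodeArray{nodes: []string{"node-1", "node-2"}}
for {
    name, exhausted := na.next()
    if exhausted {
        // lastIndex has run past the end; a NodeTree would reset it via resetExhausted().
        break
    }
    fmt.Println(name) // prints node-1, then node-2
}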
2.2 NodeTree
type NodeTree struct {
    // key is a zone, value is a nodeArray holding all nodes in that zone
    tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
    // all zones
    zones []string // a list of all the zones in the tree (keys)
    // index of the next zone to visit when iterating over zones
    zoneIndex int
    // total number of nodes
    NumNodes int
    mu sync.RWMutex
}
As you can see, NodeTree is essentially a map that records which nodes belong to each zone.
zones: the list of all zones; it contains exactly the keys of tree.
zoneIndex: analogous to lastIndex in nodeArray; it is the index of the next zone in zones to visit.
Next, let's look at NodeTree's AddNode method to get a better feel for this data structure.
2.2.1 AddNode
func (nt *NodeTree) AddNode(n *v1.Node) {
    nt.mu.Lock()
    defer nt.mu.Unlock()
    nt.addNode(n)
}

/**
1. Get the zone this node belongs to.
2. If the zone does not exist yet, add it to zones and tree.
3. If the zone already exists:
   3.1 Check whether the node is already in that zone's nodeArray.
   3.2 If it is, return; otherwise append it.
*/
func (nt *NodeTree) addNode(n *v1.Node) {
    zone := utilnode.GetZoneKey(n)
    if na, ok := nt.tree[zone]; ok {
        for _, nodeName := range na.nodes {
            if nodeName == n.Name {
                klog.Warningf("node %v already exist in the NodeTree", n.Name)
                return
            }
        }
        na.nodes = append(na.nodes, n.Name)
    } else {
        nt.zones = append(nt.zones, zone)
        nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
    }
    klog.V(5).Infof("Added node %v in group %v to NodeTree", n.Name, zone)
    nt.NumNodes++
}
This is a fairly routine add-node method. The methods for removing a node and for adding/removing a zone are not covered in detail here; they are essentially just map operations. For reference, a sketch of node removal is shown below.
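The following is a rough sketch of node removal written from the description above, not copied verbatim from the v1.13 source (the exact logging and error handling may differ): find the node's zone, delete the name from that zone's nodeArray, and drop the zone entirely once it becomes empty.
// Sketch (not verbatim from v1.13): remove a node from the tree.
func (nt *NodeTree) removeNode(n *v1.Node) error {
    zone := utilnode.GetZoneKey(n)
    if na, ok := nt.tree[zone]; ok {
        for i, nodeName := range na.nodes {
            if nodeName == n.Name {
                // delete the node name from the zone's nodeArray
                na.nodes = append(na.nodes[:i], na.nodes[i+1:]...)
                if len(na.nodes) == 0 {
                    // the zone is empty now; remove it from tree and zones as well
                    nt.removeZone(zone)
                }
                nt.NumNodes--
                return nil
            }
        }
    }
    return fmt.Errorf("node %v in group %v was not found", n.Name, zone)
}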
2.2.3 Next
// Start over from the beginning (the whole map has been iterated through).
func (nt *NodeTree) resetExhausted() {
    for _, na := range nt.tree {
        na.lastIndex = 0
    }
    nt.zoneIndex = 0
}

// Next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
// Put differently: it walks over all nodes in all zones, taking one node from each zone in turn.
func (nt *NodeTree) Next() string {
    nt.mu.Lock()
    defer nt.mu.Unlock()
    if len(nt.zones) == 0 {
        return ""
    }
    numExhaustedZones := 0
    for {
        if nt.zoneIndex >= len(nt.zones) {
            nt.zoneIndex = 0
        }
        zone := nt.zones[nt.zoneIndex]
        nt.zoneIndex++
        // We do not check the exhausted zones before calling next() on the zone. This ensures
        // that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
        nodeName, exhausted := nt.tree[zone].next()
        if exhausted {
            numExhaustedZones++
            if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset.
                nt.resetExhausted()
            }
        } else {
            return nodeName
        }
    }
}
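A small self-contained sketch (a stripped-down copy of the two structs above, without locking, logging, or the empty-array check) makes the round-robin order easy to see. With zone-a holding n1, n2, n3 and zone-b holding n4, repeated calls yield n1, n4, n2, n3, and then start over:
package main

import "fmt"

type nodeArray struct {
    nodes     []string
    lastIndex int
}

func (na *nodeArray) next() (string, bool) {
    if na.lastIndex >= len(na.nodes) {
        return "", true
    }
    name := na.nodes[na.lastIndex]
    na.lastIndex++
    return name, false
}

// toyTree mirrors NodeTree's iteration logic without locking.
type toyTree struct {
    tree      map[string]*nodeArray
    zones     []string
    zoneIndex int
}

func (nt *toyTree) next() string {
    if len(nt.zones) == 0 {
        return ""
    }
    exhausted := 0
    for {
        if nt.zoneIndex >= len(nt.zones) {
            nt.zoneIndex = 0
        }
        zone := nt.zones[nt.zoneIndex]
        nt.zoneIndex++
        if name, done := nt.tree[zone].next(); !done {
            return name
        }
        exhausted++
        if exhausted >= len(nt.zones) {
            // every zone is drained: reset all cursors and keep going
            for _, na := range nt.tree {
                na.lastIndex = 0
            }
            nt.zoneIndex = 0
        }
    }
}

func main() {
    nt := &toyTree{
        tree: map[string]*nodeArray{
            "zone-a": {nodes: []string{"n1", "n2", "n3"}},
            "zone-b": {nodes: []string{"n4"}},
        },
        zones: []string{"zone-a", "zone-b"},
    }
    for i := 0; i < 6; i++ {
        fmt.Println(nt.next()) // prints n1 n4 n2 n3 n1 n4
    }
}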
2.2.4 Summary
As shown above, NodeTree is really just a map that stores all nodes grouped by zone, much like a Map<String, List<String>> in Java.
So why does it carry the extra fields? They exist mainly to implement the Next() method, which hands out nodes from the map one at a time; that is what zoneIndex and lastIndex are for, and it is also why the nodeArray struct exists in the first place.
In short, apart from the usual methods that maintain the data structure itself (add, remove, update, query), the only thing NodeTree offers to the outside world is Next().
3. cache
The whole schedulerCache data structure is designed to implement the following Cache interface.
type Cache interface {
    // Mark the pod as assumed.
    AssumePod(pod *v1.Pod) error
    // Mark the pod's binding as finished (bindingFinished=true).
    FinishBinding(pod *v1.Pod) error
    // Remove the pod from the cache (the pod must be in the assumed state).
    ForgetPod(pod *v1.Pod) error
    // AddPod either confirms a pod if it's assumed, or adds it back if it's expired.
    // If added back, the pod's information would be added again.
    AddPod(pod *v1.Pod) error
    // May only be called on a pod in the Added state.
    UpdatePod(oldPod, newPod *v1.Pod) error
    // May only be called on a pod in the Added state.
    RemovePod(pod *v1.Pod) error
    // Get a pod from podStates.
    GetPod(pod *v1.Pod) (*v1.Pod, error)
    // Report whether the pod is in the assumed state.
    IsAssumedPod(pod *v1.Pod) (bool, error)
    // Add a node; all of its information is stored.
    AddNode(node *v1.Node) error
    // Update a node.
    UpdateNode(oldNode, newNode *v1.Node) error
    // Remove a node.
    RemoveNode(node *v1.Node) error
    // UpdateNodeNameToInfoMap updates the passed infoMap to the current contents of Cache.
    // The node info contains aggregated information of pods scheduled (including assumed to be)
    // on this node.
    UpdateNodeNameToInfoMap(infoMap map[string]*schedulercache.NodeInfo) error
    // Return all pods from all nodes that match the selector.
    List(labels.Selector) ([]*v1.Pod, error)
    // Return all pods from all nodes that match the filter and selector.
    FilteredList(filter algorithm.PodFilter, selector labels.Selector) ([]*v1.Pod, error)
    // Snapshot the assumed pods and the nodes.
    Snapshot() *Snapshot
    // Return the nodeTree.
    NodeTree() *NodeTree
}
type Snapshot struct {
    AssumedPods map[string]bool
    Nodes       map[string]*schedulercache.NodeInfo
}
In this design a pod has five states: Initial, Assumed, Added, Expired, and Deleted.
// +-------------------------------------------+ +----+
// | Add | | |
// | | | | Update
// + Assume Add v v |
//Initial +--------> Assumed +------------+---> Added <--+
// ^ + + | +
// | | | | |
// | | | Add | | Remove
// | | | | |
// | | | + |
// +----------------+ +-----------> Expired +----> Deleted
// Forget Expire
The transitions in the diagram work as follows:
Initial becomes Assumed by calling AssumePod.
Initial becomes Added by calling AddPod.
Assumed becomes Added by calling AddPod.
Assumed becomes Initial by calling ForgetPod.
Assumed becomes Expired via expirePod (once the expiration deadline has passed).
Expired becomes Added by calling AddPod.
Added stays Added when UpdatePod is called.
Added becomes Deleted by calling RemovePod.
Note that in the Deleted, Initial, and Expired states the pod does not actually exist in the cache.
The comment below, taken from the source, explains the design quite clearly:
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
// Note that an assumed pod can expire, because if we haven't received Add event notifying us
// for a while, there might be some problems and we shouldn't keep the pod in cache anymore.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
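To tie the states together, here is a rough sketch of how a scheduler loop might drive the Cache during one scheduling cycle. The function and parameter names (scheduleOne, chosenNode, bind) are illustrative, not taken from the scheduler source: the pod is assumed onto the chosen node, the binding runs asynchronously, and the informer's Add event later confirms the pod.
// Illustrative only: driving the Cache during one scheduling cycle.
// "bind" stands in for the real binding call to the API server.
func scheduleOne(c Cache, pod *v1.Pod, chosenNode string, bind func(*v1.Pod) error) error {
    // Pretend the pod already runs on the chosen node so that the next
    // scheduling decision sees its resource usage. Initial -> Assumed.
    assumed := pod.DeepCopy()
    assumed.Spec.NodeName = chosenNode
    if err := c.AssumePod(assumed); err != nil {
        return err
    }

    // Bind asynchronously; the cache keeps the assumption in the meantime.
    go func() {
        if err := bind(assumed); err != nil {
            // Binding failed: drop the assumption. Assumed -> Initial.
            _ = c.ForgetPod(assumed)
            return
        }
        // Binding succeeded: start the ttl clock. The pod stays Assumed until the
        // informer's Add event triggers AddPod (Assumed -> Added) or the ttl expires.
        _ = c.FinishBinding(assumed)
    }()
    return nil
}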
3.1 schedulerCache
3.1.1 Structure
Three structs are involved here: schedulerCache, podState, and imageState.
Inside schedulerCache, assumedPods is a map recording whether a pod is in the assumed state; podStates is a map holding each pod's state, including whether its binding has finished; nodes is a map holding each node's aggregated information and the pods it carries; and imageStates is a map holding each image's information, including which nodes have that image.
type schedulerCache struct {
    stop <-chan struct{}
    // ttl is how long an assumed pod stays in the cache before it expires
    ttl time.Duration
    // period is how often expired assumed pods are cleaned up
    period time.Duration

    // This mutex guards all fields within this cache struct.
    mu sync.RWMutex
    // a set of assumed pod keys.
    // The key could further be used to get an entry in podStates.
    assumedPods map[string]bool
    // a map from pod key to podState.
    podStates map[string]*podState
    // per-node information
    nodes map[string]*schedulercache.NodeInfo
    nodeTree *NodeTree
    // A map from image name to its imageState.
    imageStates map[string]*imageState
}
type podState struct {
    pod *v1.Pod
    // Used by assumedPod to determine expiration.
    deadline *time.Time
    // Used to block cache from expiring assumedPod if binding still runs.
    // Expiration only takes effect once bindingFinished is true.
    bindingFinished bool
}
type imageState struct {
    // Size of the image
    size int64
    // A set of node names for nodes having this image present
    nodes sets.String
}
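All of the pod-level methods below first compute a cache key via schedulercache.GetPodKey. In v1.13 the key appears to be simply the pod's UID; the following is a sketch of that behavior, an assumption rather than verbatim source:
// Sketch (assumption): GetPodKey returns the pod's UID as the cache key.
func GetPodKey(pod *v1.Pod) (string, error) {
    uid := string(pod.UID)
    if len(uid) == 0 {
        return "", errors.New("cannot get cache key for pod with empty UID")
    }
    return uid, nil
}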
3.1.2 AssumePod
As you can see, calling AssumePod simply stores the pod into three of those data structures: podStates, assumedPods, and nodes.
// 1. Compute the pod's key.
// 2. Check podStates to see whether the pod already exists; if so, return an error, because a pod must not be assumed twice.
// 3. Call addPod to add the pod to its node's NodeInfo.
// 4. Store it in podStates (deadline and bindingFinished are not set yet).
// 5. Store it in assumedPods to mark it as assumed.
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
    key, err := schedulercache.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()
    if _, ok := cache.podStates[key]; ok {
        return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
    }

    cache.addPod(pod)
    ps := &podState{
        pod: pod,
    }
    cache.podStates[key] = ps
    cache.assumedPods[key] = true
    return nil
}

// Assumes that lock is already acquired.
// 1. Look up the NodeInfo for pod.Spec.NodeName in nodes (creating it if necessary).
// 2. Add the pod to that NodeInfo.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
    n, ok := cache.nodes[pod.Spec.NodeName]
    if !ok {
        n = schedulercache.NewNodeInfo()
        cache.nodes[pod.Spec.NodeName] = n
    }
    n.AddPod(pod)
}
The corresponding reverse operation is ForgetPod, which removes the pod from the schedulerCache entirely; a rough sketch follows.
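Sketch of ForgetPod under the constraint described above (only an assumed pod may be forgotten). This is not verbatim from v1.13; the exact error messages and the node-name consistency check may differ:
// Sketch (not verbatim): ForgetPod undoes AssumePod for a pod that is still assumed.
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
    key, err := schedulercache.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    _, ok := cache.podStates[key]
    switch {
    // Only an assumed pod can be forgotten.
    case ok && cache.assumedPods[key]:
        // remove it from the node's NodeInfo, then drop it from both maps
        if err := cache.removePod(pod); err != nil {
            return err
        }
        delete(cache.assumedPods, key)
        delete(cache.podStates, key)
    default:
        return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
    }
    return nil
}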
3.1.3 The New and run methods
var (
    cleanAssumedPeriod = 1 * time.Second
)

// New returns a Cache implementation.
// It automatically starts a go routine that manages expiration of assumed pods.
// "ttl" is how long the assumed pod will get expired.
// "stop" is the channel that would close the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
    cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
    cache.run()
    return cache
}

func (cache *schedulerCache) run() {
    go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

func (cache *schedulerCache) cleanupExpiredAssumedPods() {
    cache.cleanupAssumedPods(time.Now())
}
// cleanupAssumedPods exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    // Walk over every pod that is currently assumed.
    for key := range cache.assumedPods {
        ps, ok := cache.podStates[key]
        if !ok {
            panic("Key found in assumed set but not in podStates. Potentially a logical error.")
        }
        // Skip pods whose binding has not finished yet.
        if !ps.bindingFinished {
            klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
                ps.pod.Namespace, ps.pod.Name)
            continue
        }
        // If the deadline has passed, expire the pod.
        if now.After(*ps.deadline) {
            klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
            if err := cache.expirePod(key, ps); err != nil {
                klog.Errorf("ExpirePod failed for %s: %v", key, err)
            }
        }
    }
}

// 1. Call removePod to remove the pod from its node's NodeInfo (the nodes map).
// 2. Delete it from assumedPods.
// 3. Delete it from podStates.
// After this the pod is completely gone from the schedulerCache.
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
    if err := cache.removePod(ps.pod); err != nil {
        return err
    }
    delete(cache.assumedPods, key)
    delete(cache.podStates, key)
    return nil
}
So a background goroutine runs every period and expires the assumed pods whose deadline has passed, which simply means removing them from the cache completely. The deadline itself is set by FinishBinding, sketched below.
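Where does deadline come from? It is set when FinishBinding is called for an assumed pod: bindingFinished is flipped to true and the deadline is set to now plus ttl. A rough sketch, not verbatim from v1.13 (the locking details and logging may differ):
// Sketch (not verbatim): FinishBinding arms the expiration timer for an assumed pod.
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
    key, err := schedulercache.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    if ps, ok := cache.podStates[key]; ok && cache.assumedPods[key] {
        dl := time.Now().Add(cache.ttl)
        ps.bindingFinished = true
        ps.deadline = &dl
    }
    return nil
}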
3.1.4 AddPod
AddPod can be called from three states: Initial, Expired, and Assumed.
func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
    key, err := schedulercache.GetPodKey(pod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    // Coming from the Assumed state.
    case ok && cache.assumedPods[key]:
        if currState.pod.Spec.NodeName != pod.Spec.NodeName {
            // The pod was added to a different node than it was assumed to.
            klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
            // Clean this up.
            // Move the pod's information to the node it actually landed on.
            cache.removePod(currState.pod)
            cache.addPod(pod)
        }
        // Drop the assumed marker: the pod moves to the Added state.
        delete(cache.assumedPods, key)
        // Reset deadline to nil; the pod is no longer subject to expiration.
        cache.podStates[key].deadline = nil
        cache.podStates[key].pod = pod
    case !ok:
        // Coming from the Expired state (or the Initial state).
        // Pod was expired. We should add it back.
        cache.addPod(pod)
        ps := &podState{
            pod: pod,
        }
        cache.podStates[key] = ps
    default:
        return fmt.Errorf("pod %v was already in added state", key)
    }
    return nil
}
After the call, the pod is recorded in nodes.
assumed pod -> AddPod: the pod is now actually running on its node, so it moves from Assumed to Added.
expired pod -> AddPod: for some reason (a network hiccup, for instance) some events may have been missed; AddPod was never called in time, the pod expired and disappeared from the cache, so it is added back into nodes and podStates.
UpdatePod and RemovePod need little explanation: they may only be called on a pod in the Added state and essentially just refresh the information kept in nodes; see the sketch below.
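For reference, a rough sketch of UpdatePod, not verbatim from v1.13 (the error messages and the node-name consistency check may differ): it only accepts pods that are present and not assumed, i.e. pods in the Added state, and replaces the old pod with the new one on its node.
// Sketch (not verbatim): UpdatePod refreshes an Added pod's entry in nodes and podStates.
func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
    key, err := schedulercache.GetPodKey(oldPod)
    if err != nil {
        return err
    }

    cache.mu.Lock()
    defer cache.mu.Unlock()

    currState, ok := cache.podStates[key]
    switch {
    // Only a pod in the Added state (present but not assumed) can be updated.
    case ok && !cache.assumedPods[key]:
        // updatePod is essentially removePod(oldPod) followed by addPod(newPod).
        if err := cache.updatePod(oldPod, newPod); err != nil {
            return err
        }
        currState.pod = newPod
    default:
        return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
    }
    return nil
}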
3.1.5 AddNode
func (cache *schedulerCache) AddNode(node *v1.Node) error {
    cache.mu.Lock()
    defer cache.mu.Unlock()

    n, ok := cache.nodes[node.Name]
    if !ok {
        n = schedulercache.NewNodeInfo()
        cache.nodes[node.Name] = n
    } else {
        cache.removeNodeImageStates(n.Node())
    }

    // Add the node to the nodeTree as well.
    cache.nodeTree.AddNode(node)
    // Refresh imageStates and the node's NodeInfo.
    cache.addNodeImageStates(node, n)
    return n.SetNode(node)
}

func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *schedulercache.NodeInfo) {
    newSum := make(map[string]*schedulercache.ImageStateSummary)

    // Walk over every image present on this node.
    for _, image := range node.Status.Images {
        for _, name := range image.Names {
            // update the entry in imageStates
            state, ok := cache.imageStates[name]
            if !ok {
                state = &imageState{
                    size:  image.SizeBytes,
                    nodes: sets.NewString(node.Name),
                }
                cache.imageStates[name] = state
            } else {
                // Record that this node also has the image.
                state.nodes.Insert(node.Name)
            }
            // create the imageStateSummary for this image
            if _, ok := newSum[name]; !ok {
                newSum[name] = cache.createImageStateSummary(state)
            }
        }
    }

    // Attach the per-image summaries to this node's NodeInfo.
    nodeInfo.SetImageStates(newSum)
}
This is simply the bookkeeping for nodes and their images; UpdateNode and RemoveNode work along the same lines. The small helper createImageStateSummary is sketched below.
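The helper likely just snapshots a shared imageState into the per-node summary type. A sketch inferred from the field names above, not verbatim source (the exact field names of ImageStateSummary are an assumption):
// Sketch (assumption): turn a shared imageState into a per-node ImageStateSummary.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *schedulercache.ImageStateSummary {
    return &schedulercache.ImageStateSummary{
        Size:     state.size,       // image size in bytes
        NumNodes: len(state.nodes), // how many nodes currently have this image
    }
}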
3.1.6 List and FilteredList
List and FilteredList mainly return the pods from all nodes in nodes that match the given filter and selector.
Snapshot makes a copy of all nodes and all assumed pods, as sketched below.
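A rough sketch of Snapshot, not verbatim from v1.13 (the exact cloning helper is an assumption): it takes the read lock and copies both maps so the caller gets a consistent view that later cache updates cannot disturb.
// Sketch (not verbatim): Snapshot copies the nodes map and the assumedPods set under the read lock.
func (cache *schedulerCache) Snapshot() *Snapshot {
    cache.mu.RLock()
    defer cache.mu.RUnlock()

    nodes := make(map[string]*schedulercache.NodeInfo, len(cache.nodes))
    for name, info := range cache.nodes {
        nodes[name] = info.Clone() // assumes NodeInfo exposes a Clone method
    }

    assumedPods := make(map[string]bool, len(cache.assumedPods))
    for key, v := range cache.assumedPods {
        assumedPods[key] = v
    }

    return &Snapshot{
        Nodes:       nodes,
        AssumedPods: assumedPods,
    }
}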
3.2 Summary
schedulerCache is really a utility data structure used by kube-scheduler during scheduling; how exactly it is used will come up again when the scheduling process itself is analyzed in later articles.