k8s调度器扩展

调度需求

Gang调度(pod group):部署一个工作负载时,满足一定数量的pod能调度成功,即可以正常调度,不然全部调度失败
-- 例如:模型训练时,算法要求至少运行8个任务并行作业才可以正常训练。当资源不足够时只能运行6个任务时,则希望这6个任务也不要运行(浪费计算资源)。
装箱调度
-- 减少对GPU卡的碎片化
-- 减少GPU卡的碎片化作用比较多:1. 资源的更高效利用,节约成本 2. vnlink模式的gpu,在同一台机器上有更高的性能
-- 用例:假设node的空闲的卡数为m,pod request的卡数为n,m - n的值越小,score值越高(m - n最小为0,n > m的情况会在filter直接过滤掉)
rescheduler
-- 用例:集群负载拓扑node1(podA(4卡), podB(3卡)),node2()。开始调度podC(4卡) -> 触发rescheduler,node1(podA(4卡), podC(4卡)) node2(podA(3卡))
app dependence:可以配置应用调度的依赖关系
-- 用例: 模型训练时,任务1运行生成结果A,任务2运行依赖A
节点池:可以将集群中的节点资源划分成不同的节点池
-- 用例:1. 多租户可以独占部分节点作为节点池
-- 用例:2. 节点的硬件质量、厂家,用途等不同,可以划分成不同的节点池。例如:旧机器划分成一个节点池,新机器划分到一个节点池,想要进行机器更换或者迁移时可以做到更高的可控性
自定义调度算法:用户在在filter、score等扩展点根据业务规则自定义算法
-- 用例:最优线路调度 slurm on k8s调度方案
-- 用户可以自定义扩展算法对节点进行调度,例如:slurm on k8s的方案,考虑到最好的性能,希望slurm cluster最好运行在一个leaf Pod网络内,且尽可能占用最少的leaf Pod数,对leaf Pod网络拓扑碎片化的影响最小
多调度策略:集群支持配置多调度策略,业务可以选择适合的调度策略进行调度
-- 例如: GPU服务集群(如slurm on k8s)调度时对pod之间最优路径有需求走自己的调度器,其他的CPU服务调度时走k8s默认调度器
-- 可以考虑在同一个调度器内根据pod标签进行逻辑划分,这样的设计功能比较耦合,对平台的高可用性也不太友好。
-- 可以考虑使用多调度器(调度器就是插件的组合),调度器名称的配置交给应用还是webhook实现或者其他方式需要考虑。
-- 同一个VC是同一类应用,走同一个调度策略?需要根据当前VC的业务场景确定调度策略
并发调度
-- 通过并发调度提效
-- k8s默认调度器是串行进行调度,支持配置多调度器,但是在资源紧张的情况下,由于不同的调度器之间的资源同步延迟,可能导致资源冲突
-- 对集群资源进行划分,走不通的调度器进行并发调度时则不会导致资源冲突

k8s默认调度器

image.png

核心流程:

Predicates: 预选,按照调度策略,从当前集群的所有节点中,“过滤”出一系列符合条件的节点。这些节点,都是可以运行待调度 Pod 的宿主机
Priorities: 优选,对预选完成的节点打分。这里打分的范围是 0-10 分,得分最高的节点就是最后被 Pod 绑定的最佳节点

k8s默认调度器扩展

一、扩展点介绍

k8s默认调度器是通过scheduler framework实现的,scheduler framework预留了一些扩展点,可以通过scheduler framework扩展点对调度器进行扩展


image.png

二、对扩展点进行扩展

package networkspeed

import (
    "context"
    "errors"
    "fmt"
    "os"
    "slices"
    "strconv"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/klog/v2"
    "k8s.io/kubernetes/pkg/scheduler/framework"
    frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)

const PluginName = "NetworkSpeed"

type Config struct {
    Selector  map[string]string `json:"selector"`
    Namespace string            `json:"namespace"`
    Timeout   int64             `json:"timeout"`
    Port      int               `json:"port"`
}

type NetworkSpeedPlugin struct {
    config *Config
    handle framework.Handle
}

func (n *NetworkSpeedPlugin) Name() string {
    return PluginName
}

func (n *NetworkSpeedPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
    target, module, ok := getProbeConfig(p)
    if !ok {
        return 0, nil
    }

    duration, err := n.doProbe(ctx, nodeName, target, module)
    if err != nil {
        klog.Error("doProbe error", "err", err.Error())
        return 0, nil
    }

    klog.InfoS(
        "doProbe",
        "node", nodeName,
        "target", target,
        "pod", p.Namespace+"/"+p.Name,
        "duration", duration,
    )
    annos := p.GetAnnotations()
    klog.Infof("annos: %v", annos)
    k := annos[nodeName]
    klog.Infof("node %s k: %s", nodeName, k)

    return duration.Microseconds(), nil
}

func (n *NetworkSpeedPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if _, err := n.getProber(nodeInfo.Node().Name); err != nil {
        klog.ErrorS(
            err, "Filter error",
            "node", nodeInfo.Node().Name,
            "pod", pod.Namespace+"/"+pod.Name,
        )
        return framework.NewStatus(framework.Unschedulable)
    }

    annotations := pod.GetAnnotations()
    annotations[nodeInfo.Node().GetName()] = "1" // k
    pod.SetAnnotations(annotations)
    return nil
}

func (n *NetworkSpeedPlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
    // 1. 获取pod group的实例个数
    podAnnotations := pod.GetAnnotations()
    podGroupReplicas, _ := strconv.Atoi(podAnnotations["replicas"])
    // 2. 遍历nodes, 计算node所属leafpod node的个数
    nodeLeafPodCount := make(map[string]int)
    for _, node := range nodes {
        nodeAnnotations := node.GetAnnotations()
        if v, ok := nodeAnnotations["leafpod"]; ok {
            nodeLeafPodCount[v]++
        }
    }
    klog.Infof("nodeLeafPodCount: %v", nodeLeafPodCount)
    // 3. 遍历node, 计算k值
    for _, node := range nodes {
        nodeAnnotations := node.GetAnnotations()
        if v, ok := nodeAnnotations["leafpod"]; ok {
            klog.Infof("node %s leafpod count: %d, k: %d", node.GetName(), nodeLeafPodCount[v], podGroupReplicas-nodeLeafPodCount[v])
            podAnnotations[node.GetName()] = fmt.Sprintf("%d", nodeLeafPodCount[v]-podGroupReplicas)
        }
    }
    // PreScore的时候,判断pod group实例的个数m,nodeInfo所属POD组的node的个数n,k = n -m, k越小则约倾向于调度到该node,k < 0, 则万不得已不要调度到该节点
    // 将node对应k值记录到pod的annotation中
    pod.SetAnnotations(podAnnotations)
    return nil
}

// 归一化分数
func (n *NetworkSpeedPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    slices.SortFunc(scores, func(a, b framework.NodeScore) int {
        return int(a.Score - b.Score)
    })

    for i, score := range scores {
        if score.Score == 0 {
            continue
        }
        scores[i].Score = int64((len(scores) - i) * (100 / len(scores)))
    }

    klog.InfoS("NormalizeScore", "scores", scores)

    return nil
}

func (n *NetworkSpeedPlugin) ScoreExtensions() framework.ScoreExtensions {
    return n
}

func New(ctx context.Context, configuration runtime.Object, handle framework.Handle) (framework.Plugin, error) {
    var config Config
    if err := frameworkruntime.DecodeInto(configuration, &config); err != nil {
        return nil, err
    }

    if config.Selector == nil {
        return nil, errors.New("prober selector is nil")
    }

    if config.Timeout == 0 {
        config.Timeout = 3000
    }

    if config.Namespace == "" {
        data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
        if err != nil {
            return nil, fmt.Errorf("get namesapce error: %w", err)
        }

        config.Namespace = string(data)
    }

    if config.Port == 0 {
        config.Port = 9115
    }

    klog.InfoS("scheduler config", "config", config)

    return &NetworkSpeedPlugin{
        config: &config,
        handle: handle,
    }, nil
}

启动调度器

package main

import (
    "k8s-network-scheduler/plugins/networkspeed"
    "k8s.io/component-base/cli"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"
    "os"
)

func main() {
    cmd := app.NewSchedulerCommand(
        app.WithPlugin(networkspeed.PluginName, networkspeed.New),
    )

    code := cli.Run(cmd)
    os.Exit(code)
}

三、调度器配置

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
  leaderElect: false
profiles:
  - schedulerName: network-scheduler
    plugins:
      multiPoint:
        enabled:
          - name: NetworkSpeed
    pluginConfig:
      - name: NetworkSpeed
        args:
          selector:
            // 通过标签可以找到对应的插件应用程序
            app.kubernetes.io/instance: network-scheduler
            app.kubernetes.io/name: blackbox-exporter

配置参数介绍(可以配置不同的调度器以及插件权重)

https://kubernetes.io/zh-cn/docs/reference/scheduling/config/#multiple-profiles
https://blog.csdn.net/qq_24433609/article/details/130149592

scheduler-plugins调研

scheduler-plugins 就是通过scheduler framework扩展的一系列插件,如:coscheduling(pod group),TopologicalSort (app group,会在QueueSort扩展点发挥作用),Noderesourcetopology(可以配置MostAllocated、BalancedAllocation、LeastAllocated、LeastNUMANodes)

部署

scheduler-plugins/manifests/install at master · kubernetes-sigs/scheduler-plugins (github.com)
Installation | Scheduler Plugins (k8s.io)

扩展

编写扩展插件,在启动文件里scheduler-plugins/cmd/scheduler/main.go引入扩展插件,重新编译

package main

import (
    "os"

    "k8s.io/component-base/cli"
    _ "k8s.io/component-base/metrics/prometheus/clientgo" // for rest client metric registration
    _ "k8s.io/component-base/metrics/prometheus/version"  // for version metric registration
    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    "sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
    "sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
    "sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
    "sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
    "sigs.k8s.io/scheduler-plugins/pkg/noderesources"
    "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology"
    "sigs.k8s.io/scheduler-plugins/pkg/podstate"
    "sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration"
    "sigs.k8s.io/scheduler-plugins/pkg/qos"
    "sigs.k8s.io/scheduler-plugins/pkg/sysched"
    "sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
    "sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment"
    "sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"

    // Ensure scheme package is initialized.
    _ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
)

func main() {
    // Register custom plugins to the scheduler framework.
    // Later they can consist of scheduler profile(s) and hence
    // used by various kinds of workloads.
    command := app.NewSchedulerCommand(
        app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
        app.WithPlugin(coscheduling.Name, coscheduling.New),
        app.WithPlugin(loadvariationriskbalancing.Name, loadvariationriskbalancing.New),
        app.WithPlugin(networkoverhead.Name, networkoverhead.New),
        app.WithPlugin(topologicalsort.Name, topologicalsort.New),
        app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
        app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
        app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
        app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
        app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
        app.WithPlugin(sysched.Name, sysched.New),
        // Sample plugins below.
        // app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
        app.WithPlugin(podstate.Name, podstate.New),
        app.WithPlugin(qos.Name, qos.New),
    )

    code := cli.Run(command)
    os.Exit(code)
}

volcano调研

架构

image.png

可以看出,Volcano由scheduler、controllermanager、admission和vcctl组成:
scheduler通过一系列的action和plugin调度Job,并为它找到一个最适合的节点。与k8s本身的调度器相比,Volcano支持针对Job的多种调度算法。
controllermanager管理CRD资源的生命周期。它主要由Queue ControllerManager、PodGroupControllerManager 、 VCJob ControllerManager构成。
admission负责对CRD API资源进行校验。
vcctl是Volcano的命令行客户端工具。

部署

调度流程

enqueue


image.png

image.png

proportion 插件:Queue相关的的插件
sla插件(enqueue和allocate action)


image.png

allocate
image.png

image.png

image.png

image.png

image.png

rescheduler


image.png

metric
image.png

各调度器对比

功能项 \ 调度器 volcano scheduler-plugins 自定义扩展点 默认调度器
Gang调度 支持 支持 不支持 不支持
装箱调度 支持 支持 不支持 不支持
rescheduler 支持 不支持 不支持 不支持
app dependence 支持(vcjob支持,没有预留queue的扩展点) 支持(通过queueSort实现) 不支持 不支持
节点池 不支持 支持、可以通过多调度器+标签实现 不支持 不支持
自定义调度算法 支持 通过http extender方式对插件进行扩展(多语言友好) 扩展自定义action和plugin 支持 需要重新编译scheduler plugins 多语言支持不太友好(需要自定义接口转发,或者sdk库实现) 支持 不支持
节点拓扑(MostAllocated、BalancedAllocation、LeastAllocated) 支持 支持 不支持 不支持
多调度策略 支持不太好 -- volcano scheduler configmap配置全局调度的plugin -- plugin的参数可以配置在vcjob作业的annotation中单独生效 -- 支持配置muti scheduler -- vcjob中支持配置个性化的plugin(开启ssh免密,svc等) 支持,可以配置多调度器 支持,配置多调度器 不支持
并发调度 支持,可以通过定义Queue对资源大小,优先级限制防止资源冲突 支持,不建议做(在资源紧张情况下可能存在资源冲突问题。配合节点池可以考虑做) 支持,不建议做(在资源紧张情况下可能存在资源冲突问题) 不支持
技术生态 成熟 不太成熟,资料较少 成熟 成熟
云原生 Accepted to CNCF on April 9, 2020. Incubating Projects kubernetes sig小组发布,符合scheduler framework 原生, scheduler framework方式扩展 原生
其他 vcjob支持丰富的调度策略 -- 1. 配置超时时间 -- 2. 设置一级资源、二级资源 -- 3. 支持对接kubeflow、MPI、spark等主流的AI、HPC、大数据计算平台 -- 4. 支持rescheduler 支持多队列资源分配 volcano device plugin支持共享gpu显存 volcano metric
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容