调度需求
Gang调度(pod group):部署一个工作负载时,满足一定数量的pod能调度成功,即可以正常调度,不然全部调度失败
-- 例如:模型训练时,算法要求至少运行8个任务并行作业才可以正常训练。当资源不足够时只能运行6个任务时,则希望这6个任务也不要运行(浪费计算资源)。
装箱调度
-- 减少对GPU卡的碎片化
-- 减少GPU卡的碎片化作用比较多:1. 资源的更高效利用,节约成本 2. vnlink模式的gpu,在同一台机器上有更高的性能
-- 用例:假设node的空闲的卡数为m,pod request的卡数为n,m - n的值越小,score值越高(m - n最小为0,n > m的情况会在filter直接过滤掉)
rescheduler
-- 用例:集群负载拓扑node1(podA(4卡), podB(3卡)),node2()。开始调度podC(4卡) -> 触发rescheduler,node1(podA(4卡), podC(4卡)) node2(podA(3卡))
app dependence:可以配置应用调度的依赖关系
-- 用例: 模型训练时,任务1运行生成结果A,任务2运行依赖A
节点池:可以将集群中的节点资源划分成不同的节点池
-- 用例:1. 多租户可以独占部分节点作为节点池
-- 用例:2. 节点的硬件质量、厂家,用途等不同,可以划分成不同的节点池。例如:旧机器划分成一个节点池,新机器划分到一个节点池,想要进行机器更换或者迁移时可以做到更高的可控性
自定义调度算法:用户在在filter、score等扩展点根据业务规则自定义算法
-- 用例:最优线路调度 slurm on k8s调度方案
-- 用户可以自定义扩展算法对节点进行调度,例如:slurm on k8s的方案,考虑到最好的性能,希望slurm cluster最好运行在一个leaf Pod网络内,且尽可能占用最少的leaf Pod数,对leaf Pod网络拓扑碎片化的影响最小
多调度策略:集群支持配置多调度策略,业务可以选择适合的调度策略进行调度
-- 例如: GPU服务集群(如slurm on k8s)调度时对pod之间最优路径有需求走自己的调度器,其他的CPU服务调度时走k8s默认调度器
-- 可以考虑在同一个调度器内根据pod标签进行逻辑划分,这样的设计功能比较耦合,对平台的高可用性也不太友好。
-- 可以考虑使用多调度器(调度器就是插件的组合),调度器名称的配置交给应用还是webhook实现或者其他方式需要考虑。
-- 同一个VC是同一类应用,走同一个调度策略?需要根据当前VC的业务场景确定调度策略
并发调度
-- 通过并发调度提效
-- k8s默认调度器是串行进行调度,支持配置多调度器,但是在资源紧张的情况下,由于不同的调度器之间的资源同步延迟,可能导致资源冲突
-- 对集群资源进行划分,走不通的调度器进行并发调度时则不会导致资源冲突
k8s默认调度器
核心流程:
Predicates: 预选,按照调度策略,从当前集群的所有节点中,“过滤”出一系列符合条件的节点。这些节点,都是可以运行待调度 Pod 的宿主机
Priorities: 优选,对预选完成的节点打分。这里打分的范围是 0-10 分,得分最高的节点就是最后被 Pod 绑定的最佳节点
k8s默认调度器扩展
一、扩展点介绍
k8s默认调度器是通过scheduler framework实现的,scheduler framework预留了一些扩展点,可以通过scheduler framework扩展点对调度器进行扩展
二、对扩展点进行扩展
package networkspeed
import (
"context"
"errors"
"fmt"
"os"
"slices"
"strconv"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
)
const PluginName = "NetworkSpeed"
type Config struct {
Selector map[string]string `json:"selector"`
Namespace string `json:"namespace"`
Timeout int64 `json:"timeout"`
Port int `json:"port"`
}
type NetworkSpeedPlugin struct {
config *Config
handle framework.Handle
}
func (n *NetworkSpeedPlugin) Name() string {
return PluginName
}
func (n *NetworkSpeedPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
target, module, ok := getProbeConfig(p)
if !ok {
return 0, nil
}
duration, err := n.doProbe(ctx, nodeName, target, module)
if err != nil {
klog.Error("doProbe error", "err", err.Error())
return 0, nil
}
klog.InfoS(
"doProbe",
"node", nodeName,
"target", target,
"pod", p.Namespace+"/"+p.Name,
"duration", duration,
)
annos := p.GetAnnotations()
klog.Infof("annos: %v", annos)
k := annos[nodeName]
klog.Infof("node %s k: %s", nodeName, k)
return duration.Microseconds(), nil
}
func (n *NetworkSpeedPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if _, err := n.getProber(nodeInfo.Node().Name); err != nil {
klog.ErrorS(
err, "Filter error",
"node", nodeInfo.Node().Name,
"pod", pod.Namespace+"/"+pod.Name,
)
return framework.NewStatus(framework.Unschedulable)
}
annotations := pod.GetAnnotations()
annotations[nodeInfo.Node().GetName()] = "1" // k
pod.SetAnnotations(annotations)
return nil
}
func (n *NetworkSpeedPlugin) PreScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status {
// 1. 获取pod group的实例个数
podAnnotations := pod.GetAnnotations()
podGroupReplicas, _ := strconv.Atoi(podAnnotations["replicas"])
// 2. 遍历nodes, 计算node所属leafpod node的个数
nodeLeafPodCount := make(map[string]int)
for _, node := range nodes {
nodeAnnotations := node.GetAnnotations()
if v, ok := nodeAnnotations["leafpod"]; ok {
nodeLeafPodCount[v]++
}
}
klog.Infof("nodeLeafPodCount: %v", nodeLeafPodCount)
// 3. 遍历node, 计算k值
for _, node := range nodes {
nodeAnnotations := node.GetAnnotations()
if v, ok := nodeAnnotations["leafpod"]; ok {
klog.Infof("node %s leafpod count: %d, k: %d", node.GetName(), nodeLeafPodCount[v], podGroupReplicas-nodeLeafPodCount[v])
podAnnotations[node.GetName()] = fmt.Sprintf("%d", nodeLeafPodCount[v]-podGroupReplicas)
}
}
// PreScore的时候,判断pod group实例的个数m,nodeInfo所属POD组的node的个数n,k = n -m, k越小则约倾向于调度到该node,k < 0, 则万不得已不要调度到该节点
// 将node对应k值记录到pod的annotation中
pod.SetAnnotations(podAnnotations)
return nil
}
// 归一化分数
func (n *NetworkSpeedPlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
slices.SortFunc(scores, func(a, b framework.NodeScore) int {
return int(a.Score - b.Score)
})
for i, score := range scores {
if score.Score == 0 {
continue
}
scores[i].Score = int64((len(scores) - i) * (100 / len(scores)))
}
klog.InfoS("NormalizeScore", "scores", scores)
return nil
}
func (n *NetworkSpeedPlugin) ScoreExtensions() framework.ScoreExtensions {
return n
}
func New(ctx context.Context, configuration runtime.Object, handle framework.Handle) (framework.Plugin, error) {
var config Config
if err := frameworkruntime.DecodeInto(configuration, &config); err != nil {
return nil, err
}
if config.Selector == nil {
return nil, errors.New("prober selector is nil")
}
if config.Timeout == 0 {
config.Timeout = 3000
}
if config.Namespace == "" {
data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return nil, fmt.Errorf("get namesapce error: %w", err)
}
config.Namespace = string(data)
}
if config.Port == 0 {
config.Port = 9115
}
klog.InfoS("scheduler config", "config", config)
return &NetworkSpeedPlugin{
config: &config,
handle: handle,
}, nil
}
启动调度器
package main
import (
"k8s-network-scheduler/plugins/networkspeed"
"k8s.io/component-base/cli"
"k8s.io/kubernetes/cmd/kube-scheduler/app"
"os"
)
func main() {
cmd := app.NewSchedulerCommand(
app.WithPlugin(networkspeed.PluginName, networkspeed.New),
)
code := cli.Run(cmd)
os.Exit(code)
}
三、调度器配置
apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
leaderElection:
leaderElect: false
profiles:
- schedulerName: network-scheduler
plugins:
multiPoint:
enabled:
- name: NetworkSpeed
pluginConfig:
- name: NetworkSpeed
args:
selector:
// 通过标签可以找到对应的插件应用程序
app.kubernetes.io/instance: network-scheduler
app.kubernetes.io/name: blackbox-exporter
配置参数介绍(可以配置不同的调度器以及插件权重)
https://kubernetes.io/zh-cn/docs/reference/scheduling/config/#multiple-profiles
https://blog.csdn.net/qq_24433609/article/details/130149592
scheduler-plugins调研
scheduler-plugins 就是通过scheduler framework扩展的一系列插件,如:coscheduling(pod group),TopologicalSort (app group,会在QueueSort扩展点发挥作用),Noderesourcetopology(可以配置MostAllocated、BalancedAllocation、LeastAllocated、LeastNUMANodes)
部署
scheduler-plugins/manifests/install at master · kubernetes-sigs/scheduler-plugins (github.com)
Installation | Scheduler Plugins (k8s.io)
扩展
编写扩展插件,在启动文件里scheduler-plugins/cmd/scheduler/main.go引入扩展插件,重新编译
package main
import (
"os"
"k8s.io/component-base/cli"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for rest client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/kubernetes/cmd/kube-scheduler/app"
"sigs.k8s.io/scheduler-plugins/pkg/capacityscheduling"
"sigs.k8s.io/scheduler-plugins/pkg/coscheduling"
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/networkoverhead"
"sigs.k8s.io/scheduler-plugins/pkg/networkaware/topologicalsort"
"sigs.k8s.io/scheduler-plugins/pkg/noderesources"
"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology"
"sigs.k8s.io/scheduler-plugins/pkg/podstate"
"sigs.k8s.io/scheduler-plugins/pkg/preemptiontoleration"
"sigs.k8s.io/scheduler-plugins/pkg/qos"
"sigs.k8s.io/scheduler-plugins/pkg/sysched"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/loadvariationriskbalancing"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/lowriskovercommitment"
"sigs.k8s.io/scheduler-plugins/pkg/trimaran/targetloadpacking"
// Ensure scheme package is initialized.
_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
)
func main() {
// Register custom plugins to the scheduler framework.
// Later they can consist of scheduler profile(s) and hence
// used by various kinds of workloads.
command := app.NewSchedulerCommand(
app.WithPlugin(capacityscheduling.Name, capacityscheduling.New),
app.WithPlugin(coscheduling.Name, coscheduling.New),
app.WithPlugin(loadvariationriskbalancing.Name, loadvariationriskbalancing.New),
app.WithPlugin(networkoverhead.Name, networkoverhead.New),
app.WithPlugin(topologicalsort.Name, topologicalsort.New),
app.WithPlugin(noderesources.AllocatableName, noderesources.NewAllocatable),
app.WithPlugin(noderesourcetopology.Name, noderesourcetopology.New),
app.WithPlugin(preemptiontoleration.Name, preemptiontoleration.New),
app.WithPlugin(targetloadpacking.Name, targetloadpacking.New),
app.WithPlugin(lowriskovercommitment.Name, lowriskovercommitment.New),
app.WithPlugin(sysched.Name, sysched.New),
// Sample plugins below.
// app.WithPlugin(crossnodepreemption.Name, crossnodepreemption.New),
app.WithPlugin(podstate.Name, podstate.New),
app.WithPlugin(qos.Name, qos.New),
)
code := cli.Run(command)
os.Exit(code)
}
volcano调研
架构
可以看出,Volcano由scheduler、controllermanager、admission和vcctl组成:
scheduler通过一系列的action和plugin调度Job,并为它找到一个最适合的节点。与k8s本身的调度器相比,Volcano支持针对Job的多种调度算法。
controllermanager管理CRD资源的生命周期。它主要由Queue ControllerManager、PodGroupControllerManager 、 VCJob ControllerManager构成。
admission负责对CRD API资源进行校验。
vcctl是Volcano的命令行客户端工具。
部署
调度流程
enqueue
proportion 插件:Queue相关的的插件
sla插件(enqueue和allocate action)
allocate
rescheduler
metric
各调度器对比
功能项 \ 调度器 | volcano | scheduler-plugins | 自定义扩展点 | 默认调度器 |
---|---|---|---|---|
Gang调度 | 支持 | 支持 | 不支持 | 不支持 |
装箱调度 | 支持 | 支持 | 不支持 | 不支持 |
rescheduler | 支持 | 不支持 | 不支持 | 不支持 |
app dependence | 支持(vcjob支持,没有预留queue的扩展点) | 支持(通过queueSort实现) | 不支持 | 不支持 |
节点池 | 不支持 | 支持、可以通过多调度器+标签实现 | 不支持 | 不支持 |
自定义调度算法 | 支持 通过http extender方式对插件进行扩展(多语言友好) 扩展自定义action和plugin | 支持 需要重新编译scheduler plugins 多语言支持不太友好(需要自定义接口转发,或者sdk库实现) | 支持 | 不支持 |
节点拓扑(MostAllocated、BalancedAllocation、LeastAllocated) | 支持 | 支持 | 不支持 | 不支持 |
多调度策略 | 支持不太好 -- volcano scheduler configmap配置全局调度的plugin -- plugin的参数可以配置在vcjob作业的annotation中单独生效 -- 支持配置muti scheduler -- vcjob中支持配置个性化的plugin(开启ssh免密,svc等) | 支持,可以配置多调度器 | 支持,配置多调度器 | 不支持 |
并发调度 | 支持,可以通过定义Queue对资源大小,优先级限制防止资源冲突 | 支持,不建议做(在资源紧张情况下可能存在资源冲突问题。配合节点池可以考虑做) | 支持,不建议做(在资源紧张情况下可能存在资源冲突问题) | 不支持 |
技术生态 | 成熟 | 不太成熟,资料较少 | 成熟 | 成熟 |
云原生 | Accepted to CNCF on April 9, 2020. Incubating Projects | kubernetes sig小组发布,符合scheduler framework | 原生, scheduler framework方式扩展 | 原生 |
其他 | vcjob支持丰富的调度策略 -- 1. 配置超时时间 -- 2. 设置一级资源、二级资源 -- 3. 支持对接kubeflow、MPI、spark等主流的AI、HPC、大数据计算平台 -- 4. 支持rescheduler 支持多队列资源分配 volcano device plugin支持共享gpu显存 volcano metric |