k8s-schedule

kube-schedule负责调度pod到集群内节点上,通过监听apiserver,查询未分配node的pod(NodeName字段),然后根据调度策略为这些pod分配节点
主要考虑的为

  • 公平调度
  • 资源高效利用
  • Qos
  • affinity 和 anti-affinity
  • 数据本地化等等
    分为两个阶段
  • predicate:过滤不符合条件的节点
  • priority:优先级排序,选择优先级高的节点
    下面我们看看具体这些功能在源码里怎么实现的

k8s-schedule 源码分析

本文仅分析默认调度器,调度逻辑为来一个pod调度一次。批次调度,有社区实现,后面有时间在讲

k8s-schedule 启动流程

启动流程,通过Setup()方法家在配置,初始化sched,绑定过滤插件以及打分插件

// cmd/kube-scheduler/app/server.go
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    cc, sched, err := Setup(ctx, opts, registryOptions...)
    Run(ctx, cc, sched)
}
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    //文末会单独讲Informer
	cc.InformerFactory.Start(ctx.Done())
    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())
    // DynInformerFactory can be nil in tests.
    if cc.DynInformerFactory != nil {
    cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
	... 省略代码是否为leader,通过Lease对象实现 ...
    sched.Run(ctx)
}
//pkg/scheduler/scheduler.go
func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
}
//pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
}
//pkg/scheduler/schedule_one.go
func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework,   
	podInfo *framework.QueuedPodInfo, start time.Time, podsToActivate *framework.PodsToActivate,) 
(ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
    scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
}
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	//根据framework过滤节点,获取pod能被调度的一批节点,如果没有或仅有一个则返回,有多个接着执行下列逻辑
	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	//优先级排序,选择优先级高的节点 
	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
	
	//选择节点
	host, err := selectHost(priorityList)
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

k8s Framework详解

接上文我们了解到最后执行predicate过滤节点,以及priority给合适的节点打分,依赖的是Framework加载的各种插件,我们先看下Framework的定义

type Framework interface {
    Handle
    //preEnqueue插件
    PreEnqueuePlugins() []PreEnqueuePlugin
    //负责pod进行排序
    QueueSortFunc() LessFunc
    //运行preFilter插件
    RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
    //运行PostFilter插件
    RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
    //运行配置的PreBind插件
    RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    //运行配置的PostBind插件
    RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    //运行配置的Reserve 插件
    RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status.
    //运行Reserve插件的Unreserve方法
    RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    //运行一组配置的 Permit 插件,
	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    //等待Permit插件status需要等待的
	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
	//运行配置绑定的插件
	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    //是否定义Filter插件
	HasFilterPlugins() bool
	//是否定义PostFilter插件
    HasPostFilterPlugins() bool
	//是否定义Score插件
    HasScorePlugins() bool
	//返回插件列表
    ListPlugins() *config.Plugins
    ProfileName() string
    PercentageOfNodesToScore() *int32
    SetPodNominator(nominator PodNominator)
}

sched findNodesThatFitPod

findNodesThatFitPod 主要是根据配置启用的插件,以及扩展插件,过滤掉一些不符合要求的节点,如内存不能压缩,内存不够则会oom

//根据要调度的pod寻找合适的node
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
	//filter预处理,遍历pod里的init container和主container,计算pod总资源需求(如cpu,内存)
	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
	//filter阶段,遍历所有节点过滤掉不符合节点
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
	//处理扩展plugin
	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	return feasibleNodes, diagnosis, nil
}

sched prioritizeNodes

prioritizeNodes主要负责给节点打分

func prioritizeNodes(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, 
	state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, ) ([]framework.NodePluginScores, error) {
	// score,比如处理亲和性
	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
	//给节点打分
	nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
	//处理扩展plugin
	if len(extenders) != 0 && nodes != nil {
		allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
		var mu sync.Mutex
		var wg sync.WaitGroup
		for i := range extenders {
			if !extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
			go func(extIndex int) {
				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
			}(i)
		}
	}
	return nodesScores, nil
}

打分完了返回这一批调度结果后,又回到了schedulingCycle中,我们在看下schedulingCycle后面的逻辑

//pkg/scheduler/schedule_one.go
func (sched *Scheduler) schedulingCycle(.....){
	//假定选中pod
    sched.assume(assumedPod, scheduleResult.SuggestedHost)
	//运行Reserve插件,如bindVolume
    fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	//运行Permit插件
    runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
    //触发Unreserve做一些善后操作,
    fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
}

执行完后,执行bind具体操作

//pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
    sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
}

func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState,fwk framework.Framework, 
	scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, start time.Time, podsToActivate *framework.PodsToActivate) *framework.Status {
	assumedPod := assumedPodInfo.Pod
	//运行WaitOnPermit插件
	if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
		return status
	}
	//绑定相关
    fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	// 绑定操作,具体实现在
	//staging/src/k8s.io/client-go/kubernetes/typed/core/v1/pod_expansion.go
	//b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
	// Run "postbind" plugins.
	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	//调度完成从SchedulingQueue取出
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(podsToActivate.Map)
	}
	return nil
}