k8s-schedule
kube-scheduler is responsible for scheduling pods onto nodes in the cluster. It watches the apiserver for pods that have not yet been assigned a node (i.e. whose NodeName field is empty) and then assigns nodes to those pods according to the scheduling policy.
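As a quick aside, this is easy to observe from outside the scheduler as well: unscheduled pods have an empty spec.nodeName. A minimal client-go sketch that lists such pending pods (illustrative only, not scheduler code; the kubeconfig path is a placeholder):
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig (path is a placeholder for illustration).
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)
	// Pods without an assigned node have an empty spec.nodeName,
	// which is exactly what the scheduler looks for.
	pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(),
		metav1.ListOptions{FieldSelector: "spec.nodeName="})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Printf("pending pod: %s/%s\n", p.Namespace, p.Name)
	}
}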
The main factors it considers are:
- Fair scheduling
- Efficient resource utilization
- QoS
- affinity and anti-affinity
- Data locality, and so on
Scheduling is split into two phases:
- predicate: filter out nodes that do not meet the pod's requirements
- priority: rank the remaining nodes and pick the highest-scoring one
Below we look at how these features are implemented in the source code.
k8s-schedule source code analysis
This article only analyzes the default scheduler, which schedules one pod at a time. Batch scheduling has community implementations and will be covered later when time permits.
k8s-schedule startup flow
During startup, the Setup() method loads the configuration, initializes sched, and wires in the filter and score plugins.
// cmd/kube-scheduler/app/server.go
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
cc, sched, err := Setup(ctx, opts, registryOptions...)
Run(ctx, cc, sched)
}
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
//The Informer mechanism is covered separately at the end of the article
cc.InformerFactory.Start(ctx.Done())
// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())
// DynInformerFactory can be nil in tests.
if cc.DynInformerFactory != nil {
cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
... leader election code omitted; it is implemented via a Lease object ...
sched.Run(ctx)
}
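The InformerFactory started above is a standard client-go SharedInformerFactory; the Start/WaitForCacheSync pattern looks roughly like this (an illustrative sketch, not the scheduler's actual informer wiring):
import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startPodInformer is a hypothetical helper showing the Start/WaitForCacheSync pattern.
func startPodInformer(ctx context.Context, clientset kubernetes.Interface) {
	factory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := factory.Core().V1().Pods().Informer()
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			fmt.Printf("pod added: %s/%s\n", pod.Namespace, pod.Name)
		},
	})
	// Start all registered informers, then block until the local caches are in sync.
	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())
}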
//pkg/scheduler/scheduler.go
func (sched *Scheduler) Run(ctx context.Context) {
sched.SchedulingQueue.Run()
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
}
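wait.UntilWithContext with a period of 0 simply keeps invoking scheduleOne back to back until the context is cancelled; the pattern itself is just (illustrative sketch):
import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
)

// runLoop stands in for sched.Run: with period 0, wait.UntilWithContext
// calls the function repeatedly until ctx is cancelled.
func runLoop(ctx context.Context) {
	go wait.UntilWithContext(ctx, func(ctx context.Context) {
		fmt.Println("one scheduling cycle")
	}, 0)
	<-ctx.Done()
}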
//pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
}
//pkg/scheduler/schedule_one.go
func (sched *Scheduler) schedulingCycle(ctx context.Context, state *framework.CycleState, fwk framework.Framework,
	podInfo *framework.QueuedPodInfo, start time.Time, podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
}
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
//Filter nodes through the framework to get the set of nodes the pod can run on; if there are zero or exactly one, return early, otherwise continue with the logic below
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
//Rank the feasible nodes by score
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
//Pick the highest-scoring node
host, err := selectHost(priorityList)
return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
}
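selectHost simply picks the node with the highest accumulated score. A simplified sketch of that selection (assuming the NodePluginScores struct exposes Name and TotalScore as in recent releases; the real implementation also breaks ties randomly, which is omitted here):
import (
	"fmt"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// pickHighestScore is a simplified stand-in for selectHost: it returns the
// node with the highest total score (the first candidate wins on ties).
func pickHighestScore(scores []framework.NodePluginScores) (string, error) {
	if len(scores) == 0 {
		return "", fmt.Errorf("empty priority list")
	}
	best := scores[0]
	for _, s := range scores[1:] {
		if s.TotalScore > best.TotalScore {
			best = s
		}
	}
	return best.Name, nil
}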
k8s Framework in detail
From the walkthrough above we know that the predicate filtering and the priority scoring both rely on the plugins loaded into the Framework, so let's first look at the Framework definition.
type Framework interface {
Handle
//PreEnqueue plugins
PreEnqueuePlugins() []PreEnqueuePlugin
//Returns the function used to sort pods in the scheduling queue
QueueSortFunc() LessFunc
//Runs the PreFilter plugins
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
//Runs the PostFilter plugins
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
//Runs the configured PreBind plugins
RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
//Runs the configured PostBind plugins
RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
//Runs the Reserve method of the configured Reserve plugins
RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
//Runs the Unreserve method of the Reserve plugins
RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
//Runs the set of configured Permit plugins
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
//Blocks until any "wait" status returned by the Permit plugins for this pod is resolved
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
//Runs the configured Bind plugins
RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
//Whether any Filter plugin is configured
HasFilterPlugins() bool
//Whether any PostFilter plugin is configured
HasPostFilterPlugins() bool
//Whether any Score plugin is configured
HasScorePlugins() bool
//Returns the list of plugins
ListPlugins() *config.Plugins
ProfileName() string
PercentageOfNodesToScore() *int32
SetPodNominator(nominator PodNominator)
}
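Every phase in this interface is backed by plugins that implement the corresponding extension-point interface. To get a feel for the plugin model, here is a minimal sketch of a custom Filter plugin (the plugin name and the rejection rule are made up for illustration; it assumes the FilterPlugin interface from pkg/scheduler/framework):
import (
	"context"
	"strings"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// nodeNamePrefixFilter is a hypothetical Filter plugin that only accepts
// nodes whose name starts with a fixed prefix.
type nodeNamePrefixFilter struct {
	prefix string
}

var _ framework.FilterPlugin = &nodeNamePrefixFilter{}

func (f *nodeNamePrefixFilter) Name() string { return "NodeNamePrefixFilter" }

// Filter is called once per node during the predicate phase; returning a nil
// status means the node passes.
func (f *nodeNamePrefixFilter) Filter(ctx context.Context, state *framework.CycleState,
	pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if strings.HasPrefix(nodeInfo.Node().Name, f.prefix) {
		return nil
	}
	return framework.NewStatus(framework.Unschedulable, "node name prefix does not match")
}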
sched findNodesThatFitPod
findNodesThatFitPod filters out nodes that do not satisfy the pod's requirements, based on the plugins enabled in the configuration plus any extenders. Memory, for example, is an incompressible resource: scheduling a pod onto a node without enough memory would lead to OOM kills.
//Find suitable nodes for the pod being scheduled
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
//PreFilter phase: walk the pod's init containers and regular containers to compute the pod's total resource requests (e.g. cpu and memory)
preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
//Filter phase: walk all nodes and filter out the ones that do not fit
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
//Run the extenders
feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
return feasibleNodes, diagnosis, nil
}
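The PreFilter comment above mentions summing the pod's resource requests; the effective request follows the usual rule of max(sum of regular container requests, largest single init-container request). A simplified sketch of that computation for one resource (illustrative only; it ignores pod overhead and restartable init containers):
import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// effectiveRequest computes a pod's effective request for one resource:
// the sum over regular containers, raised to the largest init-container request.
func effectiveRequest(pod *v1.Pod, name v1.ResourceName) resource.Quantity {
	total := resource.Quantity{}
	for _, c := range pod.Spec.Containers {
		if q, ok := c.Resources.Requests[name]; ok {
			total.Add(q)
		}
	}
	for _, c := range pod.Spec.InitContainers {
		if q, ok := c.Resources.Requests[name]; ok && q.Cmp(total) > 0 {
			total = q
		}
	}
	return total
}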
sched prioritizeNodes
prioritizeNodes is responsible for scoring the feasible nodes.
func prioritizeNodes(ctx context.Context, extenders []framework.Extender, fwk framework.Framework,
state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, ) ([]framework.NodePluginScores, error) {
// PreScore phase, e.g. pre-computing inter-pod affinity state
preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
//Score the nodes
nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
//Run the extenders
if len(extenders) != 0 && nodes != nil {
allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
var mu sync.Mutex
var wg sync.WaitGroup
for i := range extenders {
if !extenders[i].IsInterested(pod) {
continue
}
wg.Add(1)
go func(extIndex int) {
prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
}(i)
}
}
return nodesScores, nil
}
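Each Score plugin's per-node score is multiplied by the plugin's weight and added into the node's total, and the extender scores collected above are folded in the same way. A simplified sketch of that aggregation (plain maps instead of the framework's types, for illustration):
// aggregateScores illustrates how per-plugin scores are combined: each
// plugin's score for a node is multiplied by the plugin's weight and
// accumulated into that node's total.
func aggregateScores(pluginScores map[string]map[string]int64, weights map[string]int64) map[string]int64 {
	totals := map[string]int64{}
	for plugin, perNode := range pluginScores {
		for node, score := range perNode {
			totals[node] += score * weights[plugin]
		}
	}
	return totals
}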
Once scoring is done and the result is returned, we are back in schedulingCycle, so let's look at the rest of its logic.
//pkg/scheduler/schedule_one.go
func (sched *Scheduler) schedulingCycle(.....){
//Assume the pod is placed on the suggested node (the scheduler cache is updated optimistically)
sched.assume(assumedPod, scheduleResult.SuggestedHost)
//Run the Reserve plugins, e.g. volume binding
fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
//Run the Permit plugins
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
//If Reserve or Permit fails, trigger Unreserve to clean up
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
}
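assume records the placement optimistically: the pod's NodeName is set on the scheduler's cached copy so that later scheduling cycles already account for its resource usage, without waiting for the real bind to finish. A minimal sketch of the idea (illustrative; the real code updates the scheduler cache rather than returning a copy):
import v1 "k8s.io/api/core/v1"

// assumePod illustrates the idea behind sched.assume: mark the pod as placed
// on the suggested node in a local copy, so subsequent cycles see the node's
// resources as already consumed before the API server bind completes.
func assumePod(pod *v1.Pod, nodeName string) *v1.Pod {
	assumed := pod.DeepCopy()
	assumed.Spec.NodeName = nodeName
	return assumed
}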
Once the scheduling cycle finishes, the actual binding is performed.
//pkg/scheduler/schedule_one.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
}
func (sched *Scheduler) bindingCycle(ctx context.Context, state *framework.CycleState,fwk framework.Framework,
scheduleResult ScheduleResult, assumedPodInfo *framework.QueuedPodInfo, start time.Time, podsToActivate *framework.PodsToActivate) *framework.Status {
assumedPod := assumedPodInfo.Pod
//Wait for any Permit plugin that returned a "wait" status
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
return status
}
//Run the PreBind plugins
fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
// Bind operation; the concrete implementation is in
//staging/src/k8s.io/client-go/kubernetes/typed/core/v1/pod_expansion.go
//b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
//Once scheduling completes, activate any related pods waiting in the SchedulingQueue
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
}
return nil
}
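The bind call referenced in the comments above boils down to posting a v1.Binding object through the pod's bind subresource. A minimal sketch of that call (illustrative; assumes a client-go clientset is available):
import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindPod posts a Binding that ties the pod to the chosen node, which is what
// the default binder ultimately does via the pod "bind" subresource.
func bindPod(ctx context.Context, clientset kubernetes.Interface, pod *v1.Pod, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	return clientset.CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
}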