Deployment Explained

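The Deployment controller is a key component of K8s that manages application deployment, scaling, and rolling updates. In this article we dig into the source code of the Kubernetes Deployment controller to understand how it implements these features.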

Deployment use cases

Excerpted from the official documentation:

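  • Create a Deployment to roll out a ReplicaSet. The ReplicaSet creates Pods in the background. Check the status of the rollout to see whether it succeeds.
  • Declare the new state of the Pods by updating the PodTemplateSpec of the Deployment. A new ReplicaSet is created, and the Deployment moves the Pods from the old ReplicaSet to the new one at a controlled rate. Each new ReplicaSet updates the revision of the Deployment.
  • Roll back to an earlier Deployment revision if the current state of the Deployment is not stable. Each rollback updates the revision of the Deployment.
  • Scale up the Deployment to handle more load.
  • Pause the rollout of a Deployment to apply multiple changes to its PodTemplateSpec, then resume it to start a new rollout.
  • Use the status of the Deployment to tell whether a rollout is stuck.
  • Clean up older ReplicaSets that are no longer needed.

In short, a Deployment manages ReplicaSets, and each ReplicaSet controls the number of Pods. If the template changes (detected by comparing hash values), a new ReplicaSet is created and recorded as a new revision in the history, which is what makes rollback, pausing, and rolling updates possible. Below we look at the relevant source code point by point. The example deploy-demo.yaml is as follows: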
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80
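The summary above says template changes are detected through a hash (the pod-template-hash label on each ReplicaSet). The following is a minimal sketch of that idea, not the controller's code. The upstream ComputeHash helper hashes a deep print of the v1.PodTemplateSpec with FNV-32a and then encodes the number into a label-safe string. Here we assume a hypothetical, trimmed podTemplate struct serialized with encoding/json and print the raw decimal hash instead.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"hash/fnv"
)

// podTemplate is a hypothetical, heavily trimmed stand-in for v1.PodTemplateSpec.
type podTemplate struct {
	Labels map[string]string `json:"labels"`
	Image  string            `json:"image"`
	Port   int32             `json:"port"`
}

// templateHash returns a fingerprint of the template. Assumption: JSON
// serialization replaces the deep-print hashing used by the real controller.
func templateHash(t podTemplate, collisionCount *int32) (string, error) {
	data, err := json.Marshal(t) // map keys are sorted, so output is deterministic
	if err != nil {
		return "", err
	}
	h := fnv.New32a()
	h.Write(data)
	if collisionCount != nil {
		// Mixing in collisionCount yields a different hash after a name clash.
		buf := make([]byte, 8)
		binary.LittleEndian.PutUint32(buf, uint32(*collisionCount))
		h.Write(buf)
	}
	return fmt.Sprint(h.Sum32()), nil
}

func main() {
	v1 := podTemplate{Labels: map[string]string{"app": "nginx"}, Image: "nginx:1.14.2", Port: 80}
	v2 := v1
	v2.Image = "nginx:1.16.1" // changing the image changes the template

	h1, _ := templateHash(v1, nil)
	h2, _ := templateHash(v2, nil)
	fmt.Println(h1, h2, h1 != h2)
}
```

Because the hash covers the whole template, any template edit (image, env, labels) produces a new value and therefore a new ReplicaSet, while editing spec.replicas does not.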

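Deployment informers (watched events)

  • podInformer: watches Pod delete events. The logic is simple, so we won't walk through the code. It checks whether the Pod's Deployment exists and stops if it does not. If the Deployment exists and its strategy is Recreate (all existing Pods are deleted first), the Deployment is added to the queue once the delete event for its last Pod arrives.
  • rsInformer: watches ReplicaSet add, update, and delete events; every such event adds the related Deployment to the queue.
  • dInformer: watches Deployment add, update, and delete events and adds the Deployment to the queue.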
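The Pod delete handling for Recreate can be sketched as a small decision function. The deployment type, the strategy constants, and the onPodDeleted/enqueue names are hypothetical simplifications. The real handler resolves the owning Deployment through the Pod's ReplicaSet and recomputes the remaining Pods from the listers itself; here both are passed in.

```go
package main

import "fmt"

// Hypothetical, simplified types; the real handler works with *v1.Pod and *apps.Deployment.
type strategy string

const (
	recreate      strategy = "Recreate"
	rollingUpdate strategy = "RollingUpdate"
)

type deployment struct {
	key      string // namespace/name queue key
	strategy strategy
}

// onPodDeleted sketches the decision described above. owner is nil when the
// Pod's Deployment cannot be found; remainingPods is how many Pods the
// Deployment still owns after this deletion.
func onPodDeleted(owner *deployment, remainingPods int, enqueue func(string)) {
	if owner == nil {
		return // no owning Deployment: nothing to do
	}
	if owner.strategy == recreate && remainingPods == 0 {
		// Recreate waits until every old Pod is gone before creating new ones,
		// so the last deletion is what wakes the Deployment up again.
		enqueue(owner.key)
	}
}

func main() {
	var queue []string
	enqueue := func(k string) { queue = append(queue, k) }

	d := &deployment{key: "default/nginx-deployment", strategy: recreate}
	onPodDeleted(d, 2, enqueue) // not the last Pod yet
	onPodDeleted(d, 0, enqueue) // last Pod gone: enqueue
	onPodDeleted(&deployment{key: "default/other", strategy: rollingUpdate}, 0, enqueue)
	onPodDeleted(nil, 0, enqueue)

	fmt.Println(queue) // [default/nginx-deployment]
}
```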

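The syncDeployment method

The previous article analyzed the ReplicaSet controller. The informer code here is essentially the same; only the set of watched resources differs, so we go straight to the core logic. syncDeployment is the consumer side of the work queue and contains the actual reconcile logic.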

func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	// Split the queue key into namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	startTime := time.Now()
	defer func() {
		logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
	}()
	// Note: the Deployment returned by the lister is a read-only object from the informer cache
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	// Already deleted: nothing to do
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}
	// The cached object is read-only, so deep-copy it before modifying anything
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	// An empty selector would select every Pod: record a warning event and stop
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}
	// List the ReplicaSets in the namespace that belong to this Deployment,
	// adopting or releasing them so that ownerReferences stay correct
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	if err != nil {
		return err
	}
	// List the Pods matched by the selector, grouped by ReplicaSet: map[types.UID][]*v1.Pod
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	// The Deployment is being deleted (DeletionTimestamp is set, e.g. during a foreground
	// cascading delete while its Pods are still terminating, which can take the default
	// 30s grace period if a container does not handle SIGTERM): only update its status
	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Check whether the Deployment was paused (kubectl rollout pause) or resumed and
	// update the Progressing condition accordingly, e.g.:
	// spec:
	//   paused: true
	// status:
	//   availableReplicas: 3
	//   conditions:
	//   - lastTransitionTime: "2023-09-14T16:58:11Z"
	//     lastUpdateTime: "2023-09-14T16:58:11Z"
	//     message: Deployment is paused
	//     reason: DeploymentPaused
	//     status: Unknown
	//     type: Progressing
	//   observedGeneration: 2
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}
	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// If the deprecated.deployment.rollback.to annotation carries a revision, roll back (covered below)
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}
	// If this is a scaling event, run sync (the sync method is covered below)
	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	// Rollout, selected by the strategy in the spec, e.g.:
	// strategy:
	//   type: RollingUpdate
	//   rollingUpdate:
	//     maxSurge: 25%
	//     maxUnavailable: 25%
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		// Recreate
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		// Rolling update
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
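The order of the checks in syncDeployment can be condensed into a pure function, which makes the precedence explicit: for example, a paused Deployment never reaches the rollback branch. deploymentState and nextAction are hypothetical names, and the empty-selector check and checkPausedConditions are left out of this sketch.

```go
package main

import "fmt"

// deploymentState is a hypothetical summary of the fields syncDeployment inspects.
type deploymentState struct {
	beingDeleted bool   // DeletionTimestamp != nil
	paused       bool   // spec.paused
	rollbackTo   bool   // rollback annotation present
	scalingEvent bool   // result of isScalingEvent
	strategy     string // "Recreate" or "RollingUpdate"
}

// nextAction returns the handler syncDeployment would call, in the same order
// as the checks in the code above.
func nextAction(s deploymentState) (string, error) {
	switch {
	case s.beingDeleted:
		return "syncStatusOnly", nil
	case s.paused:
		return "sync", nil
	case s.rollbackTo:
		return "rollback", nil
	case s.scalingEvent:
		return "sync", nil
	}
	switch s.strategy {
	case "Recreate":
		return "rolloutRecreate", nil
	case "RollingUpdate":
		return "rolloutRolling", nil
	}
	return "", fmt.Errorf("unexpected deployment strategy type: %s", s.strategy)
}

func main() {
	a, _ := nextAction(deploymentState{paused: true, rollbackTo: true})
	fmt.Println(a) // sync: pause is checked before rollback
	b, _ := nextAction(deploymentState{strategy: "RollingUpdate"})
	fmt.Println(b) // rolloutRolling
}
```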

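To summarize the logic of syncDeployment:

  • If DeletionTimestamp is set, syncStatusOnly handles the deletion case.
  • If the Deployment is paused, the pause logic runs.
  • If getRollbackTo(d) returns a target, rollback handles the rollback.
  • If scalingEvent is true, dc.sync handles scaling.
  • Otherwise it is an update, and rolloutRolling or rolloutRecreate runs depending on the strategy.

Having skimmed syncDeployment, we now go through the logic and code for each case: creating, updating, and deleting a Deployment, as well as scaling up, scaling down, and pausing.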

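Deleting a Deployment

The deletion logic is simple. After kubectl delete deploy, the Deployment gets a DeletionTimestamp. When its key is taken from the queue and the Deployment can no longer be found, it has already been deleted, so the method simply returns nil. If it still exists, syncStatusOnly updates the Deployment's status. The actual deletion of the Deployment, its ReplicaSets, and their Pods is done by the garbage collector controller, which will be covered in detail later.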

// TODO: explain the calculation rules of calculateStatus() in a later update
if d.DeletionTimestamp != nil {
	return dc.syncStatusOnly(ctx, d, rsList)
}

// pkg/controller/deployment/sync.go
func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Get the new RS and all old RSs: the RS whose pod template matches the Deployment's is the new one, all others are old
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	// Compute and update the Deployment's status field
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	// Compute the new status aggregated from the ReplicaSets (calculateStatus details: TODO)
	newStatus := calculateStatus(allRSs, newRS, d)
	// Nothing changed: skip the API call
	if reflect.DeepEqual(d.Status, newStatus) {
		return nil
	}
	// Update the status
	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}
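calculateStatus is left as a TODO above. The sketch below shows roughly how the counters are aggregated, based on our reading of the upstream function, with conditions, ObservedGeneration, and CollisionCount omitted. rsCounts, deploymentStatus, and needsUpdate are hypothetical names. needsUpdate stands in for the reflect.DeepEqual short-circuit in syncDeploymentStatus.

```go
package main

import "fmt"

// rsCounts is a hypothetical, trimmed view of one ReplicaSet.
type rsCounts struct {
	specReplicas      int32 // rs.Spec.Replicas (desired)
	statusReplicas    int32 // rs.Status.Replicas (Pods that currently exist)
	readyReplicas     int32 // rs.Status.ReadyReplicas
	availableReplicas int32 // rs.Status.AvailableReplicas
}

// deploymentStatus keeps only the counters of apps.DeploymentStatus.
type deploymentStatus struct {
	Replicas            int32
	UpdatedReplicas     int32
	ReadyReplicas       int32
	AvailableReplicas   int32
	UnavailableReplicas int32
}

// calculateStatus aggregates the counters over all ReplicaSets; only
// UpdatedReplicas comes from the new ReplicaSet alone.
func calculateStatus(allRSs []rsCounts, newRS rsCounts) deploymentStatus {
	var s deploymentStatus
	var desired int32
	for _, rs := range allRSs {
		desired += rs.specReplicas
		s.Replicas += rs.statusReplicas
		s.ReadyReplicas += rs.readyReplicas
		s.AvailableReplicas += rs.availableReplicas
	}
	s.UpdatedReplicas = newRS.statusReplicas
	s.UnavailableReplicas = desired - s.AvailableReplicas
	if s.UnavailableReplicas < 0 {
		// More available than desired (e.g. while scaling down): clamp to 0.
		s.UnavailableReplicas = 0
	}
	return s
}

// needsUpdate mirrors the "skip UpdateStatus if nothing changed" check.
func needsUpdate(current, next deploymentStatus) bool {
	return current != next
}

func main() {
	oldRS := rsCounts{specReplicas: 2, statusReplicas: 2, readyReplicas: 2, availableReplicas: 2}
	newRS := rsCounts{specReplicas: 2, statusReplicas: 2, readyReplicas: 1, availableReplicas: 1}
	next := calculateStatus([]rsCounts{oldRS, newRS}, newRS)
	fmt.Printf("%+v\n", next)
	// {Replicas:4 UpdatedReplicas:2 ReadyReplicas:3 AvailableReplicas:3 UnavailableReplicas:1}
	fmt.Println(needsUpdate(next, next)) // false
}
```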

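Updating a Deployment

Updating a Deployment mainly involves the items below. We go through how each one is implemented, using the RollingUpdate strategy:

  • Updating the contents of the pod template
  • Changing labels
  • Rolling back a Deployment
  • Scaling a Deployment up or down
  • Pausing and resuming a Deployment rollout

Updating the pod template, scaling up and down

  • Updating the pod template: a new ReplicaSet is created, the old ReplicaSet is scaled down, and the new one is scaled up.
  • Changing replicas directly: the Deployment is scaled up or down.

A change to replicas is detected by isScalingEvent, which compares the desired-replicas annotation on each active ReplicaSet with spec.replicas; sync then performs the actual scaling. A pod-template change, by contrast, goes through the rollout path (rolloutRolling or rolloutRecreate) described in the upgrade section.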
func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return false, err
	}
	allRSs := append(oldRSs, newRS)
	logger := klog.FromContext(ctx)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		// Read the deployment.kubernetes.io/desired-replicas annotation from the RS, e.g.
		// annotations:
		//   deployment.kubernetes.io/desired-replicas: "3"
		//   deployment.kubernetes.io/max-replicas: "4"
		//   deployment.kubernetes.io/revision: "1"
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
		if !ok {
			continue
		}
		// Differs from spec.replicas: the Deployment has been scaled
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}
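The comparison in isScalingEvent boils down to reading one annotation per active ReplicaSet. In this sketch the activeRS type and the strconv-based parsing are assumptions standing in for deploymentutil.GetDesiredReplicasAnnotation. As in the loop above, ReplicaSets without a usable annotation are skipped.

```go
package main

import (
	"fmt"
	"strconv"
)

const desiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"

// activeRS is a hypothetical view of an active ReplicaSet; only annotations matter here.
type activeRS struct {
	annotations map[string]string
}

// isScalingEvent reports whether any active ReplicaSet was annotated with a
// replica count different from spec.replicas.
func isScalingEvent(specReplicas int32, active []activeRS) bool {
	for _, rs := range active {
		v, ok := rs.annotations[desiredReplicasAnnotation]
		if !ok {
			continue // no annotation: skip, like the !ok branch above
		}
		desired, err := strconv.ParseInt(v, 10, 32)
		if err != nil {
			continue // unparsable value: also skipped in this sketch
		}
		if int32(desired) != specReplicas {
			return true
		}
	}
	return false
}

func main() {
	rs := []activeRS{{annotations: map[string]string{desiredReplicasAnnotation: "3"}}}
	fmt.Println(isScalingEvent(3, rs)) // false
	fmt.Println(isScalingEvent(5, rs)) // true, e.g. after kubectl scale --replicas=5
}
```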

Once the desired-replicas annotation on a ReplicaSet shows that the Deployment needs to scale up or down, control enters sync:

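func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Get the new RS (not created here if missing) and the old RSs
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	// The actual scaling logic
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		return err
	}
	// Paused with no rollback pending: clean up old RSs beyond revisionHistoryLimit (pausing is covered later)
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}
	allRSs := append(oldRSs, newRS)
	// Update the status
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}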

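scale first computes how many Pods need to be added or removed in total, then sorts the ReplicaSets by size (with age as the tie-breaker, depending on the scaling direction). To spread the change across every ReplicaSet, it computes a proportion for each one, and finally scales each ReplicaSet accordingly.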

func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	// (Not shown: if at most one RS is active it is scaled straight to spec.replicas,
	// and if the new RS is already saturated the old RSs are scaled to 0.)
	// For a RollingUpdate Deployment, scale all active RSs proportionally
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
		// Upper bound on the total number of Pods across all RSs
		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			// spec.replicas + maxSurge
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}
		// Number of Pods to add (positive) or remove (negative)
		deploymentReplicasToAdd := allowedSize - allRSsReplicas
		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			// Scale up: larger RSs first; among equal sizes, newer RSs first
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"

		case deploymentReplicasToAdd < 0:
			// Scale down: larger RSs first; among equal sizes, older RSs first
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}
		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		logger := klog.FromContext(ctx)
		// Iterate over all active RSs and estimate each one's share
		for i := range allRSs {
			rs := allRSs[i]
			if deploymentReplicasToAdd != 0 {
				// Estimate how many Pods this RS should gain or lose
				proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				// Running total of Pods distributed so far
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}
		// Apply the new sizes:
		// 1. the first RS (largest after sorting) also absorbs any leftover from rounding
		// 2. every RS is then scaled to its computed size
		for i := range allRSs {
			rs := allRSs[i]
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}
			// Perform the scaling
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				return err
			}
		}
	}
	return nil
}
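To make the proportional distribution concrete, here is a simplified re-implementation of the two loops above together with the per-ReplicaSet estimate. It is modeled on our reading of deploymentutil.GetProportion and its helper: the new share is round(rs.replicas × (spec.replicas + maxSurge) / max-replicas annotation), clamped so the running total never overshoots. The replicaSet type, the assumption that every ReplicaSet carries the max-replicas annotation, and the pre-sorted input are simplifications.

```go
package main

import (
	"fmt"
	"math"
)

// replicaSet is a hypothetical, trimmed-down stand-in for apps.ReplicaSet.
type replicaSet struct {
	name        string
	replicas    int32 // spec.replicas
	maxReplicas int32 // deployment.kubernetes.io/max-replicas annotation
}

func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func max32(a, b int32) int32 {
	if a > b {
		return a
	}
	return b
}

// fraction estimates how many replicas rs should gain (positive) or lose
// (negative) to keep its share of the new total.
func fraction(rs replicaSet, specReplicas, maxSurge int32) int32 {
	if specReplicas == 0 {
		return -rs.replicas // scaling to zero: the whole ReplicaSet goes
	}
	if rs.maxReplicas == 0 {
		return 0 // sketch-only guard against dividing by zero
	}
	deploymentReplicas := specReplicas + maxSurge
	newSize := float64(rs.replicas*deploymentReplicas) / float64(rs.maxReplicas)
	return int32(math.Round(newSize)) - rs.replicas
}

// proportion clamps the fraction so the running total never overshoots toAdd.
func proportion(rs replicaSet, specReplicas, maxSurge, toAdd, added int32) int32 {
	if rs.replicas == 0 || toAdd == 0 || toAdd == added {
		return 0
	}
	f := fraction(rs, specReplicas, maxSurge)
	allowed := toAdd - added
	if toAdd > 0 {
		return min32(f, allowed)
	}
	return max32(f, allowed)
}

// proportionalScale mirrors the two loops of scale(). allRSs is assumed to be
// sorted already (largest first), as ReplicaSetsBySizeNewer/Older would do.
func proportionalScale(allRSs []replicaSet, specReplicas, maxSurge int32) map[string]int32 {
	var current int32
	for _, rs := range allRSs {
		current += rs.replicas
	}
	allowedSize := int32(0)
	if specReplicas > 0 {
		allowedSize = specReplicas + maxSurge
	}
	toAdd := allowedSize - current

	added := int32(0)
	nameToSize := make(map[string]int32, len(allRSs))
	for _, rs := range allRSs {
		p := proportion(rs, specReplicas, maxSurge, toAdd, added)
		nameToSize[rs.name] = rs.replicas + p
		added += p
	}
	// The first (largest) ReplicaSet absorbs any rounding leftover.
	if len(allRSs) > 0 && toAdd != 0 {
		first := allRSs[0].name
		nameToSize[first] += toAdd - added
		if nameToSize[first] < 0 {
			nameToSize[first] = 0
		}
	}
	return nameToSize
}

func main() {
	rss := []replicaSet{
		{name: "old", replicas: 8, maxReplicas: 13},
		{name: "new", replicas: 5, maxReplicas: 13},
	}
	sizes := proportionalScale(rss, 15, 3)
	fmt.Println(sizes["old"], sizes["new"]) // 11 7
}
```

In the example, a Deployment with 10 replicas and an absolute maxSurge of 3 is mid-rollout (old RS 8, new RS 5, both annotated max-replicas=13) when it is scaled to 15. The 5 extra Pods are split 3/2, so each ReplicaSet keeps roughly its share instead of all new Pods landing in one ReplicaSet.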

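Deployment upgrades

  • Recreate: all old Pods are removed first, then the new ones are created.
  • RollingUpdate (rolloutRolling): first try to scale up the new ReplicaSet, update the status, and wait for the next informer event. On the next pass, scale down the old ReplicaSets, update the status again, and wait for the next informer event, repeating until the desired state is reached.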
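Both reconcile steps work with concrete numbers derived from maxSurge and maxUnavailable. This sketch shows the resolution rule as we understand the upstream deploymentutil.ResolveFenceposts. Percentages are scaled against spec.replicas, maxSurge is rounded up, and maxUnavailable is rounded down. If both come out as 0, maxUnavailable becomes 1 so the rollout can still progress. The hand-rolled string parsing (instead of intstr.IntOrString) is an assumption made to keep the example self-contained.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// resolve converts an absolute ("1") or percentage ("25%") value against the
// replica count, rounding up or down as requested.
func resolve(v string, replicas int32, roundUp bool) (int32, error) {
	if !strings.HasSuffix(v, "%") {
		n, err := strconv.Atoi(v)
		return int32(n), err
	}
	pct, err := strconv.Atoi(strings.TrimSuffix(v, "%"))
	if err != nil {
		return 0, err
	}
	f := float64(pct) * float64(replicas) / 100
	if roundUp {
		return int32(math.Ceil(f)), nil
	}
	return int32(math.Floor(f)), nil
}

// fenceposts resolves maxSurge (rounded up) and maxUnavailable (rounded down);
// if both are 0, one unavailable Pod is allowed so the rollout can move.
func fenceposts(maxSurge, maxUnavailable string, replicas int32) (int32, int32, error) {
	surge, err := resolve(maxSurge, replicas, true)
	if err != nil {
		return 0, 0, err
	}
	unavailable, err := resolve(maxUnavailable, replicas, false)
	if err != nil {
		return 0, 0, err
	}
	if surge == 0 && unavailable == 0 {
		unavailable = 1
	}
	return surge, unavailable, nil
}

func main() {
	s, u, _ := fenceposts("25%", "25%", 3)
	fmt.Println(s, u) // 1 0
	s, u, _ = fenceposts("25%", "25%", 10)
	fmt.Println(s, u) // 3 2
}
```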
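// pkg/controller/deployment/rolling.go, called from syncDeployment as dc.rolloutRolling(ctx, d, rsList)
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Get the new RS (created here if it does not exist yet) and all old RSs
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Scale up the new RS, if possible
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus and wait for the next event
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}
	// Scale down the old RSs, if possible
	// TODO: cover reconcileOldReplicaSets and its scale-down rules in a later update
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus and wait for the next event
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}
	// Rollout complete: clean up old RSs beyond revisionHistoryLimit
	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}
	// Sync the status
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}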
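The pass-by-pass behaviour of rolloutRolling can be simulated with a toy loop. This sketch rests on a strong assumption: every new Pod becomes available immediately. It also ignores the cleanup of unhealthy old Pods that reconcileOldReplicaSets performs. The scale-up rule follows our reading of NewRSNewReplicas (never exceed spec.replicas + maxSurge Pods in total). The scale-down rule never goes below spec.replicas - maxUnavailable available Pods. rollingStep is a hypothetical name.

```go
package main

import "fmt"

func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

// rollingStep performs one simulated reconcile pass and returns the new sizes
// of the old and new ReplicaSets. Assumption: all Pods are available at once.
func rollingStep(oldRS, newRS, replicas, maxSurge, maxUnavailable int32) (int32, int32) {
	total := oldRS + newRS
	// Scale up first (like reconcileNewReplicaSet): stay within
	// replicas+maxSurge Pods and never grow the new ReplicaSet past replicas.
	if newRS < replicas && total < replicas+maxSurge {
		up := min32(replicas+maxSurge-total, replicas-newRS)
		return oldRS, newRS + up
	}
	// Otherwise scale down (like reconcileOldReplicaSets): keep at least
	// replicas-maxUnavailable available Pods.
	minAvailable := replicas - maxUnavailable
	down := min32(total-minAvailable, oldRS)
	if down > 0 {
		return oldRS - down, newRS
	}
	return oldRS, newRS
}

func main() {
	oldRS, newRS := int32(3), int32(0)
	// The step guard only protects this toy loop against a stuck configuration.
	for step := 1; (oldRS > 0 || newRS < 3) && step <= 20; step++ {
		oldRS, newRS = rollingStep(oldRS, newRS, 3, 1, 0)
		fmt.Printf("step %d: old=%d new=%d\n", step, oldRS, newRS)
	}
	// step 1: old=3 new=1
	// step 2: old=2 new=1
	// step 3: old=2 new=2
	// step 4: old=1 new=2
	// step 5: old=1 new=3
	// step 6: old=0 new=3
}
```

With 3 replicas and the default 25%/25% (which resolves to maxSurge 1 and maxUnavailable 0), each pass either adds one new Pod or removes one old Pod. In the real controller, each pass corresponds to one trip through the work queue, triggered by the informer events the previous pass caused.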