Deployment Deep Dive
The Deployment controller is a key Kubernetes component that manages application deployment, scaling, and rolling updates. In this article we dig into the Kubernetes Deployment controller source code to see how these features are implemented.
Deployment Use Cases
Excerpted from the official documentation:
- Create a Deployment to roll out a ReplicaSet. The ReplicaSet creates Pods in the background. Check the status of the rollout to see whether it succeeded.
- Declare the new state of the Pods by updating the PodTemplateSpec of the Deployment. A new ReplicaSet is created, and the Deployment moves Pods from the old ReplicaSet to the new one at a controlled rate. Each new ReplicaSet updates the revision of the Deployment.
- Roll back to an earlier Deployment revision if the current state of the Deployment is not stable. Each rollback updates the revision of the Deployment.
- Scale up the Deployment to handle more load.
- Pause the rollout of a Deployment to apply multiple fixes to its PodTemplateSpec, then resume it to start a new rollout.
- Use the status of the Deployment to determine whether a rollout has stalled.
- Clean up older ReplicaSets that are no longer needed.
In short: a Deployment manages ReplicaSets, and the ReplicaSets control the number of Pods. If the pod template changes (detected via a hash of the template, sketched after the YAML below), a new ReplicaSet is created and a new revision is recorded in the history, which is what makes rollback, pausing, and rolling updates possible. Below we walk through the source code along these lines. The example deploy-demo.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80
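As noted above, "did the template change?" is answered with a hash: the controller hashes the pod template and bakes the result into each ReplicaSet's pod-template-hash label. The helper below is a minimal, self-contained sketch of that idea only (computeTemplateHash and its generic template parameter are hypothetical); the real implementation is controller.ComputeHash in pkg/controller/controller_utils.go, which FNV-hashes a deep dump of the PodTemplateSpec, folds in status.collisionCount, and additionally encodes the sum with rand.SafeEncodeString so the label is DNS-safe.

package main

import (
	"fmt"
	"hash/fnv"
)

// computeTemplateHash sketches the idea behind controller.ComputeHash:
// FNV-hash a deep dump of the pod template, fold in the collision count,
// and encode the sum for use in the pod-template-hash label.
func computeTemplateHash(template interface{}, collisionCount *int32) string {
	hasher := fnv.New32a()
	fmt.Fprintf(hasher, "%#v", template) // deep-ish dump of the template
	if collisionCount != nil {
		// status.collisionCount is the escape hatch for hash collisions:
		// bumping it yields a fresh hash for the same template.
		fmt.Fprintf(hasher, "%d", *collisionCount)
	}
	return fmt.Sprintf("%x", hasher.Sum32())
}

func main() {
	count := int32(0)
	fmt.Println(computeTemplateHash(map[string]string{"image": "nginx:1.14.2"}, &count))
}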
What the Deployment controller watches
- podInformer: watches Pod delete events. The logic is simple enough that we skip the code: it checks whether the owning Deployment exists (if not, processing stops); if it exists and the strategy is Recreate (delete all old Pods before creating new ones), the Deployment is enqueued once the delete event for the last Pod arrives.
- rsInformer: watches ReplicaSet add/update/delete events; every event enqueues the owning Deployment.
- dInformer: watches Deployment add/update/delete events; since ReplicaSets belong to a Deployment, it is always the Deployment that lands in the queue. (A sketch of the handler wiring follows this list.)
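For orientation, this is roughly how the handlers are wired up in NewDeploymentController (pkg/controller/deployment/deployment_controller.go); the snippet is abridged from upstream, so treat it as a sketch rather than the verbatim source:

// Each informer's handlers ultimately enqueue the owning Deployment.
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,
	UpdateFunc: dc.updateDeployment,
	DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addReplicaSet,
	UpdateFunc: dc.updateReplicaSet,
	DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	// Only deletes matter for Pods: they drive the Recreate strategy.
	DeleteFunc: dc.deletePod,
})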
The syncDeployment method
In the previous post we analyzed the ReplicaSet controller; the informer plumbing here is essentially the same, the difference being what gets watched. So we jump straight into the core logic: syncDeployment is the consumer side of the work queue.
func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	// Split the key into namespace and name.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	startTime := time.Now()
	defer func() {
		// Latency logging, as in the full source.
		klog.FromContext(ctx).V(4).Info("Finished syncing deployment", "duration", time.Since(startTime))
	}()
	// Note: objects returned by the lister are read-only.
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	// Already deleted: nothing to do.
	if errors.IsNotFound(err) {
		return nil
	}
	// The lister object is read-only, so deep-copy it before mutating.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	// The deployment defines an empty selector (selects everything).
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
		}
		return nil
	}

	// List all matching ReplicaSets in the namespace, adopting or
	// releasing them via ownerReferences as needed.
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	// Fetch the pods matched by the selector, keyed by ReplicaSet UID:
	// map[types.UID][]*v1.Pod.
	podMap, err := dc.getPodMapForDeployment(d, rsList)

	// Deletion in progress (e.g. a container ignoring the kill signal
	// waits out the default 30s grace period): only sync the status.
	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// Check whether the rollout was paused via `kubectl rollout pause`
	// and add a paused condition if so, e.g.:
	// spec:
	//   paused: true
	// status:
	//   availableReplicas: 3
	//   conditions:
	//   - lastTransitionTime: "2023-09-14T16:58:11Z"
	//     lastUpdateTime: "2023-09-14T16:58:11Z"
	//     message: Deployment is paused
	//     reason: DeploymentPaused
	//     status: Unknown
	//     type: Progressing
	//   observedGeneration: 2
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}
	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// If the annotations carry a deprecated.deployment.rollback.to
	// revision, perform a rollback (getRollbackTo is sketched right
	// after this function; rollback is covered in detail later).
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	// If this is a scaling event, run sync (explained in detail below).
	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	// Rollout handling; the corresponding YAML looks like:
	// strategy:
	//   rollingUpdate:
	//     maxSurge: 25%
	//     maxUnavailable: 25%
	//   type: RollingUpdate
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		// Recreate mode.
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		// Rolling-update mode.
		return dc.rolloutRolling(ctx, d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
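The rollback branch above keys off the deprecated.deployment.rollback.to annotation, which round-trips the removed spec.rollbackTo field for older clients and API versions. getRollbackTo is small enough to show here, lightly abridged from pkg/controller/deployment/rollback.go:

// getRollbackTo turns the deprecated rollback annotation into a
// RollbackConfig; nil means no rollback was requested.
func getRollbackTo(d *apps.Deployment) *extensions.RollbackConfig {
	revision := d.Annotations[apps.DeprecatedRollbackTo]
	if revision == "" {
		return nil
	}
	revision64, err := strconv.ParseInt(revision, 10, 64)
	if err != nil {
		// Malformed annotation: treat as no rollback requested.
		return nil
	}
	return &extensions.RollbackConfig{Revision: revision64}
}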
To summarize syncDeployment:
- If DeletionTimestamp is set, run syncStatusOnly to handle deletion.
- If spec.paused is set, handle the pause logic.
- If getRollbackTo(d) is non-nil, handle the rollback via rollback.
- On a scaling event, handle scaling via dc.sync.
- Otherwise it is an update: run rolloutRolling or rolloutRecreate depending on the strategy.
That is syncDeployment at a glance. Next we look at the logic and code for each scenario: creating, updating and deleting a Deployment, plus scaling up, scaling down, and pausing.
Deleting Deployments
The deletion path is simple. After `kubectl delete deploy` runs, a DeletionTimestamp is stamped on the object. When the key comes off the queue, a not-found lookup means the Deployment is already gone and its status was already updated earlier, so we return nil. If it has not been fully deleted yet, syncStatusOnly updates the Deployment's status. The actual deletion of the Deployment, its ReplicaSets, and Pods is carried out by the garbage collector controller, which we'll cover in detail later.
// TODO: the exact calculation rules of calculateStatus() are covered
// later (a rough sketch also follows this snippet).
if d.DeletionTimestamp != nil {
	return dc.syncStatusOnly(ctx, d, rsList)
}
// pkg/controller/deployment/sync.go
func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Fetch the newest ReplicaSet plus all old ones. A ReplicaSet whose
	// pod template matches the deployment's is "new"; all others are "old".
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	// Compute and update the deployment's status field.
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
	// The actual status computation: aggregate the latest numbers from
	// the ReplicaSets (sketched after this snippet).
	newStatus := calculateStatus(allRSs, newRS, d)
	if reflect.DeepEqual(d.Status, newStatus) {
		return nil
	}
	// Push the new status.
	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}
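calculateStatus is left as a TODO above; as a rough idea of what it computes, here is a simplified sketch (aggregation only; the real function in pkg/controller/deployment/sync.go also maintains the Progressing and Available conditions, which are trimmed here):

// Simplified sketch of calculateStatus: sum the replica counts across
// all ReplicaSets owned by the deployment and derive the unavailable
// count; condition handling is omitted.
func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) apps.DeploymentStatus {
	availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
	totalReplicas := deploymentutil.GetActualReplicaCountForReplicaSets(allRSs)
	unavailableReplicas := totalReplicas - availableReplicas
	// A scaled-down deployment can briefly report more available pods
	// than desired, which would make the difference negative.
	if unavailableReplicas < 0 {
		unavailableReplicas = 0
	}
	return apps.DeploymentStatus{
		ObservedGeneration:  d.Generation,
		Replicas:            totalReplicas,
		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
		ReadyReplicas:       deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
		AvailableReplicas:   availableReplicas,
		UnavailableReplicas: unavailableReplicas,
		CollisionCount:      d.Status.CollisionCount,
	}
}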
Updating Deployments
Updates to a Deployment break down into the following cases; below we look at how each is implemented, using the RollingUpdate strategy:
- Updating the pod template
- Changing labels
- Rolling back a Deployment
- Scaling a Deployment up or down
- Pausing/resuming a Deployment rollout
Updating the pod template; scaling up and down
- Updating the pod template creates a new ReplicaSet; the old one is scaled down while the new one is scaled up.
- Editing replicas directly scales the Deployment up or down. Either way, an update is usually followed by scaling: isScalingEvent decides whether scaling is needed, and sync performs it.
func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return false, err
	}
	allRSs := append(oldRSs, newRS)
	logger := klog.FromContext(ctx)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		// Read deployment.kubernetes.io/desired-replicas from the rs
		// annotations (a parsing sketch follows this function), e.g.:
		// annotations:
		//   deployment.kubernetes.io/desired-replicas: "3"
		//   deployment.kubernetes.io/max-replicas: "4"
		//   deployment.kubernetes.io/revision: "1"
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
		if !ok {
			// Annotation missing: skip this rs (this check was omitted
			// in the original excerpt; it is present upstream).
			continue
		}
		// If it differs from spec.replicas, a scaling operation is needed.
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}
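deploymentutil.GetDesiredReplicasAnnotation is a thin parsing wrapper. A minimal self-contained sketch of the idea (getDesiredReplicas is a hypothetical name; the upstream helper additionally logs malformed values):

import (
	"strconv"

	apps "k8s.io/api/apps/v1"
)

// getDesiredReplicas parses deployment.kubernetes.io/desired-replicas
// from a ReplicaSet's annotations; false means absent or malformed,
// in which case the caller skips that ReplicaSet.
func getDesiredReplicas(rs *apps.ReplicaSet) (int32, bool) {
	v, ok := rs.Annotations["deployment.kubernetes.io/desired-replicas"]
	if !ok {
		return 0, false
	}
	n, err := strconv.ParseInt(v, 10, 32)
	if err != nil {
		return 0, false
	}
	return int32(n), true
}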
Once the desired-replicas annotation shows that a scale up or down is needed, we enter sync:
func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Fetch the newest and old ReplicaSets (this call was omitted in the
	// original excerpt).
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	if err != nil {
		return err
	}
	// The actual scaling logic.
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		return err
	}
	// Pause handling, covered later: while paused with no rollback in
	// flight, clean up old ReplicaSets (cleanupDeployment is sketched
	// after this function).
	if d.Spec.Paused && getRollbackTo(d) == nil {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}
	allRSs := append(oldRSs, newRS)
	// Update status.
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}
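cleanupDeployment, called above while paused (and again after a completed rollout), enforces spec.revisionHistoryLimit by deleting the oldest surplus ReplicaSets. Lightly abridged from pkg/controller/deployment/sync.go (logging trimmed):

func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
		return nil
	}
	// Never touch a ReplicaSet that is already being deleted.
	aliveFilter := func(rs *apps.ReplicaSet) bool {
		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
	}
	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)

	diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	// Oldest revisions are deleted first.
	sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))
	for i := int32(0); i < diff; i++ {
		rs := cleanableRSes[i]
		// Skip ReplicaSets that still have pods or unobserved changes.
		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
			continue
		}
		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
			return err
		}
	}
	return nil
}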
scale first works out how many pods need to be added or removed, then sorts the ReplicaSets (newest-first when scaling up, oldest-first when scaling down). So that every ReplicaSet takes part in the change, a proportion is computed for each, and only then is each rs actually scaled. A worked example of the proportional math follows the function.
func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
	// For a rolling-update deployment, the change is spread across the
	// ReplicaSets.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
		// Maximum number of pods we may run.
		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			// spec.replicas plus the maxSurge allowance.
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}
		// Number of pods to add (positive) or remove (negative).
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			// Scaling up: newer ReplicaSets come first.
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"
		case deploymentReplicasToAdd < 0:
			// Scaling down: older ReplicaSets come first, so the oldest
			// pods are removed first.
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}

		deploymentReplicasAdded := int32(0)
		nameToSize := make(map[string]int32)
		logger := klog.FromContext(ctx)
		// First pass: estimate each ReplicaSet's share of the change.
		for i := range allRSs {
			rs := allRSs[i]
			if deploymentReplicasToAdd != 0 {
				// Estimate how many pods this rs should gain or lose.
				proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				// Track how much of the total change has been handed out.
				deploymentReplicasAdded += proportion
			} else {
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// Second pass:
		// 1. give any rounding leftover to the first (most active) rs;
		// 2. apply the new size to every rs.
		for i := range allRSs {
			rs := allRSs[i]
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}
			// Perform the actual scale.
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				return err
			}
		}
	}
	return nil
}
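To make GetProportion concrete, here is a small worked example. Assume a deployment mid-rollout with two active ReplicaSets (old rs at 3 replicas, new rs at 2, both annotated with max-replicas "6" from when spec.replicas was 5 and maxSurge was 1), and the user scales spec.replicas from 5 to 10 while keeping maxSurge: 1. The sketch reimplements only the core fraction formula from deploymentutil's getReplicaSetFraction (the upstream version also clamps each proportion to whatever is left of deploymentReplicasToAdd); all the numbers are assumptions for illustration:

package main

import (
	"fmt"
	"math"
)

// fraction mirrors the heart of getReplicaSetFraction: grow each
// ReplicaSet in proportion to its share of the annotated max-replicas,
// rounding to the nearest integer.
func fraction(rsReplicas, deploymentReplicasPlusSurge, annotatedMaxReplicas int32) int32 {
	newSize := float64(rsReplicas*deploymentReplicasPlusSurge) / float64(annotatedMaxReplicas)
	return int32(math.Round(newSize)) - rsReplicas
}

func main() {
	// spec.replicas 5 -> 10 with maxSurge 1 gives allowedSize 11; the two
	// ReplicaSets currently run 3+2=5 pods, so deploymentReplicasToAdd = 6.
	const deploymentReplicasPlusSurge = 11
	const annotatedMaxReplicas = 6 // old desired-replicas (5) + old maxSurge (1)

	oldRS, newRS := int32(3), int32(2)
	pOld := fraction(oldRS, deploymentReplicasPlusSurge, annotatedMaxReplicas) // 3*11/6 = 5.5 -> 6, so +3
	pNew := fraction(newRS, deploymentReplicasPlusSurge, annotatedMaxReplicas) // 2*11/6 ~= 3.7 -> 4, so +2
	leftover := int32(6) - (pOld + pNew)                                       // +1, handed to the first rs in the sorted list

	fmt.Println(pOld, pNew, leftover) // 3 2 1
}

So both ReplicaSets grow roughly in proportion to their current share, and the rounding leftover is absorbed by the first rs in the sorted slice, exactly as in the second pass of scale above.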
Upgrading a Deployment
- Recreate mode: tear everything down and recreate it from scratch.
- RollingUpdate mode: a rolling upgrade. First try to scale up the new ReplicaSet and update the status, then wait for the next informer-triggered event; on the next pass scale down the old ReplicaSets, update the status again, and keep iterating until the desired state is reached.
dc.rolloutRolling(ctx, d, rsList)
// Code unrelated to this flow, including some branches, is omitted.
func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
	// Get (or create) the new ReplicaSet plus the old ones (this call was
	// omitted in the original excerpt).
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)

	// Try to scale up the new ReplicaSet.
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
	if err != nil {
		return err
	}
	if scaledUp {
		// Update DeploymentStatus and wait for the next event.
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Try to scale down the old ReplicaSets.
	// TODO: reconcileOldReplicaSets and its scale-down rules are covered
	// later; the maxSurge bound used on the scale-up side is sketched
	// after this function.
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus and wait for the next event.
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// Rollout complete: clean up old ReplicaSets.
	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// Sync status.
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}
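How far reconcileNewReplicaSet may scale up in one pass is bounded by maxSurge. The target size comes from NewRSNewReplicas in pkg/controller/deployment/util/deployment_util.go; below is a lightly abridged sketch (the Recreate branch and some error handling are trimmed):

// NewRSNewReplicas (abridged): under RollingUpdate, the new ReplicaSet
// may grow until the total pod count hits spec.replicas + maxSurge,
// and never beyond spec.replicas itself.
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
	maxSurge, err := intstrutil.GetScaledValueFromIntOrPercent(
		deployment.Spec.Strategy.RollingUpdate.MaxSurge,
		int(*(deployment.Spec.Replicas)), true)
	if err != nil {
		return 0, err
	}
	// Total pods currently owned by the deployment across all ReplicaSets.
	currentPodCount := GetReplicaCountForReplicaSets(allRSs)
	maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
	if currentPodCount >= maxTotalPods {
		// Already at the surge ceiling: no room to scale up this pass.
		return *(newRS.Spec.Replicas), nil
	}
	// Scale up, but never above spec.replicas.
	scaleUpCount := maxTotalPods - currentPodCount
	scaleUpCount = int32(min(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
	return *(newRS.Spec.Replicas) + scaleUpCount, nil
}

For our example deployment with replicas: 3 and the default maxSurge: 25%, the scaled value rounds up to 1, so a rollout tops out at 4 pods, which matches the deployment.kubernetes.io/max-replicas: "4" annotation shown earlier.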