ReplicaSet详解

ReplicaSet是k8s中的一个重要控制器,用于确保指定数量的Pod副本在集群中运行。本文将深入分析k8s的ReplicaSet源码,探讨它是如何实现Pod副本的自动化管理的。
在k8s中,ReplicaSet的主要任务是维护指定数量的Pod副本。当发生节点故障或其他问题导致Pod副本不足时,ReplicaSet会自动创建新的Pod以满足所需的副本数量。
反之,如果有过多的Pod副本,ReplicaSet会自动删除多余的Pod。

ReplicaSet定义

ReplicaSet官方文档,yaml定义如下,下文我们将用该yaml文件从ReplicaSet控制器初始化,增删改去分析具体代码

apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: frontend
  labels:
    app: guestbook
    tier: frontend
spec:
  # modify replicas according to your case
  replicas: 3
  selector:
    matchLabels:
      tier: frontend
  template:
    metadata:
      labels:
        tier: frontend
    spec:
      containers:
        - name: nginx-replicase
          image: nginx

ReplicaSet控制器初始化

ReplicaSet控制器在k8s controller manager里注册,注册位置如下,注册的时候主要是监听了ReplicaSet informer,pod informer的Add,Update
Delete事件,然后将对应事件放入queue,等待后续消费

//cmd/kube-controller-manager/app/controllermanager.go
register(names.ReplicaSetController, startReplicaSetController)
//cmd/kube-controller-manager/app/apps.go
func startReplicaSetController()
    ReplicaSet.NewReplicaSetController(log,ReplicaSetInformer,podInformer).Run()
        NewBaseController()

下面我们重点看下NewBaseController方法,重点看下rsInformer,podInformer监听的方法,对应rs以及pod的创建,更新,删除,添加到queue中,在run方法
中不断消费操作对应的k8s对象。下面我们看下ReplicaSet事件的消费

func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addRS(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updateRS(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deleteRS(logger, obj)
		},
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			rsc.addPod(logger, obj)
		},
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: func(oldObj, newObj interface{}) {
			rsc.updatePod(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			rsc.deletePod(logger, obj)
		},
	})
}

run方法里并发worker数量消费前面投入queue的任务,1秒超时

// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, rsc.worker, time.Second)
	}
	<-ctx.Done()
}

work具体的逻辑为不断从队列里取出元素,具体的执行逻辑在rsc.syncHandler方法

func (rsc *ReplicaSetController) worker(ctx context.Context) {
	for rsc.processNextWorkItem(ctx) {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
	key, quit := rsc.queue.Get()
    defer rsc.queue.Done(key)
	err := rsc.syncHandler(ctx, key.(string))
	rsc.queue.AddRateLimited(key)
	return true
}

总结下注册的时候做了哪些事

  • 1 通过ReplicaSet,Pod Informer监听pod与replicaSet对象的add,update,delete事件
  • 2 将事件放入RateLimitQueue中
  • 3 死循环去消费队列里对象为replicaSet与pod的事件

ReplicaSet消费逻辑

replicaSet从queue里获取到的事件有replicaSet和pod

func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
	//获取name,namespace
	namespace, name, err := cache.SplitMetaNamespaceKey(key)

	//根据namespace,name 获取rs对象
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	//代表被删除,清空expectations里对应的监听(保证幂等),pod交由垃圾回收器自动删除
    if apierrors.IsNotFound(err) {
        rsc.expectations.DeleteExpectations(logger, key)
        return nil
    }
	//观察是否有add/del事件
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	
	//获取所有的pod并根据labels选择
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	filteredPods := controller.FilterActivePods(logger, allPods)
    //过滤掉部分pod,如存在删除时间的,pod ownerReferences属性与该rs uid不一致的
	filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)

	var manageReplicasErr error
	//如果有观察到的add/del事件,并且
	//DeletionTimestamp优雅终止使用,比如设置30s退出,pod内没有接收kill信号处理
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		//获取到过滤后的pod,以及该rs,真正处理逻辑
		manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
	}
	rs = rs.DeepCopy()
	//rs的新状态
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)

	//新增或者删除pod,更新rs状态
	updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}

我们看到消费端的队列,rs的增删改,pod的增删改都是从队列里获取key,格式为name/namespace

  • rs的删除:直接删除rs,然后pod由垃圾回收器自动回收
  • rs的创建,修改;pod的增删改:都通过manageReplicas方法去处理
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	//当前存在pod比期望的小
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			//一次 sync 中创建/删除的 Pod 数量限制
			diff = rsc.burstReplicas
		}
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			//创建pod
			err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
			if err != nil {
				if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
					return nil
				}
			}
			return err
		})
		
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			//剩余一部分pod 由于超过了burstReplicas数量暂未处理
			for i := 0; i < skippedPods; i++ {
				rsc.expectations.CreationObserved(logger, rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		//获取rs所有的pod
		relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
		utilruntime.HandleError(err)
        
		//diff删除的个数,优先删除早期创建的pod
		podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
		rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				//删除多的pod,具体规则为根据node
				if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
					podKey := controller.PodKey(targetPod)
					rsc.expectations.DeletionObserved(logger, rsKey, podKey)
				}
			}(pod)
		}
	}
	return nil
}

manageReplicas 主要是看当前pod与定义的pod数相比,多则删除,少则创建。 updateReplicaSetStatus方法主要更新rs的状态

unc updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
    //到达期望状态,直接返回
	if rs.Status.Replicas == newStatus.Replicas &&
		rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
		rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
		rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
		rs.Generation == rs.Status.ObservedGeneration &&
		reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
		return rs, nil
	}

	newStatus.ObservedGeneration = rs.Generation
	var getErr, updateErr error
	var updatedRS *apps.ReplicaSet
	for i, rs := 0, rs; ; i++ {
		//直到执行statusUpdateRetries次后终止
		rs.Status = newStatus
		//更新状态,成功后返回
		updatedRS, updateErr = c.UpdateStatus(context.TODO(), rs, metav1.UpdateOptions{})
		if updateErr == nil {
			return updatedRS, nil
		}
		if i >= statusUpdateRetries {
			break
		}
		//获取最新状态,循环
		if rs, getErr = c.Get(context.TODO(), rs.Name, metav1.GetOptions{}); getErr != nil {
			return nil, getErr
		}
	}
	return nil, updateErr
}