ReplicaSet Deep Dive
ReplicaSet is one of the core controllers in Kubernetes: it ensures that a specified number of Pod replicas is running in the cluster. This article digs into the ReplicaSet controller source code to see how it automates Pod replica management.
A ReplicaSet's job is to maintain the desired number of Pod replicas. When a node failure or some other problem leaves fewer replicas than desired, the controller creates new Pods to make up the difference.
Conversely, when there are more replicas than desired, it deletes the surplus Pods.
ReplicaSet Definition
The YAML definition below comes from the official ReplicaSet documentation. In the rest of this article we will use it as the running example while analyzing the controller code, from controller initialization through create, update and delete handling.
apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: frontend
  labels:
    app: guestbook
    tier: frontend
spec:
  # modify replicas according to your case
  replicas: 3
  selector:
    matchLabels:
      tier: frontend
  template:
    metadata:
      labels:
        tier: frontend
    spec:
      containers:
      - name: nginx-replicase
        image: nginx
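As an aside, the same object can also be created programmatically with client-go. The snippet below is a minimal sketch, not part of the controller source; it assumes a reachable cluster with a kubeconfig at the default path, and drops error handling for brevity.

package main

import (
    "context"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // assumes a kubeconfig at the default location; error handling elided for brevity
    config, _ := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    client, _ := kubernetes.NewForConfig(config)

    replicas := int32(3)
    rs := &appsv1.ReplicaSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "frontend",
            Labels: map[string]string{"app": "guestbook", "tier": "frontend"},
        },
        Spec: appsv1.ReplicaSetSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"tier": "frontend"}},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"tier": "frontend"}},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{Name: "nginx-replicase", Image: "nginx"}},
                },
            },
        },
    }
    _, _ = client.AppsV1().ReplicaSets("default").Create(context.TODO(), rs, metav1.CreateOptions{})
}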
ReplicaSet Controller Initialization
The ReplicaSet controller is registered in the kube-controller-manager at the locations shown below. Registration mainly wires up handlers on the ReplicaSet informer and the Pod informer for Add, Update and Delete events; each event is turned into a key and placed on a queue to be consumed later.
//cmd/kube-controller-manager/app/controllermanager.go
register(names.ReplicaSetController, startReplicaSetController)
//cmd/kube-controller-manager/app/apps.go
func startReplicaSetController() {
    // NewReplicaSetController calls NewBaseController internally
    replicaset.NewReplicaSetController(logger, rsInformer, podInformer, ...).Run(ctx, workers)
}
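For reference, the upstream startReplicaSetController looks roughly like the following (simplified from cmd/kube-controller-manager/app/apps.go; exact parameters vary between releases). It pulls both informers from the shared informer factory, constructs the controller and starts Run with the configured number of workers:

func startReplicaSetController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
    go replicaset.NewReplicaSetController(
        klog.FromContext(ctx),
        controllerContext.InformerFactory.Apps().V1().ReplicaSets(),
        controllerContext.InformerFactory.Core().V1().Pods(),
        controllerContext.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(ctx, int(controllerContext.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs))
    return nil, true, nil
}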
Let's focus on the NewBaseController method, and in particular on the handlers it registers on rsInformer and podInformer: creations, updates and deletions of ReplicaSets and Pods are turned into keys and added to the queue, and the Run method then consumes the queue and reconciles the corresponding Kubernetes objects. First, the handler registration; the consumption of these events is covered afterwards.
func NewBaseController(logger klog.Logger, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, ...) *ReplicaSetController {
    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            rsc.addRS(logger, obj)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            rsc.updateRS(logger, oldObj, newObj)
        },
        DeleteFunc: func(obj interface{}) {
            rsc.deleteRS(logger, obj)
        },
    })
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            rsc.addPod(logger, obj)
        },
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: func(oldObj, newObj interface{}) {
            rsc.updatePod(logger, oldObj, newObj)
        },
        DeleteFunc: func(obj interface{}) {
            rsc.deletePod(logger, obj)
        },
    })
}
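The handlers themselves are thin: addRS (and, after some diffing, updateRS/deleteRS) just compute the ReplicaSet's key and put it on the queue. Roughly, simplified from replica_set.go:

func (rsc *ReplicaSetController) addRS(logger klog.Logger, obj interface{}) {
    rs := obj.(*apps.ReplicaSet)
    logger.V(4).Info("Adding", "replicaSet", klog.KObj(rs))
    rsc.enqueueRS(rs)
}

func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
    // controller.KeyFunc produces the namespace/name key consumed by the workers
    key, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }
    rsc.queue.Add(key)
}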
The Run method starts `workers` goroutines that consume the items previously put onto the queue; wait.UntilWithContext re-invokes each worker with a 1-second period (not a timeout) until the context is cancelled.
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
    // (waiting for the informer caches to sync is omitted here)
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, rsc.worker, time.Second)
    }
    <-ctx.Done()
}
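wait.UntilWithContext simply re-runs the given function with the stated period until the context is cancelled; since rsc.worker itself loops until the queue shuts down, the 1-second period only matters if the worker ever returns. A tiny standalone illustration (not from the controller source):

package main

import (
    "context"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()

    // the function is re-invoked every second until ctx is done
    wait.UntilWithContext(ctx, func(ctx context.Context) {
        fmt.Println("worker invoked at", time.Now().Format(time.RFC3339))
    }, time.Second)
}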
Each worker just loops, pulling items off the queue; the actual reconciliation happens in rsc.syncHandler, which for this controller points at syncReplicaSet.
func (rsc *ReplicaSetController) worker(ctx context.Context) {
    for rsc.processNextWorkItem(ctx) {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)
    err := rsc.syncHandler(ctx, key.(string))
    if err == nil {
        // success: clear any rate-limit backoff recorded for this key
        rsc.queue.Forget(key)
        return true
    }
    // failure: requeue with rate limiting so the key is retried with backoff
    rsc.queue.AddRateLimited(key)
    return true
}
To sum up what happens at registration time:
- 1. The ReplicaSet and Pod informers watch add, update and delete events for ReplicaSet and Pod objects.
- 2. Each handler turns its event into a key and puts it onto a rate-limited queue (see the workqueue sketch after this list).
- 3. Worker goroutines loop forever, pulling those keys off the queue and syncing the corresponding ReplicaSet (Pod events are mapped to their owning ReplicaSet).
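The queue in step 2 is a rate-limited workqueue from client-go. The standalone sketch below (not controller code) shows the lifecycle the controller relies on: Add on events, Get/Done in the worker, Forget on success, AddRateLimited to retry with backoff on failure:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // the same flavour of queue NewBaseController creates: per-item exponential backoff plus an overall rate limit
    queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset-demo")

    queue.Add("default/frontend") // what the event handlers effectively do

    key, _ := queue.Get() // what processNextWorkItem does
    fmt.Println("processing", key)

    // on success: clear the backoff counter and mark the item as no longer in flight
    queue.Forget(key)
    queue.Done(key)
    // on failure we would instead call queue.AddRateLimited(key) to requeue with backoff
}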
ReplicaSet Sync Logic
Whether the original event was for a ReplicaSet or for a Pod, what the worker pulls from the queue is always the key of a ReplicaSet; Pod events are resolved to their owning ReplicaSet before being enqueued.
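Before reading syncReplicaSet, the small standalone sketch below (hypothetical object names, not controller code) shows how a Pod is tied back to its ReplicaSet through its controller ownerReference, and what the namespace/name key looks like:

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    isController := true
    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
        Name:      "frontend-abc12",
        Namespace: "default",
        OwnerReferences: []metav1.OwnerReference{{
            APIVersion: "apps/v1",
            Kind:       "ReplicaSet",
            Name:       "frontend",
            Controller: &isController,
        }},
    }}

    // the pod handlers resolve the controlling owner reference to find the ReplicaSet
    if ref := metav1.GetControllerOf(pod); ref != nil && ref.Kind == "ReplicaSet" {
        fmt.Println("owning ReplicaSet:", ref.Name)
    }

    // the key that is actually enqueued is the ReplicaSet's namespace/name
    rs := &appsv1.ReplicaSet{ObjectMeta: metav1.ObjectMeta{Name: "frontend", Namespace: "default"}}
    key, _ := cache.MetaNamespaceKeyFunc(rs)
    fmt.Println("queue key:", key) // default/frontend
}

With that in mind, here is syncReplicaSet: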
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
    // split the key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    // fetch the ReplicaSet from the lister (the local informer cache)
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
    // the ReplicaSet has been deleted: drop its expectations (keeps the sync idempotent);
    // its Pods are cleaned up by the garbage collector
    if apierrors.IsNotFound(err) {
        rsc.expectations.DeleteExpectations(logger, key)
        return nil
    }
    // true once every creation/deletion expected by the previous sync has been observed (or has expired)
    rsNeedsSync := rsc.expectations.SatisfiedExpectations(logger, key)
    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
    // list every pod in the namespace; filtering happens below
    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
    // drop terminated pods and pods that are being deleted
    filteredPods := controller.FilterActivePods(logger, allPods)
    // keep only pods matching the selector, adopting or releasing them based on their
    // ownerReferences (the controller reference must point at this rs, UID included)
    filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
    var manageReplicasErr error
    // only reconcile when expectations are satisfied and the rs itself is not being deleted
    // (DeletionTimestamp is set while a graceful delete, e.g. with a 30s grace period, is in progress)
    if rsNeedsSync && rs.DeletionTimestamp == nil {
        // the real work: create or delete pods so the count matches spec.replicas
        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs)
    }
    rs = rs.DeepCopy()
    // compute the new status (replicas, fully labeled, ready, available) from the filtered pods
    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
    // persist the new status
    updatedRS, err := updateReplicaSetStatus(logger, rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
    // if MinReadySeconds is set and all replicas are ready but not yet available,
    // requeue after MinReadySeconds so availability is re-evaluated
    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
    }
    return manageReplicasErr
}
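syncReplicaSet gates on the expectations mechanism (SatisfiedExpectations / DeleteExpectations above). Conceptually it is just a pair of counters per ReplicaSet key; the sketch below is a simplified reimplementation to show the idea, not the real controller.UIDTrackingControllerExpectations (which also tracks deleted pod UIDs and treats entries older than five minutes as expired):

// outstanding creations/deletions for one ReplicaSet key
type expectations struct {
    adds, dels int64
}

// recorded by manageReplicas right before it issues create/delete calls
func (e *expectations) ExpectCreations(n int) { e.adds += int64(n) }
func (e *expectations) ExpectDeletions(n int) { e.dels += int64(n) }

// decremented by the pod event handlers once the resulting add/delete events arrive
func (e *expectations) CreationObserved() { e.adds-- }
func (e *expectations) DeletionObserved() { e.dels-- }

// syncReplicaSet only calls manageReplicas when this is true, so a ReplicaSet is not
// reconciled again while the API calls issued by the previous sync are still in flight
func (e *expectations) Satisfied() bool { return e.adds <= 0 && e.dels <= 0 }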
On the consumer side, then, every item pulled from the queue, whether it originated from a ReplicaSet or a Pod event, is a namespace/name key identifying a ReplicaSet:
- Deletion of an rs: the rs is gone from the lister, its expectations are cleared, and its Pods are reclaimed by the garbage collector.
- Creation or update of an rs, and any Pod add/update/delete: all end up in the manageReplicas method below.
func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
    diff := len(filteredPods) - int(*(rs.Spec.Replicas))
    rsKey, err := controller.KeyFunc(rs)
    // fewer pods than desired: create the difference
    if diff < 0 {
        diff *= -1
        if diff > rsc.burstReplicas {
            // burstReplicas caps how many pods may be created or deleted in a single sync
            diff = rsc.burstReplicas
        }
        // create pods in slow-start batches (1, 2, 4, ...), aborting early on persistent errors
        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
            err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
            if err != nil {
                // creations in a terminating namespace are expected to fail; don't treat that as an error
                if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                    return nil
                }
            }
            return err
        })
        if skippedPods := diff - successfulCreations; skippedPods > 0 {
            // some creations were skipped because the slow start aborted; decrement their expectations,
            // since the corresponding add events will never arrive, and let the next sync retry
            for i := 0; i < skippedPods; i++ {
                rsc.expectations.CreationObserved(logger, rsKey)
            }
        }
        return err
    } else if diff > 0 {
        if diff > rsc.burstReplicas {
            diff = rsc.burstReplicas
        }
        // pods owned by other ReplicaSets of the same owner (e.g. the same Deployment), used only to rank deletions
        relatedPods, err := rsc.getIndirectlyRelatedPods(logger, rs)
        utilruntime.HandleError(err)
        // pick `diff` pods to delete, preferring unscheduled, pending and not-ready pods and,
        // among otherwise equal pods, the more recently created ones
        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff)
        // record the expected deletions before issuing them
        rsc.expectations.ExpectDeletions(logger, rsKey, getPodKeys(podsToDelete))
        for _, pod := range podsToDelete {
            go func(targetPod *v1.Pod) {
                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil {
                    // the delete failed, so its delete event will never arrive; roll back the expectation
                    podKey := controller.PodKey(targetPod)
                    rsc.expectations.DeletionObserved(logger, rsKey, podKey)
                }
            }(pod)
        }
    }
    return nil
}
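The slowStartBatch helper used above is worth a look: creations are issued in exponentially growing batches, so one persistently failing call (for example a quota or admission error) aborts the whole burst cheaply instead of firing hundreds of doomed requests. The sketch below follows the upstream helper closely but is simplified; it needs the "sync" import:

// needs: import "sync"
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
    remaining := count
    successes := 0
    for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
        errCh := make(chan error, batchSize)
        var wg sync.WaitGroup
        wg.Add(batchSize)
        for i := 0; i < batchSize; i++ {
            go func() {
                defer wg.Done()
                if err := fn(); err != nil {
                    errCh <- err
                }
            }()
        }
        wg.Wait()
        successes += batchSize - len(errCh)
        if len(errCh) > 0 {
            // stop ramping up as soon as any call in the current batch fails
            return successes, <-errCh
        }
        remaining -= batchSize
    }
    return successes, nil
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}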
In short, manageReplicas compares the number of matching pods with spec.replicas and deletes the surplus or creates the shortfall (via the slowStartBatch sketch above). updateReplicaSetStatus then persists the new ReplicaSet status:
func updateReplicaSetStatus(logger klog.Logger, c appsclient.ReplicaSetInterface, rs *apps.ReplicaSet, newStatus apps.ReplicaSetStatus) (*apps.ReplicaSet, error) {
    // nothing relevant changed; skip the API call
    if rs.Status.Replicas == newStatus.Replicas &&
        rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
        rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
        rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
        rs.Generation == rs.Status.ObservedGeneration &&
        reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
        return rs, nil
    }
    newStatus.ObservedGeneration = rs.Generation
    var getErr, updateErr error
    var updatedRS *apps.ReplicaSet
    for i, rs := 0, rs; ; i++ {
        // retry up to statusUpdateRetries times
        rs.Status = newStatus
        // attempt the status update; return the updated object on success
        updatedRS, updateErr = c.UpdateStatus(context.TODO(), rs, metav1.UpdateOptions{})
        if updateErr == nil {
            return updatedRS, nil
        }
        if i >= statusUpdateRetries {
            break
        }
        // the update failed (e.g. on a conflict): re-fetch the latest ReplicaSet and retry
        if rs, getErr = c.Get(context.TODO(), rs.Name, metav1.GetOptions{}); getErr != nil {
            return nil, getErr
        }
    }
    return nil, updateErr
}
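Finally, the newStatus compared above comes from calculateStatus, which simply counts the filtered pods. Below is a simplified sketch (the function name is hypothetical; the real version in replica_set_utils.go also maintains the ReplicaFailure condition based on manageReplicasErr). It assumes the usual imports: labels from apimachinery and podutil from k8s.io/kubernetes/pkg/api/v1/pod.

func calculateStatusSimplified(rs *appsv1.ReplicaSet, filteredPods []*corev1.Pod) appsv1.ReplicaSetStatus {
    newStatus := rs.Status
    templateLabels := labels.Set(rs.Spec.Template.Labels).AsSelector()
    fullyLabeled, ready, available := 0, 0, 0
    for _, pod := range filteredPods {
        // pods carrying every template label count as "fully labeled"
        if templateLabels.Matches(labels.Set(pod.Labels)) {
            fullyLabeled++
        }
        // ready: the Pod's Ready condition is true; available: ready for at least MinReadySeconds
        if podutil.IsPodReady(pod) {
            ready++
            if podutil.IsPodAvailable(pod, rs.Spec.MinReadySeconds, metav1.Now()) {
                available++
            }
        }
    }
    newStatus.Replicas = int32(len(filteredPods))
    newStatus.FullyLabeledReplicas = int32(fullyLabeled)
    newStatus.ReadyReplicas = int32(ready)
    newStatus.AvailableReplicas = int32(available)
    return newStatus
}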