Endpoints in detail

In Kubernetes, an Endpoints object stores the set of pod IPs behind a Service so that the Service can load-balance across them. Every change rewrites the object in full, so Endpoints only works well at small scale (a single object is truncated at 1000 addresses); since Kubernetes v1.21, EndpointSlice (GA in that release) takes over to solve the performance problem. Below we look at the concrete definition and implementation.
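
To make the object's shape concrete, here is a minimal sketch built from the core/v1 Go types (the name, namespace, IP, and port values are illustrative, mirroring the example later in this post): one Endpoints object per Service, with the same name, holding pod IPs and resolved ports grouped into subsets.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Shape of an Endpoints object: same name as its Service, pod IPs and
	// resolved ports grouped into subsets (all values here are illustrative).
	ep := v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "10.244.166.136", NodeName: ptr("node1")}},
			Ports:     []v1.EndpointPort{{Name: "http", Port: 80, Protocol: v1.ProtocolTCP}},
		}},
	}
	fmt.Printf("%+v\n", ep)
}

func ptr[T any](v T) *T { return &v }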

Endpoints: the watch side

When the Endpoints controller starts, it watches three resources: Pods, Services, and Endpoints. The events it registers are listed below; note that a Service and its Endpoints are associated by sharing the same name. A minimal wiring sketch follows the list, and after that we look at the core handling logic.

  • serviceInformer
    • AddFunc
    • UpdateFunc
    • DeleteFunc
  • podInformer
    • AddFunc
    • UpdateFunc
    • DeleteFunc
  • endpointsInformer
    • DeleteFunc
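
The code below is a rough sketch, not the controller's actual code (the real handlers in pkg/controller/endpoint map a pod back to the Services that select it before enqueueing); it only illustrates how the three watches above are typically registered with client-go shared informers.

package main

import (
	"fmt"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(clientset, 0)
	// In the real controller the handlers derive a Service key ("namespace/name")
	// and push it onto a workqueue consumed by syncService; here we just print it.
	enqueue := func(obj interface{}) {
		if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
			fmt.Println("enqueue", key)
		}
	}

	factory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueue,
		UpdateFunc: func(old, cur interface{}) { enqueue(cur) },
		DeleteFunc: enqueue,
	})
	factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// The real controller resolves which Services select this pod and
		// enqueues those Service keys rather than the pod key itself.
		AddFunc:    enqueue,
		UpdateFunc: func(old, cur interface{}) { enqueue(cur) },
		DeleteFunc: enqueue,
	})
	factory.Core().V1().Endpoints().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// Only DeleteFunc: if the Endpoints object is deleted out of band,
		// re-enqueue the same-named Service so the object gets rebuilt.
		DeleteFunc: enqueue,
	})

	stop := make(chan struct{})
	factory.Start(stop)
	<-stop
}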

Endpoints: consumer-side (sync) logic

func (e *Controller) syncService(ctx context.Context, key string) error {
	startTime := time.Now()
	logger := klog.FromContext(ctx)
	// Split the key into namespace and name; the Endpoints object has the same name as its Service
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	defer func() {
		logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime))
	}()
    // Look up the Service by name
	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		// Return any error other than NotFound
		if !errors.IsNotFound(err) {
			return err
		}
		// The Service no longer exists, so delete its Endpoints object
		err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		e.triggerTimeTracker.DeleteService(namespace, name)
		return nil
	}
    
	// ExternalName services only reference an external DNS name (a CNAME); they never get an Endpoints object
	if service.Spec.Type == v1.ServiceTypeExternalName {
		return nil
	}
    // No selector defined: such Services are usually created without a selector and have their Endpoints subsets bound by hand (e.g. to backends outside the cluster), so the controller does not manage them
	if service.Spec.Selector == nil {
		return nil
	}
    // List all pods matching the Service's selector
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		return err
	}
	
	endpointsLastChangeTriggerTime := e.triggerTimeTracker.
		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

	// subsets holds the pod addresses and ports that will go into the Endpoints object
	subsets := []v1.EndpointSubset{}
	var totalReadyEps int
	var totalNotReadyEps int

	for _, pod := range pods {
		// Skip pods that do not belong in the Endpoints: no pod IP yet, in a terminal phase, or being deleted (unless PublishNotReadyAddresses is set)
		if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
			continue
		}
        // Build an EndpointAddress from the pod's fields, for example:
		//  - ip: 10.244.166.136
        //    nodeName: node1
        //    targetRef:
        //        kind: Pod
        //        name: nginx-deployment-74b6b979f-rnjrq
        //        namespace: default
        //        uid: afd8bc83-c548-4ca5-b2ab-d45d5820e5aa
		ep, err := podToEndpointAddressForService(service, pod)
		if err != nil {
			logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
			continue
		}

		epa := *ep
		if endpointsliceutil.ShouldSetHostname(pod, service) {
			epa.Hostname = pod.Spec.Hostname
		}

        // The Service defines no ports; this only makes sense for a headless Service (ClusterIP: None), i.e. no cluster IP is allocated
		if len(service.Spec.Ports) == 0 {
			// By default only ready pods are recorded; with PublishNotReadyAddresses set, not-ready pods end up in the Endpoints as well
			if service.Spec.ClusterIP == api.ClusterIPNone {
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
				// No need to repack subsets for headless service without ports.
			}
		} else {
			// Build the subset ports from the Service's ports
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]
				portNum, err := podutil.FindPort(pod, servicePort)
				if err != nil {
					logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
					continue
				}
                // Build the EndpointPort corresponding to this ServicePort
				epp := endpointPortFromServicePort(servicePort, portNum)

				var readyEps, notReadyEps int
				subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
				// count ready endpoints
				totalReadyEps = totalReadyEps + readyEps
				// count not-ready endpoints
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}
	}
	// The loop above appends one EndpointSubset per pod/port; repack them into the compact form kubectl shows
	subsets = endpoints.RepackSubsets(subsets)

	// Fetch the currently stored Endpoints object
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		// No Endpoints object yet: build a new one from scratch
		currentEndpoints = &v1.Endpoints{
			ObjectMeta: metav1.ObjectMeta{
				Name:   service.Name,
				Labels: service.Labels,
			},
		}
	}
    // An empty ResourceVersion means the object does not exist yet and has to be created
	createEndpoints := len(currentEndpoints.ResourceVersion) == 0
	
	// (Omitted: comparison with the current object, deciding whether a write is needed at all, and building newEndpoints with the new subsets, labels, and annotations)
	if createEndpoints {
		// Create it when it does not exist yet
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
	} else {
		// Otherwise update the existing object
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
	}
	return err
}
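
To make the ready/not-ready split concrete, here is a simplified, self-contained stand-in for addEndpointSubset (the helper name appendAddress and its boolean parameters are my own, not the controller's signature): a ready pod's address goes into Addresses, an unready pod's into NotReadyAddresses, unless PublishNotReadyAddresses forces everything into Addresses.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// appendAddress is a simplified stand-in for addEndpointSubset: it appends one
// subset per (address, port) pair and decides which address list to use based
// on pod readiness and the Service's PublishNotReadyAddresses setting.
func appendAddress(subsets []v1.EndpointSubset, epa v1.EndpointAddress, epp *v1.EndpointPort,
	podReady, publishNotReady bool) []v1.EndpointSubset {

	ss := v1.EndpointSubset{}
	if epp != nil {
		ss.Ports = []v1.EndpointPort{*epp}
	}
	if podReady || publishNotReady {
		ss.Addresses = []v1.EndpointAddress{epa}
	} else {
		ss.NotReadyAddresses = []v1.EndpointAddress{epa}
	}
	return append(subsets, ss)
}

func main() {
	port := v1.EndpointPort{Name: "http", Port: 80, Protocol: v1.ProtocolTCP}
	addr := v1.EndpointAddress{IP: "10.244.166.136"}

	ready := appendAddress(nil, addr, &port, true, false)
	unready := appendAddress(nil, addr, &port, false, false)
	fmt.Println(len(ready[0].Addresses), len(unready[0].NotReadyAddresses)) // 1 1
}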

To summarize the Endpoints logic: the controller lists all pods matching the Service's label selector, inspects each pod's state to classify it as ready or not ready, uses the Service's PublishNotReadyAddresses field to decide whether not-ready pods are added to the Endpoints, copies the Service's ports into the Endpoints, and then writes the whole object back in one full update.
A full update is fine when the pod count is modest, but if the Service has, say, tens of thousands or 100,000 pods, any change to any single pod rewrites the entire Endpoints object, and that write fans out to every watcher, producing a thundering-herd effect. EndpointSlice was introduced to solve exactly this problem, and it also adds new features such as TopologyAwareHints, i.e. topology-aware (zone-based) routing. It is best understood together with Services, so after covering Services we will look at the EndpointSlice implementation in detail.
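
As a rough back-of-envelope illustration of that amplification (all numbers below are assumptions, not measurements):

package main

import "fmt"

func main() {
	// Illustrative assumptions only: real object and address sizes vary.
	const (
		pods          = 10000 // pods behind one Service
		bytesPerAddr  = 100   // rough size of one serialized EndpointAddress
		watchingNodes = 1000  // nodes whose kube-proxy watches this Service
	)

	objectSize := pods * bytesPerAddr          // whole Endpoints object, ~1 MB
	perPodChange := objectSize * watchingNodes // bytes fanned out per single pod change, ~1 GB
	rollingUpdate := perPodChange * pods       // a rolling update touches every pod at least once

	fmt.Printf("object ~%d MB, per pod change ~%d GB, rolling update ~%d TB\n",
		objectSize/1e6, perPodChange/1e9, rollingUpdate/1e12)
}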