endpoint详解
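在k8s中,Endpoints对象用来记录一个Service后端所有pod的IP和端口,供Service实现负载均衡。由于Endpoints每次变化都是对整个对象做全量更新,后端pod数量很多时会带来明显的性能问题。因此k8s引入了EndpointSlice(在v1.21中GA)来解决这个问题;同时单个Endpoints对象最多只保存1000个地址,超出的部分会被截断并打上 endpoints.kubernetes.io/over-capacity: truncated 注解,大规模场景应使用EndpointSlice。
下面我们来看看具体的定义以及实现。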
endpoint 监听部分
endpoint controller启动时会监听Pod、Service、Endpoints这3种资源的变化,监听的事件如下。需要注意的是,Service与Endpoints是通过同名(同一namespace下name相同)来关联的。下面我们去看下核心的处理逻辑。
- serviceInformer
  - AddFunc
  - UpdateFunc
  - DeleteFunc
- podInformer
  - AddFunc
  - UpdateFunc
  - DeleteFunc
- endpointsInformer
  - DeleteFunc
endpoint消费端逻辑
// syncService reconciles the Endpoints object for the Service identified by
// key ("namespace/name"). An Endpoints object belongs to a Service purely by
// sharing its namespace and name.
//
// NOTE: this is an abridged excerpt of the upstream endpoints controller. The
// equality check that skips no-op writes, over-capacity truncation and the
// headless-service label handling are omitted for brevity.
func (e *Controller) syncService(ctx context.Context, key string) error {
	startTime := time.Now()
	logger := klog.FromContext(ctx)

	// The key is "namespace/name"; the Endpoints object uses the Service's name.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	defer func() {
		logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime))
	}()

	service, err := e.serviceLister.Services(namespace).Get(name)
	if err != nil {
		// Any lister error other than NotFound is returned to the caller.
		if !errors.IsNotFound(err) {
			return err
		}
		// The Service is gone: delete its Endpoints (NotFound is tolerated,
		// it may already be gone) and drop its trigger-time tracking state.
		err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
		e.triggerTimeTracker.DeleteService(namespace, name)
		return nil
	}

	// ExternalName Services are not backed by pods, so there is nothing to
	// manage here.
	if service.Spec.Type == v1.ServiceTypeExternalName {
		return nil
	}

	// Without a selector the controller leaves the Endpoints alone. Users may
	// create and maintain them by hand, e.g. to point at a backend outside
	// the cluster.
	if service.Spec.Selector == nil {
		return nil
	}

	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	if err != nil {
		return err
	}

	endpointsLastChangeTriggerTime := e.triggerTimeTracker.
		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

	// subsets collects one entry per (pod, port) pair; it is normalized by
	// RepackSubsets once all pods have been processed.
	subsets := []v1.EndpointSubset{}
	var totalReadyEps int
	var totalNotReadyEps int

	for _, pod := range pods {
		// Keep only pods that should be listed: they must have an IP and not
		// be in a terminal phase. Unless PublishNotReadyAddresses is set, they
		// must also not be terminating.
		if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
			continue
		}

		// Build the EndpointAddress for this pod, for example:
		//   - ip: 10.244.166.136
		//     nodeName: node1
		//     targetRef:
		//       kind: Pod
		//       name: nginx-deployment-74b6b979f-rnjrq
		//       namespace: default
		//       uid: afd8bc83-c548-4ca5-b2ab-d45d5820e5aa
		ep, err := podToEndpointAddressForService(service, pod)
		if err != nil {
			logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
			continue
		}

		epa := *ep
		if endpointsliceutil.ShouldSetHostname(pod, service) {
			epa.Hostname = pod.Spec.Hostname
		}

		// addEndpointSubset (not shown here) classifies the address as ready
		// or not-ready, taking PublishNotReadyAddresses into account, and
		// reports how many of each it added.
		if len(service.Spec.Ports) == 0 {
			// A Service without ports only gets address-only subsets when it is
			// headless (ClusterIP: None); otherwise this pod contributes nothing.
			if service.Spec.ClusterIP == api.ClusterIPNone {
				var readyEps, notReadyEps int
				subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
				// Accumulate (not overwrite) so the totals cover every pod.
				totalReadyEps += readyEps
				totalNotReadyEps += notReadyEps
			}
			continue
		}

		// Add one subset entry per Service port that resolves on this pod.
		for i := range service.Spec.Ports {
			servicePort := &service.Spec.Ports[i]
			portNum, err := podutil.FindPort(pod, servicePort)
			if err != nil {
				logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
				continue
			}

			epp := endpointPortFromServicePort(servicePort, portNum)

			var readyEps, notReadyEps int
			subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
			totalReadyEps += readyEps
			totalNotReadyEps += notReadyEps
		}
	}

	// Repack the per-(pod, port) subsets into the canonical layout stored on
	// the Endpoints object.
	subsets = endpoints.RepackSubsets(subsets)

	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
	if err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		// No Endpoints yet: start from an empty object named after the Service.
		currentEndpoints = &v1.Endpoints{
			ObjectMeta: metav1.ObjectMeta{
				Name:   service.Name,
				Labels: service.Labels,
			},
		}
	}

	// An object that has never been persisted has no ResourceVersion, so it
	// must be created rather than updated.
	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

	// (Omitted: skip the write when subsets/labels are unchanged, truncate
	// over-capacity Endpoints, and maintain the headless-service label.)

	// Build the desired object on a copy instead of mutating the object
	// returned by the lister (informer cache objects are read-only).
	newEndpoints := currentEndpoints.DeepCopy()
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels
	if newEndpoints.Annotations == nil {
		newEndpoints.Annotations = make(map[string]string)
	}
	if !endpointsLastChangeTriggerTime.IsZero() {
		newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
			endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
	} else {
		// No new trigger time: clear any stale annotation.
		delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
	}

	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
	if createEndpoints {
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
	} else {
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
	}
	// Propagate the write error instead of dropping it, so the caller can
	// retry the sync (presumably by requeueing the key — confirm in the worker).
	return err
}
总结下endpoint的逻辑:根据Service的selector获取到所有匹配的pod,然后根据pod的状态把它们分为就绪和未就绪两类。未就绪的pod默认会被放进notReadyAddresses(不接收流量);如果Service设置了PublishNotReadyAddresses字段,未就绪的pod也会被直接放进addresses。
同时把Service里的ports信息写入Endpoints的subsets。每次更新都是对整个Endpoints对象做全量更新。
全量更新在pod规模不大的情况下没有问题。但如果一个Service对应上万甚至10万个pod,任何一个pod的变化都会导致整个Endpoints对象被重写,并推送给所有watch它的组件(例如每个节点上的kube-proxy),给apiserver、etcd和网络带来巨大压力(类似惊群效应)。EndpointSlice
就是为了解决该问题,同时还增加了新特性,例如拓扑感知提示(TopologyAwareHints),即按可用区(zone)就近路由流量。这部分需要结合Service来理解,在讲完Service后我们再来详细看EndpointSlice的具体实现。