Pod in Detail

  • A Pod is a group of containers that share the network, IPC, and UTS namespaces (and optionally the PID namespace) and are managed together under cgroups; it is the basic unit of scheduling in Kubernetes
  • The Pod design lets several containers in one Pod share the network and filesystem, so they can cooperate through inter-process communication and file sharing to provide a service (a small sketch follows this list)
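As a small illustration of that design (the names, images, and mount paths below are made up for the example), here is a sketch in Go using the k8s.io/api types: a Pod whose two containers exchange files through a shared emptyDir volume and could also talk to each other over localhost. It simply prints the object as YAML:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/yaml"
)

func main() {
	// Two containers in one Pod: they share the Pod's network namespace
	// (localhost) and exchange files through a shared emptyDir volume.
	pod := &corev1.Pod{
		TypeMeta:   metav1.TypeMeta{APIVersion: "v1", Kind: "Pod"},
		ObjectMeta: metav1.ObjectMeta{Name: "shared-demo"},
		Spec: corev1.PodSpec{
			Volumes: []corev1.Volume{{
				Name:         "html",
				VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}},
			}},
			Containers: []corev1.Container{
				{
					Name:         "web",
					Image:        "nginx:1.15",
					VolumeMounts: []corev1.VolumeMount{{Name: "html", MountPath: "/usr/share/nginx/html"}},
				},
				{
					Name:         "writer",
					Image:        "busybox:1.36",
					Command:      []string{"sh", "-c", "while true; do date > /html/index.html; sleep 5; done"},
					VolumeMounts: []corev1.VolumeMount{{Name: "html", MountPath: "/html"}},
				},
			},
		},
	}
	out, _ := yaml.Marshal(pod)
	fmt.Println(string(out))
}

The nginx container serves whatever the writer container puts into the shared volume; neither needs to know anything about the other beyond the agreed mount path.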
Now let's look at how a Pod like the one below is created in Kubernetes; here is a rough flow diagram:
➜  k8s git:(main) ✗ cat k8s-pod| graph-easy
                                                   +---------+
                                                   |   csi   |
                                                   +---------+
                                                     ^
                                                     |
                                                     |
+---------+     +-----------+     +----------+     +---------+     +-----+
| kubectl | --> | apiserver | --> | schedule | --> | kubelet | --> | cni |
+---------+     +-----------+     +----------+     +---------+     +-----+
                                                     |
                                                     |
                                                     v
                                                   +---------+
                                                   |   cri   |
                                                   +---------+
#kubectl apply -f xx.yaml
apiVersion: v1 
kind: Pod
metadata:
  name: hello 
spec:
  containers:
  - image: nginx:1.15
    name: nginx

After kubectl submits the YAML, the request goes through a series of apiserver steps (authentication, authorization, admission) and the Pod is persisted; the scheduler (the "schedule" box above) then picks a node through filtering, scoring and, if necessary, preempting lower-priority Pods.
The kubelet running on the chosen node talks to containerd/docker through the CRI to pull the image and start the containers, sets up the Pod network through CNI, and prepares storage through CSI.
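For reference, the same create request can be issued against the apiserver directly with client-go; this is only a rough sketch of what kubectl does for the hello Pod above (it assumes a reachable cluster, a working ~/.kube/config, and the default namespace):

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load credentials the same way kubectl would (from ~/.kube/config).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "hello"},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "nginx", Image: "nginx:1.15"}},
		},
	}
	// The apiserver authenticates/authorizes the request, runs admission,
	// and stores the Pod with an empty spec.nodeName for the scheduler to fill.
	created, err := cs.CoreV1().Pods("default").Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("created pod:", created.Name)
}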
Next, let's walk through the kubelet code itself.

kubelet startup flow

The kubelet is the node agent in Kubernetes: it creates Pods on its node, reports the node's CPU, memory, and disk status, and handles Pod networking and volume setup. Let's look at how it starts.
On my machine the kubelet is started with the command below: it is given a bootstrap kubeconfig, a kubeconfig, a config file, and the containerd.sock endpoint used to talk to the CRI. We'll go through the startup flow first and then walk the code.

/usr/bin/kubelet \
  --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf \
  --kubeconfig=/etc/kubernetes/kubelet.conf \
  --config=/var/lib/kubelet/config.yaml \
  --container-runtime-endpoint=unix:///var/run/containerd/containerd.sock \
  --pod-infra-container-image=registry.aliyuncs.com/google_containers/pause:3.9
Rough kubelet startup flow; indentation means entering a new function, so func a(){ b() } is written as:
a()
  b()
cmd/kubelet/app/server.go:
    NewKubeletCommand()
    //dependencies needed to start the kubelet
    kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate) 
    //probe the volume plugins (another dependency)
    plugins, err := ProbeVolumePlugins(featureGate)
    run(ctx, s, kubeDeps, featureGate)
        //initialize KubeClient, EventClient, HeartbeatClient and validate parts of the configuration
        ...
        //initialize the container manager (cgroups, node resources)
        kubeDeps.ContainerManager, err = cm.NewContainerManager(.....)
        //pre-initialize the remote (CRI) runtime and image services
        kubelet.PreInitRuntimeService()
        //run the kubelet
        RunKubelet(s, kubeDeps, s.RunOnce)
            //create and initialize the kubelet
            k, err := createAndInitKubelet(kubeServer,kubeDeps,hostname,hostnameOverridden,nodeName,nodeIPs)
            //start the kubelet
            startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
                go k.Run(podCfg.Updates())
                    //start volume manager
                    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
                    //watch the pod sources: file, apiserver, http
                    kl.syncLoop(ctx, updates, kl)
                        //handle pod event
                        kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
                        //pod handling starts here; covered in detail in the next section

pod

syncLoopIteration receives events from the following channels:

  • configCh: a channel to read config events from //static Pods, for example, are declared as manifest files (typically under /etc/kubernetes/manifests) and are used to run the control-plane component Pods
  • handler: the SyncHandler to dispatch pods to
  • syncCh: a channel to read periodic sync events from
  • housekeepingCh: a channel to read housekeeping events from
  • plegCh: a channel to read PLEG (Pod Lifecycle Event Generator) updates from
These channels map onto the following SyncHandler methods, which do the actual work on the Pods (a simplified dispatch sketch follows the interface):
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups(ctx context.Context) error
}
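To make the dispatch concrete, here is a heavily simplified, self-contained sketch of that select loop; the types and channel payloads are paraphrased, and the real syncLoopIteration in pkg/kubelet/kubelet.go handles more operation types, source readiness, and the different PLEG event kinds:

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for kubetypes.PodUpdate and the SyncHandler above.
type Pod struct{ Name string }

type PodUpdate struct {
	Op   string // "ADD", "UPDATE", "REMOVE", ...
	Pods []*Pod
}

type syncHandler interface {
	HandlePodAdditions(pods []*Pod)
	HandlePodUpdates(pods []*Pod)
	HandlePodRemoves(pods []*Pod)
	HandlePodSyncs(pods []*Pod)
}

type handler struct{}

func (handler) HandlePodAdditions(pods []*Pod) { fmt.Println("add", len(pods), "pod(s)") }
func (handler) HandlePodUpdates(pods []*Pod)   { fmt.Println("update", len(pods), "pod(s)") }
func (handler) HandlePodRemoves(pods []*Pod)   { fmt.Println("remove", len(pods), "pod(s)") }
func (handler) HandlePodSyncs(pods []*Pod)     { fmt.Println("periodic sync") }

// One iteration of the loop: wait for whichever channel fires first and
// dispatch it to the matching handler method.
func syncLoopIteration(configCh <-chan PodUpdate, h syncHandler,
	syncCh, housekeepingCh <-chan time.Time, plegCh <-chan string) bool {
	select {
	case u, ok := <-configCh:
		if !ok {
			return false // config channel closed: stop the loop
		}
		switch u.Op {
		case "ADD":
			h.HandlePodAdditions(u.Pods)
		case "UPDATE":
			h.HandlePodUpdates(u.Pods)
		case "REMOVE":
			h.HandlePodRemoves(u.Pods)
		}
	case <-syncCh:
		h.HandlePodSyncs(nil) // periodic resync of pods that are due
	case e := <-plegCh:
		fmt.Println("container runtime event:", e) // triggers a sync for that pod
	case <-housekeepingCh:
		fmt.Println("housekeeping: clean up orphaned pods/volumes")
	}
	return true
}

func main() {
	configCh := make(chan PodUpdate, 1)
	plegCh := make(chan string, 1)
	syncCh := time.NewTicker(time.Second).C
	housekeepingCh := time.NewTicker(2 * time.Second).C

	configCh <- PodUpdate{Op: "ADD", Pods: []*Pod{{Name: "hello"}}}
	plegCh <- "ContainerStarted hello/nginx"

	for i := 0; i < 3; i++ {
		syncLoopIteration(configCh, handler{}, syncCh, housekeepingCh, plegCh)
	}
}

In the real kubelet, configCh is fed by the merged file/apiserver/http sources listed above, and plegCh events come from the PLEG relisting the container runtime.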

Let's start with a rough look at how HandlePodAdditions() is implemented.

pod Add
//non-essential code removed; only the main flow is kept
//pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	//iterate over every pod delivered by this event
	for _, pod := range pods {
		//register the pod with the podManager; a pod missing from the podManager
		//means it was deleted from the apiserver (by a controller or via kubectl)
		kl.podManager.AddPod(pod)
		//check whether the pod can be admitted onto this node
		if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
			kl.rejectPod(pod, reason, message)
			continue
		}
		//dispatch the pod sync to run asynchronously
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}
//pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	//status checks omitted here (e.g. bail out early if the pod is already being deleted)
	go func() {
		p.podWorkerLoop(uid, outCh)
	}()
}
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
    isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
}
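UpdatePod plus podWorkerLoop is a one-goroutine-per-pod pattern: the first update seen for a pod UID starts a dedicated worker, and later updates are handed to that same worker, so syncs for one pod are serialized while different pods sync in parallel. A stripped-down sketch of the idea (not the real podWorkers, which also coalesces updates and tracks syncing/terminating/terminated states):

package main

import (
	"fmt"
	"sync"
	"time"
)

// workers fans pod updates out to one goroutine per pod UID, mirroring the
// shape of podWorkers: same-pod updates are serialized, different pods run
// in parallel.
type workers struct {
	mu      sync.Mutex
	updates map[string]chan string // pod UID -> pending update channel
}

func (w *workers) UpdatePod(uid, event string) {
	w.mu.Lock()
	ch, ok := w.updates[uid]
	if !ok {
		ch = make(chan string, 1)
		w.updates[uid] = ch
		go w.podWorkerLoop(uid, ch) // first update for this pod: start its worker
	}
	w.mu.Unlock()
	ch <- event
}

func (w *workers) podWorkerLoop(uid string, updates <-chan string) {
	for ev := range updates {
		// this is where the real worker calls podSyncer.SyncPod
		fmt.Printf("sync pod %s: %s\n", uid, ev)
	}
}

func main() {
	w := &workers{updates: map[string]chan string{}}
	w.UpdatePod("uid-1", "create")
	w.UpdatePod("uid-2", "create")
	w.UpdatePod("uid-1", "update")
	time.Sleep(100 * time.Millisecond) // let the workers drain
}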
func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
    // check soft admission; the pod will stay Pending if the check fails
	runnable := kl.canRunPod(pod)
	//check that the network plugin is ready, unless the pod uses host networking
	if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {}
    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
}
//pkg/kubelet/kuberuntime/kuberuntime_manager.go
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    //work out what has to change: whether the sandbox must be (re)created and which containers to kill or start
	podContainerChanges := m.computePodActions(ctx, pod, podStatus)
	//create the pod sandbox; the function is shown below
    podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
	
	//start the init containers
    start(ctx, "init container", metrics.InitContainer, containerStartSpec(container))
	//start the application containers
    for _, idx := range podContainerChanges.ContainersToStart {
        start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }
}
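computePodActions compares the Pod spec against the runtime status and boils the difference down to a small "what to do" value that drives the rest of SyncPod. Roughly paraphrased (field names simplified here; see the podActions struct in kuberuntime_manager.go for the exact definition):

package sketch

// Paraphrase of the decision computePodActions produces; not the upstream type.
type podActionsSketch struct {
	KillPod           bool     // stop everything and recreate the pod from scratch
	CreateSandbox     bool     // a (new) sandbox has to be created first
	SandboxID         string   // existing sandbox to reuse when CreateSandbox is false
	Attempt           uint32   // sandbox attempt number, passed to createPodSandbox
	ContainersToStart []int    // indexes into pod.Spec.Containers that must be (re)started
	ContainersToKill  []string // running containers that must be stopped first
}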
//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
    // create the pod log directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    //call the CRI to create and start the sandbox
    podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
	//this ultimately invokes c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
	//k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
}
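RunPodSandbox is just one RPC on the CRI RuntimeService, reached over the gRPC endpoint passed to the kubelet via --container-runtime-endpoint. As an illustration of that channel, here is a small standalone client (assuming the containerd socket path from the kubelet command line above) that dials the same socket and issues two read-only RPCs from k8s.io/cri-api:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Dial the CRI socket; the kubelet keeps a connection like this open.
	conn, err := grpc.DialContext(ctx, "unix:///var/run/containerd/containerd.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := runtimeapi.NewRuntimeServiceClient(conn)

	// Version is a harmless read-only RPC on the same RuntimeService that
	// RunPodSandbox/CreateContainer/StartContainer belong to.
	v, err := client.Version(ctx, &runtimeapi.VersionRequest{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("runtime: %s %s (CRI %s)\n", v.RuntimeName, v.RuntimeVersion, v.RuntimeApiVersion)

	// ListPodSandbox shows the pod-level objects that RunPodSandbox creates.
	sandboxes, err := client.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{})
	if err != nil {
		panic(err)
	}
	for _, s := range sandboxes.Items {
		fmt.Println(s.Metadata.Namespace+"/"+s.Metadata.Name, s.State)
	}
}

crictl talks to the same endpoint, which makes it convenient for inspecting what the kubelet has created on a node.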
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    //pull the image if it is not present on the node
	m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig)
	//create the container through the CRI
	//c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/CreateContainer", in, out, opts...)
    m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
	
	//internal pre-start hooks, e.g. CPU set allocation by the CPU manager
    m.internalLifecycle.PreStartContainer(pod, container, containerID)
	//start the container through the CRI
	//k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
	//c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/StartContainer", in, out, opts...)
    m.runtimeService.StartContainer(ctx, containerID)
}
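EnsureImageExists, for its part, ends up driving the CRI ImageService. A minimal sketch of that pull at the CRI level (pull secrets and the sandbox config are omitted, the image name is just an example, and the socket path is again assumed from the kubelet flags above):

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	conn, err := grpc.DialContext(ctx, "unix:///var/run/containerd/containerd.sock",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	images := runtimeapi.NewImageServiceClient(conn)

	// Pull an image the way EnsureImageExists eventually does through the CRI.
	resp, err := images.PullImage(ctx, &runtimeapi.PullImageRequest{
		Image: &runtimeapi.ImageSpec{Image: "nginx:1.15"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("pulled image ref:", resp.ImageRef)
}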