pod详解
- pod是容器集合,基于cgroup共享pid,ipc,network,和uts namesapce,是k8s调度的基本单位
- pod设计理念支持多个容器在一个pod中共享网络和文件系统,可以通过进程间通信和文件共享完成服务
下面我们来看下以下pod是怎么在k8s中被创建的,下面是粗略的流程图
➜ k8s git:(main) ✗ cat k8s-pod| graph-easy
+---------+
| csi |
+---------+
^
|
|
+---------+ +-----------+ +----------+ +---------+ +-----+
| kubectl | --> | apiserver | --> | schedule | --> | kubelet | --> | cni |
+---------+ +-----------+ +----------+ +---------+ +-----+
|
|
v
+---------+
| cri |
+---------+
#kubectl apply -f xx.yaml
apiVersion: v1
kind: Pod
metadata:
name: hello
spec:
containers:
- image: nginx:1.15
name: nginx
kubectl 执行yaml后,会通过apiserver鉴权等一系列操作,然后经过调度,此时会有controller过滤打分驱逐等等确定节点
节点上运行的kubelet会通过cri接口与containerd/docker生成拉起镜像,然后通过cni配置网络相关,通过csi配置好存储相关
下面我们一起来看看kublet具体的代码
kubelet启动流程
kubelet在k8s中主要负责跟node交互,如创建pod,上报node的cpu,内存,磁盘信息等等,以及pod的网络,磁盘分配,下面我们来看看kubelet的启动流程
本机启动命令如下,分别配置了kubeconfig,config以及与cri交互通过containerd.sock,下面我们看下启动的流程,然后结合代码讲解
/usr/bin/kubelet
--bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf
--kubeconfig=/etc/kubernetes/kubelet.conf
--config=/var/lib/kubelet/config.yaml
--container-runtime-endpoint=unix:///var/run/containerd/containerd.sock
--pod-infra-container-image=registry.aliyuncs.com/google_containers/pause:3.9
kubelet启动粗略流程,锁进代表进入新的函数 如func a(){b()},则描述为
a()
b()
server.go:
NewKubeletCommand()
//kubelet 启动需要的依赖项目
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
//依赖项
plugins, err := ProbeVolumePlugins(featureGate)
run(ctx, s, kubeDeps, featureGate)
//初始化 KubeClient,EventClient,HeartbeatClient 以及一些配置的校验
...
//初始化cri
kubeDeps.ContainerManager, err = cm.NewContainerManager(.....)
//pre 服务初始化
kubelet.PreInitRuntimeService()
//启动RunKubelet
RunKubelet(s, kubeDeps, s.RunOnce);
//创建并初始化kublet
k, err := createAndInitKubelet(kubeServer,kubeDeps,hostname,hostnameOverridden,nodeName,nodeIPs)
//启动Kubelet
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
go k.Run(podCfg.Updates())
//start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
//监听file,apiserver,http
kl.syncLoop(ctx, updates, kl)
//handle pod event
kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)
//下面是处理pod相关,在下文详细讲解
pod
从以下管道中接收事件
- configCh: a channel to read config events from //static po 一般通过申明文件放在kubernetes/manifest目录下,用来创建管理组建的pod
- handler: the SyncHandler to dispatch pods to
- syncCh: a channel to read periodic sync events from
- housekeepingCh: a channel to read housekeeping events from
- plegCh: a channel to read PLEG updates from 分别对应如下方法去对pod进行操作
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
HandlePodAdditions(pods []*v1.Pod)
HandlePodUpdates(pods []*v1.Pod)
HandlePodRemoves(pods []*v1.Pod)
HandlePodReconcile(pods []*v1.Pod)
HandlePodSyncs(pods []*v1.Pod)
HandlePodCleanups(ctx context.Context) error
}
下面我们先看HandlePodAdditions()方法的大概实现
pod Add
//多余代码已经删除,仅保留主流程代码
//pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
//获取这次事件的所有的pod
for _, pod := range pods {
//将pod添加到podManager,如果podManager中不存在pod,则说明被apiserver删除,即被controller删除或者调用kubectl删除
kl.podManager.AddPod(pod)
//检查该pod是否可以被调度到该node
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
//在异步中执行pod同步
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
}
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
StartTime: start,
})
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
//pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
//省略了一些对状态的判断,如被删除了,提前中止
go func() {
p.podWorkerLoop(uid, outCh)
}()
}
}
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
}
func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
// check soft admin, pod will be pending if check fails
runnable := kl.canRunPod(pod)
//检查网络插件
if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {}
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(ctx, pod, podStatus, pullSecrets, kl.backOff)
}
//pkg/kubelet/kuberuntime/kuberuntime_manager.go
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
//检查创建sandbox规范
podContainerChanges := m.computePodActions(ctx, pod, podStatus)
//创建pod,函数见下文
podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
//启动初始化容器
start(ctx, "init container", metrics.InitContainer, containerStartSpec(container));
//启动容器
for _, idx := range podContainerChanges.ContainersToStart {
start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}
}
//
//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
// 创建pd日志目录
err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
//与cri交互
podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
//最终调用 c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/RunPodSandbox", in, out, opts...)
//k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
}
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
//拉取镜像
m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig)
//创建容器
//c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/CreateContainer", in, out, opts...)
m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)
//设置cpu set
m.internalLifecycle.PreStartContainer(pod, container, containerID)
//cri启动容器
//k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go
//c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/StartContainer", in, out, opts...)
m.runtimeService.StartContainer(ctx, containerID)
}