kubelet代码梳理及流程

    技术2023-06-11  83

     

    在 Kubernetes 集群中,在每个 Node 节点上都会启动一个 kubelet 服务进程。该进程用于处理 Master 节点下发到本节点的任务,管理 Pod 及 Pod 中的容器。每个 Kubelet 进程会在 APIServer 上注册节点自身信息,定期向 Master 节点汇报节点资源的使用情况,并通过 cAdvise 监控容器和节点资源。

     

    资源同步方式:

    kubelet有几种方式获取自身Node上所需要运行的Pod清单。我们使用通过API Server监听etcd目录,同步Pod列表的方式。

    kubelet通过API Server Client使用WatchAndList的方式监听etcd中/registry/nodes/${当前节点名称}和/registry/pods的目录,将获取的信息同步到本地缓存中。

    kubelet监听etcd,执行对Pod的操作,对容器的操作则是通过Docker Client执行,例如启动删除容器等。

    创建pod的前期准备工作

     

    syncLoop为控制主函数,在 for 循环中一直调用 syncLoopIteration。

    syncLoopIteration 这个方法就会对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。

     

    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,

        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {

        select {

        case u, open := <-configCh:

            if !open {

                glog.Errorf("Update channel is closed. Exiting the sync loop.")

                return false

            }

     

            switch u.Op {

            case kubetypes.ADD:

                ...

            case kubetypes.UPDATE:

                ...

            case kubetypes.REMOVE:

                ...

            case kubetypes.RECONCILE:

                ...

            case kubetypes.DELETE:

                ...

            case kubetypes.RESTORE:

                ...

            case kubetypes.SET:

                ...

            }

            ...

        case e := <-plegCh:

            ...

        case <-syncCh:

            ...

        case update := <-kl.livenessManager.Updates():

            ...

        case <-housekeepingCh:

            ...

        }

        return true

    }

     

     

    HandlePodAdditon处理新建pod事件。

    dispatchwork把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。

    备注:probeManager add pod:如果定义了健康检查,启动goroutine进行健康检查。

     

           podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。

           podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发(我理解这里完成第一次下发,后续监听其他的event)。

     

    func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {

        pod := options.Pod

        uid := pod.UID

        var podUpdates chan UpdatePodOptions

        var exists bool

     

        p.podLock.Lock()

        defer p.podLock.Unlock()

     

        // 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel

        if podUpdates, exists = p.podUpdates[uid]; !exists {

            // 创建 channel

            podUpdates = make(chan UpdatePodOptions, 1)

            p.podUpdates[uid] = podUpdates

     

            // 启动 goroutine

            go func() {

                defer runtime.HandleCrash()

                p.managePodLoop(podUpdates)

            }()

        }

        // 下发更新事件

        if !p.isWorking[pod.UID] {

            p.isWorking[pod.UID] = true

            // 写入刚刚创建的channel

            podUpdates <- *options

        } else {

            update, found := p.lastUndeliveredWorkUpdate[pod.UID]

            if !found || update.UpdateType != kubetypes.SyncPodKill {

                p.lastUndeliveredWorkUpdate[pod.UID] = *options

            }

        }

    }

    managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是kubelet.syncPod

    kubelet.podWorkers = &fakePodWorkers{

    syncPodFn: kubelet.syncPod,

    cache:     kubelet.podCache,

    t:         t,

    }

     

    SyncPod是创建容器前的准备工作:

     

    在这个方法中,主要完成以下几件事情:

    如果是删除 pod,立即执行并返回同步 podStatus 到 kubelet.statusManager检查 pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup如果是 static Pod,就创建或者更新对应的 mirrorPod创建 pod 的数据目录,存放 volume 和 plugin 信息,如果定义了 pv,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情),如果有 image secrets,去 apiserver 获取对应的 secrets 数据然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好(此处可进行需求一的开发,生成默认nginx.conf)。调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑。

    func (kl *Kubelet) syncPod(o syncPodOptions) error {

        // pull out the required options

    // 该pod存放着要创建的所有信息,详见数据结构调研

        pod := o.pod

        mirrorPod := o.mirrorPod

        podStatus := o.podStatus

        updateType := o.updateType

        // 是否为 删除 pod

        if updateType == kubetypes.SyncPodKill {

            ...

        }

        ...

        // 检查 pod 是否能运行在本节点

        runnable := kl.canRunPod(pod)

        if !runnable.Admit {

            ...

        }

        // 更新 pod 状态

        kl.statusManager.SetPodStatus(pod, apiPodStatus)

        // 如果 pod 非 running 状态则直接 kill 掉

        if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {

            ...

        }

        // 加载网络插件

        if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {

            ...

        }

        pcm := kl.containerManager.NewPodContainerManager()

        if !kl.podIsTerminated(pod) {

            ...

            // 创建并更新 pod 的 cgroups

            if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {

                if !pcm.Exists(pod) {

                    ...

                }

            }

        }

        // 为 static pod 创建对应的 mirror pod

        if kubepod.IsStaticPod(pod) {

            ...

        }

        // 创建数据目录

        if err := kl.makePodDataDirs(pod); err != nil {

            ...

        }

        // 挂载 volume

        if !kl.podIsTerminated(pod) {

            if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {

                ...

            }

        }

        // 获取 secret 信息

        pullSecrets := kl.getPullSecretsForPod(pod)

        // 调用 containerRuntime 的 SyncPod 方法开始创建容器

        result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)

        kl.reasonCache.Update(pod.UID, result)

        if err := result.Error(); err != nil {

            ...

        }

        return nil

    }

    在该函数确认container创建成功返回之前,可以进行neutron上报。

    创建和启动容器

    containerRuntime(pkg/kubelet/kuberuntime)子模块的 SyncPod 函数才是真正完成 pod 内容器实体的创建。

    syncPod 主要执行以下几个操作:

        计算 sandbox 和 container 是否发生变化

        创建 sandbox 容器

        启动 init 容器

        启动业务容器

     

    最终由 startContainer 完成容器的启动。

     

    主要有以下几个步骤:

        拉取镜像

        生成业务容器的配置信息

        调用 docker api 创建容器

        启动容器

        执行 post start hook

     

    func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {

        // 1、检查业务镜像是否存在,不存在则到 Docker Registry 或是 Private Registry 拉取镜像。

        imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)

        if err != nil {

            ...

        }

        ref, err := kubecontainer.GenerateContainerRef(pod, container)

        if err != nil {

            ...

        }

        // 设置 RestartCount

        restartCount := 0

        containerStatus := podStatus.FindContainerStatusByName(container.Name)

        if containerStatus != nil {

            restartCount = containerStatus.RestartCount + 1

        }

        // 2、生成业务容器的配置信息

        containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)

        if cleanupAction != nil {

            defer cleanupAction()

        }

        ...

        // 3、通过 client.CreateContainer 调用 docker api 创建业务容器

        containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)

        if err != nil {

            ...

        }

        err = m.internalLifecycle.PreStartContainer(pod, container, containerID)

        if err != nil {

            ...

        }

        ...

        // 3、启动业务容器

        err = m.runtimeService.StartContainer(containerID)

        if err != nil {

            ...

        }

        containerMeta := containerConfig.GetMetadata()

        sandboxMeta := podSandboxConfig.GetMetadata()

        legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,

            sandboxMeta.Namespace)

        containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)

        if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {

            if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {

                glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",

                    legacySymlink, containerID, containerLog, err)

            }

        }

        // 4、执行 post start hook

        if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {

            kubeContainerID := kubecontainer.ContainerID{

                Type: m.runtimeName,

                ID:   containerID,

            }

            // runner.Run 这个方法的主要作用就是在业务容器起来的时候,

            // 首先会执行一个 container hook(PostStart 和 PreStop),做一些预处理工作。

            // 只有 container hook 执行成功才会运行具体的业务服务,否则容器异常。

            msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)

            if handlerErr != nil {

                ...

            }

        }

        return "", nil

    }

    Processed: 0.015, SQL: 9