kubelet与api-server交互

    技术2023-06-22  80

    在kubelet成功创建pod后,kubelet会将状态回写api-server,通知集群:调度到该node上的pod已经创建成功。下面分析两者如何交互。

    syncLoop有个for循环,主要运行syncLoopIteration,并且和pleg组件有交互。

    // syncLoopIteration reads from one of the kubelet's input channels and
    // dispatches the event to the given handler. It returns false only when
    // the pod-config channel has been closed, which terminates the sync loop.
    // (Excerpt: "..." marks handler calls elided in this quotation.)
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        // Pod updates coming from the configuration sources (apiserver, file, http).
        case u, open := <-configCh:
            if !open {
                glog.Errorf("Update channel is closed. Exiting the sync loop.")
                return false
            }

            // Dispatch on the kind of pod operation.
            switch u.Op {
            case kubetypes.ADD:
                ...
            case kubetypes.UPDATE:
                ...
            case kubetypes.REMOVE:
                ...
            case kubetypes.RECONCILE:
                ...
            case kubetypes.DELETE:
                ...
            case kubetypes.RESTORE:
                ...
            case kubetypes.SET:
                ...
            }
            ...
        // Pod lifecycle events generated by PLEG from observed container state.
        case e := <-plegCh:
            ...
        // Periodic tick: sync any pod workers that are due.
        case <-syncCh:
            ...
        // Liveness-probe results; failed containers trigger a pod sync.
        case update := <-kl.livenessManager.Updates():
            ...
        // Periodic housekeeping (cleanup of orphaned pods, directories, etc.).
        case <-housekeepingCh:
            ...
        }
        return true
    }

     

     

    HandlePodAdditions

    syncLoopIteration中调用的HandlePodAdditions的逻辑:

    先对要add的pods按创建时间排序

    然后遍历pods 

    1. 先把pod写入podManager(如果podManager中没有某个pod,就意味着这个pod已经在apiserver中被删除了,并且除了cleanup不再会做别的操作) 

    2. 处理MirrorPod(为了监控static pod的状态,kubelet通过api server为每个static pod创建一个mirror pod) 

    3. 检查是否admit pod

    4. 将pod分发到某个worker去进行sync操作 

    5. 将pod传给probeManager做健康检查

    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {

    start := kl.clock.Now()

    sort.Sort(sliceutils.PodsByCreationTime(pods))

    // Responsible for checking limits in resolv.conf

    // The limits do not have anything to do with individual pods

    if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {

    kl.dnsConfigurer.CheckLimitsForResolvConf()

    }

    for _, pod := range pods {

    existingPods := kl.podManager.GetPods()

    // Always add the pod to the pod manager. Kubelet relies on the pod

    // manager as the source of truth for the desired state. If a pod does

    // not exist in the pod manager, it means that it has been deleted in

    // the apiserver and no action (other than cleanup) is required.

    kl.podManager.AddPod(pod)

     

    if kubepod.IsMirrorPod(pod) {

    kl.handleMirrorPod(pod, start)

    continue

    }

     

    if !kl.podIsTerminated(pod) {

    // Only go through the admission process if the pod is not

    // terminated.

     

    // We failed pods that we rejected, so activePods include all admitted

    // pods that are alive.

    activePods := kl.filterOutTerminatedPods(existingPods)

     

    // Check if we can admit the pod; if not, reject it.

    if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {

    kl.rejectPod(pod, reason, message)

    continue

    }

    }

    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)

    kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

    kl.probeManager.AddPod(pod)

    }

    }

    主要看第4步:dispatchWork()做了什么。我们按照dispatchWork --> podWorkers.UpdatePod --> podWorkers.managePodLoop的代码链路一路追踪下去,发现最终调用了 podWorkers的syncPodFn,而syncPodFn是在NewMainKubelet对podWorkers初始化时赋值的,赋值为klet.syncPod,所以真正做同步工作的是syncPod

    ...

    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    ...

    syncPod是同步单个pod的事务脚本。主要工作流是:

    1. 如果要创建pod,记录pod worker的启动延时
    2. 调用generateAPIPodStatus为pod准备一个v1.PodStatus对象,用来保存pod状态,并会写到statusManager,回写API server
    3. 如果pod被视为第一次running,记录pod启动延迟
    4. 在status manager中更新pod的状态
    5. kill掉不该是running的pod
    6. 如果pod是static pod,且没有mirror pod,创建一个mirror pod
    7. 如果数据目录不存在,则为pod创建数据目录
    8. 等待volume被attach/mount
    9. 获取pod的pull secrets
    10. 调用容器运行时的SyncPod回调
    11. 更新reasonCache(缓存的是所有容器最近创建失败的原因,用于产生容器状态)

    上面的注释中比较重要的是:

    1. syncPod会通过status Manager去回写apiserver中pod的状态
    2. 会等待volume被attach/mount之后再继续执行
    3. 调用容器运行时的SyncPod

    现在我们总结一下:

    syncLoop主要就是将pod同步成期望状态:通过grpc与dockershim通信,让dockershim向docker发送创建/删除容器的请求,并通过CNI去配置pod网络。创建出来的pod实际上就是pause容器加上用户自己的容器(如init容器、业务容器)。到这里SyncLoop就完成了它的一次循环工作,当然每次循环的处理动作要看收到的数据。pod创建成功后,我们通过kubectl get pods看到的状态变为Running

     

    api-server回写

    kubelet.syncPod中会往statusManager中更新pod状态,但是这个步骤发生在创建容器之前;创建容器完成后,由kubelet中的PLEG组件去同步状态。

    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {

    klog.Info("Starting kubelet main sync loop.")

    // The syncTicker wakes up kubelet to checks if there are any pod workers

    // that need to be sync'd. A one-second period is sufficient because the

    // sync interval is defaulted to 10s.

    syncTicker := time.NewTicker(time.Second)

    defer syncTicker.Stop()

    housekeepingTicker := time.NewTicker(housekeepingPeriod)

    defer housekeepingTicker.Stop()

    # 该位置watch

    plegCh := kl.pleg.Watch()

    const (

    base   = 100 * time.Millisecond

    max    = 5 * time.Second

    factor = 2

    )

    duration := base

    for {

    if err := kl.runtimeState.runtimeErrors(); err != nil {

    klog.Infof("skipping pod synchronization - %v", err)

    // exponential backoff

    time.Sleep(duration)

    duration = time.Duration(math.Min(float64(max), factor*float64(duration)))

    continue

    }

    // reset backoff if we have a success

    duration = base

     

    kl.syncLoopMonitor.Store(kl.clock.Now())

    #该位置传入该channel

    if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {

    break

    }

    kl.syncLoopMonitor.Store(kl.clock.Now())

    }

    }

    pleg watch到数据传入了syncLoopIteration。pleg是用来在pod生命周期中生成事件的,它周期性地去监听容器状态。

    syncLoopIteration中会去watch pleg的event channel,而pleg周期性地发现pod(或container)的变化,生成事件,写入event channel中。这样syncLoopIteration在select到数据以后会进行相应的处理

    // Excerpt from syncLoopIteration: only the plegCh branch of the select is
    // quoted here (the enclosing select statement is elided in the original
    // article, so this fragment is not compilable as-is).
    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        case e := <-plegCh:
            // Only certain PLEG event types warrant a full pod sync.
            if isSyncPodWorthy(e) {
                // PLEG event for a pod; sync it.
                if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                    klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                    handler.HandlePodSyncs([]*v1.Pod{pod})
                } else {
                    // If the pod no longer exists, ignore the event.
                    klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
                }
            }

            // A container that died can have its resources cleaned up at once.
            if e.Type == pleg.ContainerDied {
                if containerID, ok := e.Data.(string); ok {
                    kl.cleanUpContainersInPod(e.ID, containerID)
                }
            }
    }

    将pod状态(假如是第一次创建到running,则为ContainerStarted)更新到statusManager。一路追踪代码发现数据写入了statusManager中的podStatusChannel,而statusManager在启动时就会select这个channel,并在statusManager的syncPod中去调用kubeClient去更新apiserver中pod的状态

    // Start launches the status manager's sync goroutine, which pushes pod
    // status updates from the kubelet to the apiserver.
    func (m *manager) Start() {
        // Don't start the status manager if we don't have a client. This will happen
        // on the master, where the kubelet is responsible for bootstrapping the pods
        // of the master components.
        if m.kubeClient == nil {
            klog.Infof("Kubernetes client is nil, not starting status manager.")
            return
        }

        klog.Info("Starting to sync pod status with apiserver")
        //lint:ignore SA1015 Ticker can leak since this is only called once and doesn't handle termination.
        syncTicker := time.Tick(syncPeriod)
        // syncPod and syncBatch share the same go routine to avoid sync races.
        go wait.Forever(func() {
            select {
            // A single pod status was queued: sync just that pod.
            case syncRequest := <-m.podStatusChannel:
                klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            // Periodic tick: reconcile any statuses that still differ.
            case <-syncTicker:
                m.syncBatch()
            }
        }, 0)
    }

    syncPod函数调用statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status))同步信息到api-server

    // syncPod syncs the given status with the API server. The caller must not hold the lock.

    func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {

    if !m.needsUpdate(uid, status) {

    klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)

    return

    }

     

    // TODO: make me easier to express from client code

    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})

    if errors.IsNotFound(err) {

    klog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid)

    // If the Pod is deleted the status will be cleared in

    // RemoveOrphanedStatuses, so we just ignore the update here.

    return

    }

    if err != nil {

    klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)

    return

    }

     

    translatedUID := m.podManager.TranslatePodUID(pod.UID)

    // Type convert original uid just for the purpose of comparison.

    if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) {

    klog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)

    m.deletePodStatus(uid)

    return

    }

     

    oldStatus := pod.Status.DeepCopy()

    newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, *oldStatus, mergePodStatus(*oldStatus, status.status))

    klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)

    if err != nil {

    klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)

    return

    }

    pod = newPod

     

    klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)

    m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version

     

    // We don't handle graceful deletion of mirror pods.

    if m.canBeDeleted(pod, status.status) {

    deleteOptions := metav1.NewDeleteOptions(0)

    // Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.

    deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))

    err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)

    if err != nil {

    klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)

    return

    }

    klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))

    m.deletePodStatus(uid)

    }

    }

    PatchPodStatus定义在pkg/util/pod/pod.go中,真正向api-server建连

    // PatchPodStatus patches pod status.

    func PatchPodStatus(c clientset.Interface, namespace, name string, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, error) {

    patchBytes, err := preparePatchBytesforPodStatus(namespace, name, oldPodStatus, newPodStatus)

    if err != nil {

    return nil, nil, err

    }

     

    updatedPod, err := c.CoreV1().Pods(namespace).Patch(name, types.StrategicMergePatchType, patchBytes, "status")

    if err != nil {

    return nil, nil, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err)

    }

    return updatedPod, patchBytes, nil

    }

     

    接下来要调用client-go客户端来进行restful的调用。

     

    client-go

    client-go/kubernetes/typed/core/v1/pod.go

    // PodInterface has methods to work with Pod resources. The kubelet's
    // status manager reaches it through PatchPodStatus, which uses Patch to
    // write pod status back to the apiserver.
    type PodInterface interface {
        Create(*v1.Pod) (*v1.Pod, error)
        Update(*v1.Pod) (*v1.Pod, error)
        UpdateStatus(*v1.Pod) (*v1.Pod, error)
        Delete(name string, options *metav1.DeleteOptions) error
        DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
        Get(name string, options metav1.GetOptions) (*v1.Pod, error)
        List(opts metav1.ListOptions) (*v1.PodList, error)
        Watch(opts metav1.ListOptions) (watch.Interface, error)
        // Patch applies a patch, optionally to a subresource such as "status".
        Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
        GetEphemeralContainers(podName string, options metav1.GetOptions) (*v1.EphemeralContainers, error)
        UpdateEphemeralContainers(podName string, ephemeralContainers *v1.EphemeralContainers) (*v1.EphemeralContainers, error)

        PodExpansion
    }

    // Patch applies the patch and returns the patched pod.

    func (c *pods) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error) {

    result = &v1.Pod{}

    err = c.client.Patch(pt).

    Namespace(c.ns).

    Resource("pods").

    SubResource(subresources...).

    Name(name).

    Body(data).

    #注意这个Do方法

    Do().

    Into(result)

    return

    }

    Do方法会在接下来调用

    kubernetes\staging\src\k8s.io\client-go\rest\client.go

    // Interface captures the set of operations for generically interacting
    // with Kubernetes REST APIs; each verb method returns a *Request builder.
    type Interface interface {
        GetRateLimiter() flowcontrol.RateLimiter
        Verb(verb string) *Request
        Post() *Request
        Put() *Request
        Patch(pt types.PatchType) *Request
        Get() *Request
        Delete() *Request
        APIVersion() schema.GroupVersion
    }

     

     

    // Patch begins a PATCH request. Short for c.Verb("Patch").

    func (c *RESTClient) Patch(pt types.PatchType) *Request {

    return c.Verb("PATCH").SetHeader("Content-Type", string(pt))

    }

     

     

    func (c *RESTClient) Verb(verb string) *Request {

    backoff := c.createBackoffMgr()

     

    if c.Client == nil {

    return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, 0)

    }

    return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle, c.Client.Timeout)

    }

    kubernetes\staging\src\k8s.io\client-go\rest\request.go

    func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter, timeout time.Duration) *Request {

    if backoff == nil {

    klog.V(2).Infof("Not implementing request backoff strategy.")

    backoff = &NoBackoff{}

    }

     

    pathPrefix := "/"

    if baseURL != nil {

    pathPrefix = path.Join(pathPrefix, baseURL.Path)

    }

    r := &Request{

    client:      client,

    verb:        verb,

    baseURL:     baseURL,

    pathPrefix:  path.Join(pathPrefix, versionedAPIPath),

    content:     content,

    serializers: serializers,

    backoffMgr:  backoff,

    throttle:    throttle,

    timeout:     timeout,

    }

    switch {

    case len(content.AcceptContentTypes) > 0:

    r.SetHeader("Accept", content.AcceptContentTypes)

    case len(content.ContentType) > 0:

    r.SetHeader("Accept", content.ContentType+", */*")

    }

    return r

    }

     

     

    func (r *Request) Do() Result {

    if err := r.tryThrottle(); err != nil {

    return Result{err: err}

    }

     

    var result Result

    err := r.request(func(req *http.Request, resp *http.Response) {

    result = r.transformResponse(resp, req)

    })

    if err != nil {

    return Result{err: err}

    }

    return result

    }

    r.resource = "pods"

    r.namespace = "default"

    r.subresource = "status"

    r.resourcename = "tengine2-test"

    r.body = bytes.NewReader(data)

     

     

    func (r *Request) request(fn func(*http.Request, *http.Response)) error {

    //Metrics for total request latency

    start := time.Now()

    defer func() {

    metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))

    }()

     

    if r.err != nil {

    klog.V(4).Infof("Error in request: %v", r.err)

    return r.err

    }

     

    // TODO: added to catch programmer errors (invoking operations with an object with an empty namespace)

    if (r.verb == "GET" || r.verb == "PUT" || r.verb == "DELETE") && r.namespaceSet && len(r.resourceName) > 0 && len(r.namespace) == 0 {

    return fmt.Errorf("an empty namespace may not be set when a resource name is provided")

    }

    if (r.verb == "POST") && r.namespaceSet && len(r.namespace) == 0 {

    return fmt.Errorf("an empty namespace may not be set during creation")

    }

     

    client := r.client

    if client == nil {

    client = http.DefaultClient

    }

     

    // Right now we make about ten retry attempts if we get a Retry-After response.

    maxRetries := 10

    retries := 0

    for {

    // The URL() method below builds the final request URL from these fields.

    url := r.URL().String()

    req, err := http.NewRequest(r.verb, url, r.body)

    func (r *Request) URL() *url.URL {

    p := r.pathPrefix

    if r.namespaceSet && len(r.namespace) > 0 {

    p = path.Join(p, "namespaces", r.namespace)

    }

    if len(r.resource) != 0 {

    p = path.Join(p, strings.ToLower(r.resource))

    }

    // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed

    if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {

    p = path.Join(p, r.resourceName, r.subresource, r.subpath)

    }

    finalURL := &url.URL{}

    根据上面r结构体的信息,可知最后访问的url为:prefix/namespaces/default/pods/tengine2-test/status,body为old status和new status

     

    至此,向api-server发送流程结束

    Processed: 0.017, SQL: 9