DPDK vhost-user之前后端通知机制场景分析(十)

    技术2022-07-11  70

    所谓前后端通知,必然涉及两个方向:前端通知后端,后端通知前端。而我们知道vhost有txq和rxq,对于每种queue都伴随有这两种通知。而通知方式又根据是否支持event_idx有着不同的实现,最后virtio1.1引入的packed ring后,通知相对split ring又有不同。下面我们以txq,rxq的两个方向共四种情况来分析前后端的通知实现。其中前端以kernel4.9 virtio_net实现为例分析,后端以dpdk 18.11 vhost_user实现分析。在展开前后端通知分析前,我们先了解两个背景知识:前端中断处理函数的注册和后端vhost_user的kick方式。

    前端中断处理函数注册

    后端对前端的通知,是以中断方式传递到前端的。分析通知的接收处理就少不了要了解这些中断处理函数。所以我们先看一下前端是怎么注册中断处理函数的。这些需要从virtio_net的加载函数virtnet_probe说起,具体如下图所示。 我们知道virtio设备分为morden和lagecy两种,我们以morden设备为例。对于morden设备,会调用virtio_pci_modern_probe初始化config ops:

    vp_dev->vdev.config = &virtio_pci_config_ops;

    其find_vqs函数对应为vp_modern_find_vqs。vp_modern_find_vqs 其中主要调用vq_find_vqs函数。vp_find_vqs函数完成队列中断处理函数的初始化,根据设备对中断的支持,分为以下三种情况:

    所有txq,rxq以及ctrlq都共享一个中断处理;ctrlq单独使用一个中断处理,其他txq和rxq共享一个中断处理;可以每个queue(包含txq,rxq以及ctrlq)各一个中断处理;

    virtnet_find_vqs(kernel 4.9)

    static int virtnet_find_vqs(struct virtnet_info *vi) { vq_callback_t **callbacks; struct virtqueue **vqs; int ret = -ENOMEM; int i, total_vqs; const char **names; /* We expect 1 RX virtqueue followed by 1 TX virtqueue, followed by * possible N-1 RX/TX queue pairs used in multiqueue mode, followed by * possible control vq. */ total_vqs = vi->max_queue_pairs * 2 + virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ); /* Allocate space for find_vqs parameters */ vqs = kzalloc(total_vqs * sizeof(*vqs), GFP_KERNEL); if (!vqs) goto err_vq; callbacks = kmalloc(total_vqs * sizeof(*callbacks), GFP_KERNEL); if (!callbacks) goto err_callback; names = kmalloc(total_vqs * sizeof(*names), GFP_KERNEL); if (!names) goto err_names; /* Parameters for control virtqueue, if any */ if (vi->has_cvq) { callbacks[total_vqs - 1] = NULL; names[total_vqs - 1] = "control"; } /* Allocate/initialize parameters for send/receive virtqueues */ for (i = 0; i < vi->max_queue_pairs; i++) { callbacks[rxq2vq(i)] = skb_recv_done; callbacks[txq2vq(i)] = skb_xmit_done; sprintf(vi->rq[i].name, "input.%d", i); sprintf(vi->sq[i].name, "output.%d", i); names[rxq2vq(i)] = vi->rq[i].name; names[txq2vq(i)] = vi->sq[i].name; } ret = vi->vdev->config->find_vqs(vi->vdev, total_vqs, vqs, callbacks, names); if (ret) goto err_find; if (vi->has_cvq) { vi->cvq = vqs[total_vqs - 1]; if (virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VLAN)) vi->dev->features |= NETIF_F_HW_VLAN_CTAG_FILTER; } for (i = 0; i < vi->max_queue_pairs; i++) { vi->rq[i].vq = vqs[rxq2vq(i)]; vi->sq[i].vq = vqs[txq2vq(i)]; } kfree(names); kfree(callbacks); kfree(vqs); return 0; err_find: kfree(names); err_names: kfree(callbacks); err_callback: kfree(vqs); err_vq: return ret; }

    vp_find_vqs (kernel 4.9)

    /* the config->find_vqs() implementation */ int vp_find_vqs(struct virtio_device *vdev, unsigned nvqs, struct virtqueue *vqs[], vq_callback_t *callbacks[], const char * const names[]) { int err; /* Try MSI-X with one vector per queue. */ err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, true, true); if (!err) return 0; /* Fallback: MSI-X with one vector for config, one shared for queues. */ err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, true, false); if (!err) return 0; /* Finally fall back to regular interrupts. */ return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, false, false); }

    函数首先尝试方式3每个queue各一个中断处理,如果失败再尝试方式2,如果再失败就只能使用方式1了。

    我们看到每种方式都是调用同一个函数vp_try_to_find_vqs,只是传入的参数不同。那我们就来看下这个函数的主要内容。

    vp_try_to_find_vqs

    static int vp_try_to_find_vqs(struct virtio_device *vdev, unsigned nvqs, struct virtqueue *vqs[], vq_callback_t *callbacks[], const char * const names[], bool use_msix, bool per_vq_vectors) { struct virtio_pci_device *vp_dev = to_vp_device(vdev); u16 msix_vec; int i, err, nvectors, allocated_vectors; vp_dev->vqs = kmalloc(nvqs * sizeof *vp_dev->vqs, GFP_KERNEL); if (!vp_dev->vqs) return -ENOMEM; if (!use_msix) { /* 方式1,所有txq,rxq以及ctrlq都共享一个中断处理 */ /* Old style: one normal interrupt for change and all vqs. */ err = vp_request_intx(vdev); if (err) goto error_find; } else { if (per_vq_vectors) { /* Best option: one for change interrupt, one per vq. */ /* 方式3,可以每个queue一个中断处理 */ nvectors = 1; for (i = 0; i < nvqs; ++i) if (callbacks[i]) ++nvectors; } else { /* Second best: one for change, shared for all vqs. */ /* 方式2,ctrlq一个中断,其他txq和rxq共享一个中断处理 */ nvectors = 2; } err = vp_request_msix_vectors(vdev, nvectors, per_vq_vectors); if (err) goto error_find; } vp_dev->per_vq_vectors = per_vq_vectors; allocated_vectors = vp_dev->msix_used_vectors; for (i = 0; i < nvqs; ++i) { /* 方式3的处理 */ if (!names[i]) { vqs[i] = NULL; continue; } else if (!callbacks[i] || !vp_dev->msix_enabled) msix_vec = VIRTIO_MSI_NO_VECTOR; else if (vp_dev->per_vq_vectors) msix_vec = allocated_vectors++; else msix_vec = VP_MSIX_VQ_VECTOR; vqs[i] = vp_setup_vq(vdev, i, callbacks[i], names[i], msix_vec); if (IS_ERR(vqs[i])) { err = PTR_ERR(vqs[i]); goto error_find; } if (!vp_dev->per_vq_vectors || msix_vec == VIRTIO_MSI_NO_VECTOR) continue; /* allocate per-vq irq if available and necessary */ snprintf(vp_dev->msix_names[msix_vec], sizeof *vp_dev->msix_names, "%s-%s", dev_name(&vp_dev->vdev.dev), names[i]); err = request_irq(vp_dev->msix_entries[msix_vec].vector, vring_interrupt, 0, vp_dev->msix_names[msix_vec], vqs[i]); if (err) { vp_del_vq(vqs[i]); goto error_find; } } return 0; error_find: vp_del_vqs(vdev); return err; }

    其中vp_request_intx函数完成方式1的处理,具体就是通过request_irq注册中断处理函数vp_interrupt;vp_request_msix_vectors函数完成方式2的处理,其中调用request_irq给ctrlq注册中断处理函数为vp_config_changed(方式3的ctrlq中断处理也是这里注册),调用request_irq给数据queue(txq,rxq)注册中断处理函数为vp_vring_interrupt;而方式3的剩余处理在本函数的后半部分,为每个数据queue调用request_irq注册中断处理函数vring_interrupt。如果查看代码会发现方式2数据queue共享的中断处理vp_vring_interrupt函数中也是通过遍历所有queue调用vring_interrupt实现的。所以我们重点关注vring_interrupt函数的实现,这是数据queue中断处理的核心。

    vring_interrupt

    irqreturn_t vring_interrupt(int irq, void *_vq) { struct vring_virtqueue *vq = to_vvq(_vq); if (!more_used(vq)) { /* 如果没有更新uesd desc则不需要特殊处理直接返回 */ pr_debug("virtqueue interrupt with no work for %p\n", vq); return IRQ_NONE; } if (unlikely(vq->broken)) return IRQ_HANDLED; pr_debug("virtqueue callback for %p (%p)\n", vq, vq->vq.callback); if (vq->vq.callback) vq->vq.callback(&vq->vq); return IRQ_HANDLED; } static inline bool more_used(const struct vring_virtqueue *vq) { return vq->last_used_idx != virtio16_to_cpu(vq->vq.vdev, vq->vring.used->idx); }

    如果more_used返回false表示vq->last_used_idx== vring.used->idx,这说明当前没有uesd desc需要更新处理,所以中断直接返回。否则就调用对应queue的callback函数。而callback函数在之前virtnet_find_vqs的调用中被设置。rxq和txq的callback分别注册为了skb_recv_done和skb_xmit_done。

    callbacks[rxq2vq(i)] = skb_recv_done; callbacks[txq2vq(i)] = skb_xmit_done;

    所以接收队列和发送队列的中断处理主要就是分别调用skb_recv_done和skb_xmit_done函数。

    后端vhost_user的kick方式

    下面我们再看下后端vhost_user是如果kick前端的。首先是split ring的情况。

    vhost_vring_call_split (dpdk 1811)

    static __rte_always_inline void vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq) { /* Flush used->idx update before we read avail->flags. */ rte_smp_mb(); /* Don't kick guest if we don't reach index specified by guest. */ if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) { uint16_t old = vq->signalled_used; uint16_t new = vq->last_used_idx; if (vhost_need_event(vhost_used_event(vq), new, old) && (vq->callfd >= 0)) { vq->signalled_used = vq->last_used_idx; eventfd_write(vq->callfd, (eventfd_t) 1); } } else { /* Kick the guest if necessary. */ if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) && (vq->callfd >= 0)) eventfd_write(vq->callfd, (eventfd_t)1); } }

    split的方式比较简单,当开启event_idx的时候,就根据old,new以及前端消耗到的位置avail->ring[(vr)->size]来判断是否kick;如果没有开启event_idx时则只要前端没有设置VRING_AVAIL_F_NO_INTERRUPT就kick前端,注意不开启event_idx时,后端也不是无脑kick的,只有前端没设置VRING_AVAIL_F_NO_INTERRUPT时才会kick,至于前端什么时候设置VRING_AVAIL_F_NO_INTERRUPT,一会再分析。

    下面我们看packed ring的kick前端处理方式。在介绍packed处理之前我们先看其相关结构。

    struct vhost_virtqueue { union { struct vring_desc *desc; struct vring_packed_desc *desc_packed; }; union { struct vring_avail *avail; struct vring_packed_desc_event *driver_event; }; union { struct vring_used *used; struct vring_packed_desc_event *device_event; }; …… };

    在packed 方式中,由于uesd ring和avail ring不再需要,取而代之的是两个desc_event结构。在split方式中我们知道前后端控制是否相互通知是通过avail->flag和uesd->flag的设置来完成的,但packed中分别是通过driver_event和device_event来完成的。

    其中driver_event是后端只读的,是前端控制后端更新uesd desc时是否发送通知的,对应于split 方式的avail->flag;

    device_event是前端只读的,是后端控制前端更新avail desc时是否发送通知的,对应于split方式的uesd->flag。

    driver_event和device_event都是vring_packed_desc_event结构,其具体结构如下。

    struct vring_packed_desc_event { uint16_t off_wrap; uint16_t flags; };

    其中flag可以取三个值分别为:

    #define VRING_EVENT_F_ENABLE 0x0 #define VRING_EVENT_F_DISABLE 0x1 #define VRING_EVENT_F_DESC 0x2

    driver_event取值VRING_EVENT_F_DISABLE相当于split方式avail->flag设置VRING_AVAIL_F_NO_INTERRUPT,device_event取值VRING_EVENT_F_DISABLE相当于uesd->flag设置VRING_USED_F_NO_NOTIFY。

    剩下关键的就是最后这个VRING_EVENT_F_DESC flag了,官方spec是这么解释这个flag的: Enable events for a specific descriptor,(as specified by Descriptor Ring Change Event Offset/Wrap Counter),Only valid if VIRTIO_F_RING_EVENT_IDX has been negotiated. 可以看出这个flag的作用是指定某一个desc发生变化后触发通知,而这个flag生效的前提就是开启了event_idx。这个解释似乎还是不太直观,其实我们可以对比split 的event_idx处理方式来理解。我们知道split方式中,如果开启了event_idx,则前端需要通过avail->ring的最后一个desc告诉后端前端的uesd desc处理到哪里了,后端根据这个值来决定是否需要kick前端;而后端则使用uesd->ring的最后一个desc告诉前端后端的avail desc处理到哪里了,前端根据这个来决定是否来通知后端。但是在packed方式中没有了avail ring和uesd ring,那如果开启了event_idx前端后端如何才能告知对方自己处理到什么位置了呢?答案就是通过这里的off_wrap成员,可以看到这个成员是一个uint16_t,其中后15位指定了前后端处理到什么位置了,而最高位是为了解决翻转的Wrap Counter,而整个off_wrap字段有意义的前提就是flag被设置为了VRING_EVENT_F_DESC。

    下面我们对比packed和split方式后端是否开启通知,来了解下vring_packed_desc_event这三个flag的作用。首先看split的开关中断处理,即vhost_enable_notify_split :

    vhost_enable_notify_split(dpdk 1811)

    static inline void vhost_enable_notify_split(struct virtio_net *dev, struct vhost_virtqueue *vq, int enable) { /* 没有开启EVENT_IDX以vq->used->flags为准 */ if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) { if (enable) vq->used->flags &= ~VRING_USED_F_NO_NOTIFY; else vq->used->flags |= VRING_USED_F_NO_NOTIFY; } else {/* 开启EVENT_IDX后不再使用used->flags */ if (enable) vhost_avail_event(vq) = vq->last_avail_idx; } }

    可以看到在没有开启EVENT_IDX时,控制guest是否通知后端是用过控制vq->used->flags设置VRING_USED_F_NO_NOTIFY来实现的,但是如果开启了EVENT_IDX就不再使用used->flags,而是使用EVENT_IDX特有中断限速方式。

    下面再看packed的guest通知开启关闭方式实现vhost_enable_notify_packed。

    vhost_enable_notify_packed(dpdk 1811)

    static inline void vhost_enable_notify_packed(struct virtio_net *dev, struct vhost_virtqueue *vq, int enable) { uint16_t flags; if (!enable) { vq->device_event->flags = VRING_EVENT_F_DISABLE; return; } flags = VRING_EVENT_F_ENABLE; if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) { flags = VRING_EVENT_F_DESC; vq->device_event->off_wrap = vq->last_avail_idx | vq->avail_wrap_counter << 15; } rte_smp_wmb(); vq->device_event->flags = flags; }

    当不开启event_idx时,packed 方式使用device_event->flags是否设置VRING_EVENT_F_DISABLE代替split 方式设置的VRING_USED_F_NO_NOTIFY。如果开启了event_idx则device_event->flags一定会被设置为VRING_EVENT_F_DESC的,另外注意device_event->off_wrap的初始设置,低15位被设置为vq->last_avail_idx,高位被设置为vq->avail_wrap_counter。

    最后我们看一下packed方式下后端是如何决定是否通知前端的,即vhost_vring_call_packed函数的实现。

    vhost_vring_call_packed(dpdk 1811)

    static __rte_always_inline void vhost_vring_call_packed(struct virtio_net *dev, struct vhost_virtqueue *vq) { uint16_t old, new, off, off_wrap; bool signalled_used_valid, kick = false; /* Flush used desc update. */ rte_smp_mb(); /* 如果没有开启EVENT_IDX, 则以dev->driver_event->flags是否设置VRING_EVENT_F_DISABLE为准确定是否通知前端 */ if (!(dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX))) { if (vq->driver_event->flags != VRING_EVENT_F_DISABLE) kick = true; goto kick; } /* old 表示上一次通知前端时的used idx,new表示当前的uesd idx */ old = vq->signalled_used; new = vq->last_used_idx; vq->signalled_used = new; //注意和split的区别,split是kick前端后才进行signalled_used赋值,而这里是直接赋值 signalled_used_valid = vq->signalled_used_valid; vq->signalled_used_valid = true; /* 如果开启了event_idx但是driver_event->flags没有设置VRING_EVENT_F_DESC(正常情况是不存在的)则按照不开启event_idx时处理,及根据VRING_EVENT_F_DISABLE决定是否kick */ if (vq->driver_event->flags != VRING_EVENT_F_DESC) { if (vq->driver_event->flags != VRING_EVENT_F_DISABLE) kick = true; goto kick; } if (unlikely(!signalled_used_valid)) { kick = true; goto kick; } rte_smp_rmb(); off_wrap = vq->driver_event->off_wrap; off = off_wrap & ~(1 << 15); /* 从低15位获取desc的idx */ if (new <= old) old -= vq->size; /* 根据最高位的warp counter决定是否翻转 */ if (vq->used_wrap_counter != off_wrap >> 15) off -= vq->size; /* off表示前端当前处理的位置,根据off,new,old决定是否kick前端 */ if (vhost_need_event(off, new, old)) kick = true; kick: if (kick) eventfd_write(vq->callfd, (eventfd_t)1); }

    在不开启event_idx的时候,和开启event_idx但是driver_event->flags != VRING_EVENT_F_DESC时(正常情况不存在)都是按照前端是否设置VRING_EVENT_F_DISABLE来决定的。

    下面一点和split略有不同,就是vq->signalled_used赋值的位置,split是kick前端后条件满足时才进行signalled_used赋值,而这里是直接赋值,并且引入了一个signalled_used_valid变量。首先明确一下,这个变化和packed本身无关,其实split在后续patch也采用了类似处理(详见:http://mails.dpdk.org/archives/dev/2019-March/126684.html )。其中vq->signalled_used_valid的引入是为了在热迁移后以及前端驱动reload后,将vq->signalled_used的值标记为无效,正常情况vq->signalled_used记录的是上次通知前端时后端处理到的uesd idx,但是当热迁移发生或者前端驱动reload后这个值将不再有意义。signalled_used_valid是用在第一次更新used ring、virtio-net driver reload和live migration之后,所以在ring初始化的时候、guest发送VHOST_USER_GET_VRING_BASE的时候才赋值为false。

    另外将vq->signalled_used赋值位置前移,而不是只有满足kick条件才赋值,这个改动是为了一个优化。之前产生interrupt的条件是: last_used_idx – event_idx <= last_used_idx – signalled_used (event_idx为前端更新到的uesd idx)。以前的实现是产生interrupt的时候才会更新signalled_used,而现在是每次更新uesd_ring之后,不管是否有interrupt都更新它,现在的做法和kernel vhost_net的实现是一样的。这样做的好处是可以减少不必要的kick事件,以vm接收方向为例,如果guest kernel有NAPI,那么在guest以poll的方式收包的时候会停止更新event_idx。假设NAPI在Δt时间内都会poll(也就是不更新event_idx),那么对原来的实现而言,last_used_idx就是一直增长,而event_idx和signalled_used都是不变的;在这种情况下,产生Interrupt只有两种情况,一是guest NAPI停止poll、更新event_idx,另一种是last_used_idx超过2^16,比如(last_used_idx=13, signaled_used=65535, event_idx=10 )。而对现在的实现而言,last_used_idx也一直增长,event_idx也是固定不变的,不过(last_used_idx – signalled_used)是一个相对固定的值(其值等于这一次更新的used ring的descriptor数,我们称它为Δf);因此在这种情况下guest NAPI结束后,更新event_idx,但(last_used_idx – event_idx)的值未必小于等于Δf,所以不一定需要产生interrupt。所以,在guest如果没有NAPI,现在的实现与以前的相比可以减小(last_used_idx – signaled_used)的值,这样可以减小(last_used_idx – event_idx)小于等于这个值的的概率,从而减小产生interrupt的概率。

    最后一点就是packed方式是如何获取前端当前的消耗位置的,也就是event_idx的,是通过off_wrap的低15位以及高位的warp counter来完成的。

    有了以上背景知识,我们就开始分别分析前后端通知的四种情况。

    发送队列前端通知后端

    我们看发送队列(txq)通知后端的情况。首先明确对于发送队列,前端通知(kick)后端的作用是什么?发送队列kick后端就是为了告诉后端前端已经将数据放入了共享ring(具体就是avail desc)中,后端可以来取数据了。

    所以我们看前端virtio_net驱动的代码实现,在发送函数start_xmit的结尾有如下调用:

    if (kick || netif_xmit_stopped(txq)) virtqueue_kick(sq->vq);

    我们来看virtqueue_kick的实现。

    virtqueue_kick (kernel4.9 )

    bool virtqueue_kick(struct virtqueue *vq) { if (virtqueue_kick_prepare(vq)) return virtqueue_notify(vq); return true; }

    virtqueue_notify(kernel4.9)

    /** * virtqueue_notify - second half of split virtqueue_kick call. * @vq: the struct virtqueue * * This does not need to be serialized. * * Returns false if host notify failed or queue is broken, otherwise true. */ bool virtqueue_notify(struct virtqueue *_vq) { struct vring_virtqueue *vq = to_vvq(_vq); if (unlikely(vq->broken)) return false; /* Prod other side to tell it about changes. */ if (!vq->notify(_vq)) { vq->broken = true; return false; } return true; } EXPORT_SYMBOL_GPL(virtqueue_notify);

    可以看到真正发送kick通知的函数是virtqueue_notify,其调用的条件是virtqueue_kick_prepare,只有其返回true的时候才会kick后端。我们看下virtqueue_kick_prepare的实现。

    virtqueue_kick_prepare (kernel4.9 )

    bool virtqueue_kick_prepare(struct virtqueue *_vq) { struct vring_virtqueue *vq = to_vvq(_vq); u16 new, old; bool needs_kick; START_USE(vq); /* We need to expose available array entries before checking avail * event. */ virtio_mb(vq->weak_barriers); old = vq->avail_idx_shadow - vq->num_added; /*上次通知时的avail_idx*/ new = vq->avail_idx_shadow; /* 本次发送报文后的avail_idx */ vq->num_added = 0; if (vq->event) { /* 如果支持event_idx */ needs_kick = vring_need_event(virtio16_to_cpu(_vq->vdev, vring_avail_event(&vq->vring)), new, old); } else { needs_kick = !(vq->vring.used->flags & cpu_to_virtio16(_vq->vdev, VRING_USED_F_NO_NOTIFY)); } END_USE(vq); return needs_kick; }

    可以看到如果支持event_idx就更加old和new以及used->ring[(vr)->num]的范围来通知后端,详细过程可以参考之前写的event_idx相关文章。如果不支持event_idx就看used->flags是否设置了VRING_USED_F_NO_NOTIFY,如果没有设置就通知后端,否则就不通知。这里我们也可以看到,当开启event_idx后,VRING_USED_F_NO_NOTIFY也就失去了作用。此外VRING_USED_F_NO_NOTIFY这个flag是后端设置的,对前端是只读的,用来告诉前端是否需要通知后端。

    然后我们看后端处理,这里我们因为使用的是dpdk,而dpdk一般采用的polling模式,设置VRING_USED_F_NO_NOTIFY,所以前端是不会kick后端的,但是如果开启了event_idx呢?我们知道开启event_idx时,前端kick后端需要后端通过used->ring[(vr)->num]告诉前端当前avail desc消耗到什么位置了,但是当前dpdk vhost_user并没有这个处理,因此当前如果打开event_idx,后端dpdk依然是无法收到中断的。

    发送队列后端通知前端

    我们再看发送队列后端通知前端的过程。发送队列为什么要通知前端呢?因为后端将前端放入avail ring中的数据取出后需要告诉前端对应的数据已经被取走了,你可以把相关数据buffer释放了。而究竟释放那些buffer是取决于uesd ring的,所以通知前端本质上是为了告诉前端uesd ring有更新了。

    但是有一点要注意,我们知道uesd ring是前后端共享的,所以如果后端更新了uesd ring,即使不通知前端,前端应该也是可以感知到的。所以前端释放buffer不一定要依赖后端kick。事实上也的确如此。

    我们还看发送start_xmit,

    start_xmit (kernel4.9 )

    static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); int qnum = skb_get_queue_mapping(skb); struct send_queue *sq = &vi->sq[qnum]; int err; struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum); bool kick = !skb->xmit_more; /* Free up any pending old buffers before queueing new ones. */ free_old_xmit_skbs(sq); /* timestamp packet in software */ skb_tx_timestamp(skb); /* Try to transmit */ err = xmit_skb(sq, skb); /* This should not happen! */ if (unlikely(err)) { dev->stats.tx_fifo_errors++; if (net_ratelimit()) dev_warn(&dev->dev, "Unexpected TXQ (%d) queue failure: %d\n", qnum, err); dev->stats.tx_dropped++; dev_kfree_skb_any(skb); return NETDEV_TX_OK; } /* Don't wait up for transmitted skbs to be freed. */ skb_orphan(skb); nf_reset(skb); /* If running out of space, stop queue to avoid getting packets that we * are then unable to transmit. * An alternative would be to force queuing layer to requeue the skb by * returning NETDEV_TX_BUSY. However, NETDEV_TX_BUSY should not be * returned in a normal path of operation: it means that driver is not * maintaining the TX queue stop/start state properly, and causes * the stack to do a non-trivial amount of useless work. * Since most packets only take 1 or 2 ring slots, stopping the queue * early means 16 slots are typically wasted. */ if (sq->vq->num_free < 2+MAX_SKB_FRAGS) { netif_stop_subqueue(dev, qnum); if (unlikely(!virtqueue_enable_cb_delayed(sq->vq))) { /* More just got used, free them then recheck. */ free_old_xmit_skbs(sq); if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) { netif_start_subqueue(dev, qnum); virtqueue_disable_cb(sq->vq); } } } if (kick || netif_xmit_stopped(txq)) virtqueue_kick(sq->vq); return NETDEV_TX_OK; }

    其中开头部分就首先调用free_old_xmit_skbs根据uesd ring将之前的buffer释放掉。具体流程如下图所示。 其中detach_buf负责更加uesd ring来将对应的desc释放掉,并将对应地址dma unmmap。另外要注意的是virtqueue_get_buf函数的最后有如下调用:

    virtqueue_get_buf (kernel4.9 )

    void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len) { struct vring_virtqueue *vq = to_vvq(_vq); void *ret; unsigned int i; u16 last_used; last_used = (vq->last_used_idx & (vq->vring.num - 1)); i = virtio32_to_cpu(_vq->vdev, vq->vring.used->ring[last_used].id); *len = virtio32_to_cpu(_vq->vdev, vq->vring.used->ring[last_used].len); …… /* detach_buf clears data, so grab it now. */ ret = vq->desc_state[i].data; detach_buf(vq, i); vq->last_used_idx++; /* If we expect an interrupt for the next entry, tell host * by writing event index and flush out the write before * the read in the next get_buf call. */ if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT)) virtio_store_mb(vq->weak_barriers, &vring_used_event(&vq->vring), cpu_to_virtio16(_vq->vdev, vq->last_used_idx)); END_USE(vq); return ret; }

    当前端没有设置VRING_AVAIL_F_NO_INTERRUPT时,会更新avail->ring[(vr)->num],这也是后端开启event_idx时kick前端的条件。VRING_AVAIL_F_NO_INTERRUPT是前端设置,后端只读的。为什么要更新avail->ring[(vr)->num]呢?avail->ring[(vr)->num]中记录的前端已经处理到那个uesd idx了,因为可以及时告诉后端前端处理到什么位置了,后端来根据情况决定是否需要kick前端。

    那现在问题又回来了,对于txq既然前端不需要后端kick也能释放buffer,那后端kick有什么用呢?我们再回头看一下start_xmit发送函数,有一下逻辑。

    start_xmit (kernel4.9 )

    static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); int qnum = skb_get_queue_mapping(skb); struct send_queue *sq = &vi->sq[qnum]; int err; struct netdev_queue *txq = netdev_get_tx_queue(dev, qnum); bool kick = !skb->xmit_more; /* Free up any pending old buffers before queueing new ones. */ free_old_xmit_skbs(sq); /* timestamp packet in software */ skb_tx_timestamp(skb); /* Try to transmit */ err = xmit_skb(sq, skb); /* This should not happen! */ if (unlikely(err)) { dev->stats.tx_fifo_errors++; if (net_ratelimit()) dev_warn(&dev->dev, "Unexpected TXQ (%d) queue failure: %d\n", qnum, err); dev->stats.tx_dropped++; dev_kfree_skb_any(skb); return NETDEV_TX_OK; } /* Don't wait up for transmitted skbs to be freed. */ skb_orphan(skb); nf_reset(skb); /* If running out of space, stop queue to avoid getting packets that we * are then unable to transmit. * An alternative would be to force queuing layer to requeue the skb by * returning NETDEV_TX_BUSY. However, NETDEV_TX_BUSY should not be * returned in a normal path of operation: it means that driver is not * maintaining the TX queue stop/start state properly, and causes * the stack to do a non-trivial amount of useless work. * Since most packets only take 1 or 2 ring slots, stopping the queue * early means 16 slots are typically wasted. */ if (sq->vq->num_free < 2+MAX_SKB_FRAGS) { netif_stop_subqueue(dev, qnum); if (unlikely(!virtqueue_enable_cb_delayed(sq->vq))) { /* More just got used, free them then recheck. */ free_old_xmit_skbs(sq); if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) { netif_start_subqueue(dev, qnum); virtqueue_disable_cb(sq->vq); } } } if (kick || netif_xmit_stopped(txq)) virtqueue_kick(sq->vq); return NETDEV_TX_OK; }

    当前端发送速率过快,从而vq->num_free较少时会调用netif_stop_subqueue(将队列状态设置为__QUEUE_STATE_DRV_XOFF),这样队列的start_xmit函数下次在__dev_queue_xmit中就不会被调用。要想打破这样一个状态,就需要后端的kick了。这里还有一个十分关键的函数,就这在stop_queue之后调用的virtqueue_enable_cb_delayed函数。这个函数中有着至关重要的一个操作,如下:

    virtqueue_enable_cb_delayed

    bool virtqueue_enable_cb_delayed(struct virtqueue *_vq) { struct vring_virtqueue *vq = to_vvq(_vq); u16 bufs; START_USE(vq); /* We optimistically turn back on interrupts, then check if there was * more to do. */ /* Depending on the VIRTIO_RING_F_USED_EVENT_IDX feature, we need to * either clear the flags bit or point the event index at the next * entry. Always update the event index to keep code simple. */ /* 取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来 */ if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) { vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT; if (!vq->event) vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow); } /* TODO: tune this threshold */ bufs = (u16)(vq->avail_idx_shadow - vq->last_used_idx) * 3 / 4; /* 更新avail->ring[(vr)->num]以供event_idx使用 */ virtio_store_mb(vq->weak_barriers, &vring_used_event(&vq->vring), cpu_to_virtio16(_vq->vdev, vq->last_used_idx + bufs)); if (unlikely((u16)(virtio16_to_cpu(_vq->vdev, vq->vring.used->idx) - vq->last_used_idx) > bufs)) { END_USE(vq); return false; } END_USE(vq); return true; }

    首先这个函数做了一个关键操作,就是取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来。那么VRING_AVAIL_F_NO_INTERRUPT这个flag是什么时候被置上的呢?这个我们后面回答。另外一点就是会更新更新avail->ring[(vr)->num]告诉前端uesd desc当前消耗到哪里了,但注意这里不是更新为vq->last_used_idx,而是还加了一个bufs,为的就是让后端延时一会再kick前端。

    下面我们看一下dpdk后端kick前端的时机,我们以split方式为例说明。

    virtio_dev_tx_split(dpdk18.11)

    static __rte_always_inline uint16_t virtio_dev_tx_split(struct virtio_net *dev, struct vhost_virtqueue *vq, struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count) { uint16_t i; uint16_t free_entries; if (unlikely(dev->dequeue_zero_copy)) { struct zcopy_mbuf *zmbuf, *next; for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list); zmbuf != NULL; zmbuf = next) { next = TAILQ_NEXT(zmbuf, next); if (mbuf_is_consumed(zmbuf->mbuf)) { update_shadow_used_ring_split(vq, zmbuf->desc_idx, 0); TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next); restore_mbuf(zmbuf->mbuf); rte_pktmbuf_free(zmbuf->mbuf); put_zmbuf(zmbuf); vq->nr_zmbuf -= 1; } } if (likely(vq->shadow_used_idx)) { /*如果是零拷贝方式,则每次接收前检查之前已经dma完成的报文,更新uesd ring,kick前端*/ flush_shadow_used_ring_split(dev, vq); vhost_vring_call_split(dev, vq); } } ...... for (i = 0; i < count; i++) { ...... err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i], mbuf_pool); ...... } vq->last_avail_idx += i; if (likely(dev->dequeue_zero_copy == 0)) { do_data_copy_dequeue(vq); if (unlikely(i < count)) vq->shadow_used_idx = i; if (likely(vq->shadow_used_idx)) { /* 更新used ring,kick前端 */ flush_shadow_used_ring_split(dev, vq); vhost_vring_call_split(dev, vq); } } return i; }

    如果是零拷贝,则在后端接受逻辑的开始,判断上一次接受的报文是否dma完成,如果完成则更新uesd ring,调用vhost_vring_call_split kick前端。如果不是零拷贝模式,则在将报文从desc拷贝出来后,更新uesd ring,调用vhost_vring_call_split kick前端。关于vhost_vring_call_split的实现在前面已经介绍过了,这里不再重复。

    下面看前端收到后端的中断是如何处理的。前面已经介绍过了,对于发送队列,其注册的中断回调函数中会调用skb_xmit_done。

    skb_xmit_done(kernel 4.9)

    static void skb_xmit_done(struct virtqueue *vq) { struct virtnet_info *vi = vq->vdev->priv; /* Suppress further interrupts. */ virtqueue_disable_cb(vq); /* We were probably waiting for more output buffers. */ netif_wake_subqueue(vi->dev, vq2txq(vq)); }

    其中virtqueue_disable_cb会将vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT,这样后端就不会发生kick到前端了(不开启event_idx的情况),然后调用netif_wake_subqueue唤醒被stop的queue(清除__QUEUE_STATE_DRV_XOFF 的state)。

    virtqueue_disable_cb(kernel)

    void virtqueue_disable_cb(struct virtqueue *_vq) { struct vring_virtqueue *vq = to_vvq(_vq); if (!(vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT)) { vq->avail_flags_shadow |= VRING_AVAIL_F_NO_INTERRUPT; if (!vq->event) vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow); } }

    这里又有个疑问,唤醒stop 状态的queue容易理解,可以设置上了VRING_AVAIL_F_NO_INTERRUPT后,什么时候取消呢?其实前面我们已经介绍过了,就是下次stop queue时调用virtqueue_enable_cb_delayed的处理中。所以我们可以看到,即使不开启event_idx,也不是每次uesd 变化都会kick前端的,而是只有queue stop后才会kick,正常状态前端是会设置VRING_AVAIL_F_NO_INTERRUPT不让后端kick的。

    接收队列后端通知前端

    对于接收队列,后端会在将mbuf数据拷贝到avail desc中后,更新uesd ring,然后kick前端。kick前端的目的就是告诉前端,我有数据发送给你了(更新了uesd ring),你可以来取数据了。我们以split方式的接收方向为例。其接受逻辑在virtio_dev_rx_split中实现。

    virtio_dev_rx_split(dpdk 18.11)

    static __rte_always_inline uint32_t virtio_dev_rx_split(struct virtio_net *dev, struct vhost_virtqueue *vq, struct rte_mbuf **pkts, uint32_t count) { uint32_t pkt_idx = 0; uint16_t num_buffers; struct buf_vector buf_vec[BUF_VECTOR_MAX]; uint16_t avail_head; rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]); avail_head = *((volatile uint16_t *)&vq->avail->idx); for (pkt_idx = 0; pkt_idx < count; pkt_idx++) { uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen; uint16_t nr_vec = 0; /* 为拷贝当前mbuf后续预留avail desc */ if (unlikely(reserve_avail_buf_split(dev, vq, pkt_len, buf_vec, &num_buffers, avail_head, &nr_vec) < 0)) { VHOST_LOG_DEBUG(VHOST_DATA, "(%d) failed to get enough desc from vring\n", dev->vid); vq->shadow_used_idx -= num_buffers; break; } rte_prefetch0((void *)(uintptr_t)buf_vec[0].buf_addr); VHOST_LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n", dev->vid, vq->last_avail_idx, vq->last_avail_idx + num_buffers); /* 拷贝mbuf到avail desc */ if (copy_mbuf_to_desc(dev, vq, pkts[pkt_idx], buf_vec, nr_vec, num_buffers) < 0) { vq->shadow_used_idx -= num_buffers; break; } /* 更新last_avail_idx */ vq->last_avail_idx += num_buffers; } /* 小包的批处理拷贝 */ do_data_copy_enqueue(dev, vq); if (likely(vq->shadow_used_idx)) { flush_shadow_used_ring_split(dev, vq); /* 更新used ring */ vhost_vring_call_split(dev, vq); /* 通知前端 */ } return pkt_idx; }

    其中kick前端的处理在最后的vhost_vring_call_split函数中,这个我们在“后端vhost_user的kick方式”中已经介绍过,这里就不再重复了。

    然后我们看前端的通知处理。根据前面的介绍,接受队列前端注册的中断处理函数最终会调用到skb_recv_done。

    skb_recv_done(kernel 4.9)

    static void skb_recv_done(struct virtqueue *rvq) { struct virtnet_info *vi = rvq->vdev->priv; struct receive_queue *rq = &vi->rq[vq2rxq(rvq)]; /* Schedule NAPI, Suppress further interrupts if successful. */ if (napi_schedule_prep(&rq->napi)) { virtqueue_disable_cb(rvq); __napi_schedule(&rq->napi); } }

    我们看到这个函数主要工作就是调用virtqueue_disable_cb给vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT从而禁止后端发送中断(不开启event_idx的情况),然后唤起NAPI处理。所以在NAPI的情况后端通知是被关闭的。那么这个flag什么时候会被打开呢?答案就是在virtio_net的NAPI处理逻辑中,即virtnet_poll函数。

    virtnet_poll(kernel 4.9)

    static int virtnet_poll(struct napi_struct *napi, int budget) { struct receive_queue *rq = container_of(napi, struct receive_queue, napi); unsigned int r, received; received = virtnet_receive(rq, budget); /* Out of packets? */ if (received < budget) { r = virtqueue_enable_cb_prepare(rq->vq); napi_complete_done(napi, received); if (unlikely(virtqueue_poll(rq->vq, r)) && napi_schedule_prep(napi)) { virtqueue_disable_cb(rq->vq); __napi_schedule(napi); } } return received; }

    在NAPI处理流程中如果received < budget,证明本轮数据接收已经比较少了,NAPI过程可能要退出了,这时调用virtqueue_enable_cb_prepare将之前的VRING_AVAIL_F_NO_INTERRUPT取消,从NAPI模式进入中断模式。

    virtqueue_enable_cb_prepare(kernel 4.9)

    unsigned virtqueue_enable_cb_prepare(struct virtqueue *_vq) { struct vring_virtqueue *vq = to_vvq(_vq); u16 last_used_idx; START_USE(vq); /* We optimistically turn back on interrupts, then check if there was * more to do. */ /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to * either clear the flags bit or point the event index at the next * entry. Always do both to keep code simple. */ if (vq->avail_flags_shadow & VRING_AVAIL_F_NO_INTERRUPT) { vq->avail_flags_shadow &= ~VRING_AVAIL_F_NO_INTERRUPT; if (!vq->event) vq->vring.avail->flags = cpu_to_virtio16(_vq->vdev, vq->avail_flags_shadow); } vring_used_event(&vq->vring) = cpu_to_virtio16(_vq->vdev, last_used_idx = vq->last_used_idx); END_USE(vq); return last_used_idx; }

    注意这个函数除了取消VRING_AVAIL_F_NO_INTERRUPT设置之外,还会更新avail->ring[(vr)->num],以供开启event_idx时后端使用。

    接收队列前端通知后端

    下面看接收方向前端通知的过程。首先要清除接收方向前端通知后端的目的,那就是告诉后端avail ring已经更新了(有了更多空buffer),你可以继续放入更多数据了。从这里我们也可以看出,前端通知后端,无论发送还是接收方向,都是告诉后端有了更多的avail desc,而后端通知前端,都是告诉前端有了更多的uesd 的desc。然后我们看前端通知后端的时机,要想知道前端再何时通知后端,我们需要对前端的数据接收流程有个清晰的认识。下面这个图描述了前端vhost_net接收数据的过程。 而通知后端的时机就在try_fill_recv函数调用中。

    try_fill_recv(kernel 4.9)

    static bool try_fill_recv(struct virtnet_info *vi, struct receive_queue *rq, gfp_t gfp) { int err; bool oom; gfp |= __GFP_COLD; /* 针对三种情况分别给desc注入对应的buffer,并更新avail idx */ do { if (vi->mergeable_rx_bufs) err = add_recvbuf_mergeable(rq, gfp); else if (vi->big_packets) err = add_recvbuf_big(vi, rq, gfp); else err = add_recvbuf_small(vi, rq, gfp); oom = err == -ENOMEM; if (err) break; } while (rq->vq->num_free); virtqueue_kick(rq->vq); /* kick 后端 */ return !oom; }

    这个函数首先会根据是否支持mergeable已经是否支持收大包来向avail desc注入对应的空buffer。每种情况下注入buffer产生的desc chain长度是不同的。这里不是重点,我们就不展开了。下面看关键的virtqueue_kick函数。

    virtqueue_kick(kernel 4.9)

    bool virtqueue_kick(struct virtqueue *vq) { if (virtqueue_kick_prepare(vq)) return virtqueue_notify(vq); return true; }

    其中真正向后端发送通知的是virtqueue_notify,但是首先要通过virtqueue_kick_prepare的判断。virtqueue_kick_prepare这个函数我们之前已经介绍过了,如果不开启event_idx时,会根据vring.used->flags是否设置VRING_USED_F_NO_NOTIFY来决定是否kick后端,而开启event_idx时,则会根据后端填入(vr)->used->ring[(vr)->num]中的后端消耗位置来决定是否kick。

    同样如果后端使用的是dpdk vhost_user,那么当前后端是不会写(vr)->used->ring[(vr)->num]告诉前端自己的avail使用位置的,如果开启了event_idx后端还是无法收到中断的。那如果dpdk使用中断模式,这里收不到中断是否会有问题呢?我们换位思考一下,对比一下“发送队列后端通知前端”的场景,在“发送队列后端通知前端”中,如果后端不通知前端,那么前端一旦感觉到没有可用的avail desc后就会stop queue,之后就无法被唤醒了。而这里的场景,vhost_user后端没有类似stop queue的操作,所以即使收不到前端的中断也没有问题。但是在“发送队列前端通知后端”的场景中,如果dpdk采用中断模式切开启了event_idx,那么vhost_user就会因为收不到中断而无法取出前端发送的报文。所以对应想使用中断模式,且开启event_idx的场景,vhost_user需要添加对(vr)->used->ring[(vr)->num]的处理。

    原文链接:http://m.blog.chinaunix.net/uid-28541347-id-5819699.html

    Processed: 0.011, SQL: 9