DPDK vhost-user之mergeable 特性(七)

    技术2022-07-11  94

    vhost_user在收包时(将数据包发往vm内部)会调用rte_vhost_enqueue_burst函数,这个函数的实现如下:

    rte_vhost_enqueue_burst

    uint16_t rte_vhost_enqueue_burst(int vid, uint16_t queue_id, struct rte_mbuf **pkts, uint16_t count) { struct virtio_net *dev = get_device(vid); if (!dev) return 0; if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF)) return virtio_dev_merge_rx(dev, queue_id, pkts, count); else return virtio_dev_rx(dev, queue_id, pkts, count); }

    我们可以看到根据vhost_user后端设备是否支持VIRTIO_NET_F_MRG_RXBUF特性会调用不同函数,其中不支持时调用的virtio_dev_rx之前已经分析过,这里看下virtio_dev_merge_rx的具体实现。

    virtio_dev_merge_rx

    static inline uint32_t __attribute__((always_inline)) virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id, struct rte_mbuf **pkts, uint32_t count) { struct vhost_virtqueue *vq; uint32_t pkt_idx = 0; uint16_t num_buffers; struct buf_vector buf_vec[BUF_VECTOR_MAX]; uint16_t avail_head; /*获取对应的queue*/ vq = dev->virtqueue[queue_id]; if (unlikely(vq->enabled == 0)) return 0; count = RTE_MIN((uint32_t)MAX_PKT_BURST, count); if (count == 0) return 0; /*avail->ring[vq->last_avail_idx & (vq->size - 1)]记录着首个可用的desc index, *avail->ring[vq->avail->idx & (vq->size - 1)]记录着最后一个可用的desc index*/ rte_prefetch0(&vq->avail->ring[vq->last_avail_idx & (vq->size - 1)]); vq->shadow_used_idx = 0; /*avail->ring[avail_head]记录着最后一个可用的desc index*/ avail_head = *((volatile uint16_t *)&vq->avail->idx); /*遍历每一个要发送的数据包*/ for (pkt_idx = 0; pkt_idx < count; pkt_idx++) { uint32_t pkt_len = pkts[pkt_idx]->pkt_len + dev->vhost_hlen; /*预留足够的desc来存放mbuf,使用buf_vec来记录,每个buf_vec对应一个desc, 所以num_buffers就是存放这个数据包所需的desc chain的个数*/ if (unlikely(reserve_avail_buf_mergeable(dev, vq, pkt_len, buf_vec, &num_buffers, avail_head) < 0)) { LOG_DEBUG(VHOST_DATA, "(%d) failed to get enough desc from vring\n", dev->vid); vq->shadow_used_idx -= num_buffers; break; } LOG_DEBUG(VHOST_DATA, "(%d) current index %d | end index %d\n", dev->vid, vq->last_avail_idx, vq->last_avail_idx + num_buffers); /*根据buf_vec中记录的desc信息,将当前数据包(mbuf)拷贝到这些desc中*/ if (copy_mbuf_to_desc_mergeable(dev, pkts[pkt_idx], buf_vec, num_buffers) < 0) { vq->shadow_used_idx -= num_buffers; break; } /*更新last_avail_idx*/ vq->last_avail_idx += num_buffers; } if (likely(vq->shadow_used_idx)) { flush_shadow_used_ring(dev, vq); /* flush used->idx update before we read avail->flags. */ rte_mb(); /* Kick the guest if necessary. */ if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT) && (vq->callfd >= 0)) eventfd_write(vq->callfd, (eventfd_t)1); } return pkt_idx; }

    这里主要引入了一个struct buf_vector的数组,这个数组和desc是一一对应的,其关系如下所示: 那么为什么要引入这个buf_vector数组呢?首先我们知道所谓merge_rx的功能,主要是为了vm接收大包(实现LRO的功能)。一般来说,一个mbuf会对应转换成为一个desc,但是当mbuf稍微大点,一个desc无法装下怎么办呢? 这时不要忘了desc中的next成员,即desc也是可以形成chain的,如果当前desc无法装下mbuf,那么就用vq->desc[desc->next]来继续存放。如下图: desc1,desc3,desc5形成一个chain,mbuf由desc1和desc3存放。那么如果mbuf再大点呢?达到desc1,desc3,desc5三个desc都无法存放呢?这时如果没有打开VIRTIO_NET_F_MRG_RXBUF调用virtio_dev_rx就会接收出错了(尽管还有其他可用的desc,但由于不在一个chain,所以也不会使用)。但是如果打开了VIRTIO_NET_F_MRG_RXBUF,则会尝试找其他其他的chain。

    还以上图为例,desc数组中共有两个chain1:desc1->desc3->desc5; chain2:desc2->desc4->desc6,那么当chain1无法存下这个mbuf数据时,mbuf剩下的数据将由chain2存放。主要:一个mbuf绝不可能按照desc1àdesc2的顺序存放,因为desc1和desc2属于不同的chain,只有当前chain使用完才能使用另一个chain。

    由于desc数组的顺序不一定是按照chain的顺序组织的(如上图),所以为了方便后续的mbuf到各个desc的拷贝操作,我们增加了buf_vector这个数组,用它来记录拷贝mbuf的desc顺序。如下图所示: 这样我们拷贝mbuf时就可以按照buf_vector1到buf_vector6依次拷贝到对应的desc中了。

    另外我们注意一点,在函数的最后会对last_avail_idx进行更新:vq->last_avail_idx += num_buffers; 注意这里的num_buffers不是这个mbuf使用的desc个数,而是使用的desc chain个数。还以上面mbuf使用两个chain为例,last_avail_idx需要加2,如下图所示,如果之前last_avail_idx为1的话,这里就要更新为3了。 这里容易混淆的一点是:avail->ring中存放的index不是和desc一一对应的,而是和desc chain一一对应的,即只存放desc chain header的index。所以avail->idx-last_avail_id也不是可用desc的个数,而是可用desc chain的个数。

    有了以上背景,我们再看reserve_avail_buf_mergeable这个函数就容易理解多了。这个函数的作用就是:预留足够的desc来存放mbuf,同时使用buf_vec来记录,每个buf_vec对应一个desc,如果当前所有可用的desc都无法装得下这个mbuf则返错。

    reserve_avail_buf_mergeable

    static inline int reserve_avail_buf_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq, uint32_t size, struct buf_vector *buf_vec, uint16_t *num_buffers, uint16_t avail_head) { uint16_t cur_idx; uint32_t vec_idx = 0; uint16_t tries = 0; uint16_t head_idx = 0; uint16_t len = 0; /*num_buffers为要使用的desc chain的个数*/ *num_buffers = 0; cur_idx = vq->last_avail_idx; /*每次遍历一个desc chain,遍历过的chain加起来可以存放下这个mbuf*/ while (size > 0) { if (unlikely(cur_idx == avail_head)) return -1; /*fill_vec_buf的作用遍历下一个desc chain,用来存放mbuf,然后buf_vec记录这些desc的信息*/ if (unlikely(fill_vec_buf(dev, vq, cur_idx, &vec_idx, buf_vec, &head_idx, &len) < 0)) return -1; len = RTE_MIN(len, size); update_shadow_used_ring(vq, head_idx, len); size -= len; cur_idx++; tries++; *num_buffers += 1; /* * if we tried all available ring items, and still * can't get enough buf, it means something abnormal * happened. */ /*当尝试次数大于了vq->size,说明所有可用的desc都被扫描过了, 即所有可用的desc加起来都无法满足这个mbuf*/ if (unlikely(tries >= vq->size)) return -1; } return 0; }

    这里需要注意的一点就是num_buffers不是使用desc的个数,而是使用的desc chain个数。而fill_vec_buf负责每次挑选一个desc chain填入对应的buf_vector,注意传入参数head_idx每次随着被desc填充而被修改。下面看fill_vec_buf。

    fill_vec_buf

    static inline int __attribute__((always_inline)) fill_vec_buf(struct virtio_net *dev, struct vhost_virtqueue *vq, uint32_t avail_idx, uint32_t *vec_idx, struct buf_vector *buf_vec, uint16_t *desc_chain_head, uint16_t *desc_chain_len) { uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)]; uint32_t vec_id = *vec_idx; uint32_t len = 0; struct vring_desc *descs = vq->desc; *desc_chain_head = idx; /* VRING_DESC_F_INDIRECT处理 */ if (vq->desc[idx].flags & VRING_DESC_F_INDIRECT) { descs = (struct vring_desc *)(uintptr_t) gpa_to_vva(dev, vq->desc[idx].addr); if (unlikely(!descs)) return -1; idx = 0; } /*遍历这个desc chain知道结束,将这个chain中的desc信息记录到buf_vec中, *chain能存放的数据长度赋值给desc_chain_len*/ while (1) { if (unlikely(vec_id >= BUF_VECTOR_MAX || idx >= vq->size)) return -1; len += descs[idx].len; buf_vec[vec_id].buf_addr = descs[idx].addr; buf_vec[vec_id].buf_len = descs[idx].len; buf_vec[vec_id].desc_idx = idx; vec_id++; if ((descs[idx].flags & VRING_DESC_F_NEXT) == 0) break; idx = descs[idx].next; } *desc_chain_len = len; *vec_idx = vec_id; return 0; }

    这个函数比较简单,就是遍历一个desc chain到结束为止,然后将这个chain的信息存放在buf_vec中,将这个chain能存放的数据长度信息返回,以供上层判断是否还需要再找下一个chain填充。

    这里有一个VRING_DESC_F_INDIRECT 的desc特性的判断,这里顺便说下direct desc和indirect desc的区别。通常的desc->addr指向的是存放数据的page,这样的desc叫做direct desc。但是indirect desc的desc->addr指向的是一个direct desc数组,如下图所示。(主要indirect desc指向的只能是direct desc,即不能再继续级联下去) 最后在开启mergeable后virtio_dev_merge_rx的调用和普通模式virtio_dev_rx的调用还有点不同。在virtio_dev_rx中,每次将mbuf中的数据存放在一个desc后都会更新vq->used->ring和vq->last_used_idx,即告诉前端那些desc中已经存放了数据。但在virtio_dev_merge_rx中却没有看到这个过程。其实这个过程是有的,我们注意到在reserve_avail_buf_mergeable中每次调用完fill_vec_buf就会调用一下update_shadow_used_ring,我们看一下其实现。

    static inline void __attribute__((always_inline)) update_shadow_used_ring(struct vhost_virtqueue *vq, uint16_t desc_idx, uint16_t len) { uint16_t i = vq->shadow_used_idx++; vq->shadow_used_ring[i].id = desc_idx; vq->shadow_used_ring[i].len = len; }

    这里会更新shadow_used_idx和shadow_used_ring。这里引入的shadow_used_idx和shadow_used_ring其实是为了最后批处理更新vq->used->ring,通常更新vq->used->ring要先找到对应的idx,在更新vq->used->ring[idx]。如果对于要使用多个desc chain的情况,这样每次更新就会造成较大的访存开销。那我们看看使用shadow_used_idx和shadow_used_ring会怎么更新。在virtio_dev_merge_rx中拷贝完mbuf数据后,最后会调用flush_shadow_used_ring函数。

    flush_shadow_used_ring

    static inline void __attribute__((always_inline)) flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq) { uint16_t used_idx = vq->last_used_idx & (vq->size - 1); /*vq->shadow_used_idx存放的是本次使用的desc chain个数*/ /*如果used_idx + vq->shadow_used_idx没有产生环形队列回绕*/ if (used_idx + vq->shadow_used_idx <= vq->size) { do_flush_shadow_used_ring(dev, vq, used_idx, 0, vq->shadow_used_idx); } else { uint16_t size; /*used_idx + vq->shadow_used_idx产生了回绕,需要分首尾两部分更新*/ /* update used ring interval [used_idx, vq->size] */ size = vq->size - used_idx; do_flush_shadow_used_ring(dev, vq, used_idx, 0, size); /* update the left half used ring interval [0, left_size] */ do_flush_shadow_used_ring(dev, vq, 0, size, vq->shadow_used_idx - size); } vq->last_used_idx += vq->shadow_used_idx; rte_smp_wmb(); /*更新vq->used->idx*/ *(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx; /*更新热迁移位图*/ vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx), sizeof(vq->used->idx)); }

    我们再看下do_flush_shadow_used_ring的处理。

    do_flush_shadow_used_ring

    static inline void __attribute__((always_inline)) do_flush_shadow_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq, uint16_t to, uint16_t from, uint16_t size) { rte_memcpy(&vq->used->ring[to], &vq->shadow_used_ring[from], size * sizeof(struct vring_used_elem)); vhost_log_used_vring(dev, vq, offsetof(struct vring_used, ring[to]), size * sizeof(struct vring_used_elem)); }

    可以看到由于之前shadow_used_ring中有相关记录,所以这里可以一次性拷贝到uesd_ring中,减少了内存访问次数。

    原文链接:http://blog.chinaunix.net/uid-8574039-id-5826405.html

    Processed: 0.008, SQL: 9