DPDK vhost-user之mergeable对接收端(guest)的影响(八)

    技术2022-07-11  93

    这里在分析一下guset内部对于开启mergeable接收会有什么影响,顺便分析一下开启GUEST_GSO/GUEST_TSO时,guset内部的接收流程。

    首先我们从vhost-user,发送端分析一下,两种情况是如何更新used->ring的。

    mergeable情况

    在reserve_avail_buf_mergeable(dpdk代码)函数中有一下逻辑:

    /*fill_vec_buf的作用是找一个desc chain,用来存放mbuf,然后buf_vec记录这些desc的信息*/ if (unlikely(fill_vec_buf(dev, vq, cur_idx, &vec_idx, buf_vec, &head_idx, &len) < 0)) return -1; len = RTE_MIN(len, size); update_shadow_used_ring(vq, head_idx, len);

    这里我们在“vhost_user mergeable特性”中已经分析过,fill_vec_buf是遍历当前desc chain,然后将这个chain的信息记录在buf_vec中,同时len中返回的是这个chain能存放的数据长度。

    在update_shadow_used_ring中会将这个长度赋值给vq->shadow_used_ring[i].len,如下:

    static inline void __attribute__((always_inline)) update_shadow_used_ring(struct vhost_virtqueue *vq, uint16_t desc_idx, uint16_t len) { uint16_t i = vq->shadow_used_idx++; vq->shadow_used_ring[i].id = desc_idx; vq->shadow_used_ring[i].len = len; }

    最后在flush_shadow_used_ring中vq->shadow_used_ring[i].len最终被赋值给vq->used->ring[i].len。也就是vq->used->ring[i].len存放的是一个chain的长度。

    开启GUEST_GSO/GUEST_TSO (不开启mergeable)

    这种情况vhost_user后端不会去特殊处理,和普通报文一样。在virtio_dev_rx的处理逻辑中有如下代码:

    for (i = 0; i < count; i++) { used_idx = (start_idx + i) & (vq->size - 1); desc_indexes[i] = vq->avail->ring[used_idx]; vq->used->ring[used_idx].id = desc_indexes[i]; /* vq->used->ring[used_idx].len 存放的是整个数据包长加上virtio header的长度*/ vq->used->ring[used_idx].len = pkts[i]->pkt_len + dev->vhost_hlen; vhost_log_used_vring(dev, vq, offsetof(struct vring_used, ring[used_idx]), sizeof(vq->used->ring[used_idx])); }

    这里vq->used->ring[used_idx].len 存放的是整个数据包长加上virtio header的长度,因为在非mergeable情况,一个数据包要么被一个chain装完,要么丢弃,所以只有发送成功,就不存在一个chain只装了部分数据的情况。

    下面我们再看guset接收端代码,就kernel(3.10)的virtio_net代码。以下是guset的收包逻辑:

    virtnet_poll

    static int virtnet_poll(struct napi_struct *napi, int budget) { /*……*/ while (received < budget && /*virtqueue_get_buf取出要接收的skb*/ (buf = virtqueue_get_buf(rq->vq, &len)) != NULL) { receive_buf(rq, buf, len); /*真正的接收处理操作,最终调用netif_receive_skb*/ --rq->num; received++; } /*……*/ }

    我们只看和我们分析有关的逻辑。首先调用virtqueue_get_buf从队列中取出一个mbuf,并返回一个长度len。

    void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len) { struct vring_virtqueue *vq = to_vvq(_vq); void *ret; unsigned int i; u16 last_used; /*……*/ virtio_rmb(vq->weak_barriers); /*获取本次要是有的used_elem数组index*/ last_used = (vq->last_used_idx & (vq->vring.num - 1)); i = vq->vring.used->ring[last_used].id; /*本次要接受skb对应的data下标,也是skb对应第一个desc的index*/ *len = vq->vring.used->ring[last_used].len;/*本次要接受skb的长度*/ /* detach_buf clears data, so grab it now. */ /*取出要接受的skb*/ ret = vq->data[i]; /*释放skb对应的desc chain*/ detach_buf(vq, i); vq->last_used_idx++; /* If we expect an interrupt for the next entry, tell host * by writing event index and flush out the write before * the read in the next get_buf call. */ if (!(vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { vring_used_event(&vq->vring) = vq->last_used_idx; virtio_mb(vq->weak_barriers); } END_USE(vq); return ret; }

    这里注意以下几点:

    返回的len存放的是vq->vring.used->ring[last_used].len中的值,上面我们分析过,在mergeable情况下这是一个chain的长度(如果数据包的长度小于chain能装的数据长度,则为数据包的长度+virtio header),在GUEST_TSO*的情况,这是一个数据包的长度+virtio header;detach_buf 会释放当前desc chain,而不仅是一个desc,因为无论那种情况,这个chain中的数据再之后都会被取出,可以归还给后端了。

    receive_buf

    static void receive_buf(struct receive_queue *rq, void *buf, unsigned int len) { struct virtnet_info *vi = rq->vq->vdev->priv; struct net_device *dev = vi->dev; struct virtnet_stats *stats = this_cpu_ptr(vi->stats); struct sk_buff *skb; struct skb_vnet_hdr *hdr; /*……*/ if (vi->mergeable_rx_bufs) skb = receive_mergeable(dev, rq, buf, len); else if (vi->big_packets) skb = receive_big(dev, rq, buf); else skb = receive_small(buf, len); if (unlikely(!skb)) return; hdr = skb_vnet_hdr(skb); u64_stats_update_begin(&stats->rx_syncp); stats->rx_bytes += skb->len; stats->rx_packets++; u64_stats_update_end(&stats->rx_syncp); if (hdr->hdr.flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) { pr_debug("Needs csum!\n"); if (!skb_partial_csum_set(skb, hdr->hdr.csum_start, hdr->hdr.csum_offset)) goto frame_err; } else if (hdr->hdr.flags & VIRTIO_NET_HDR_F_DATA_VALID) { skb->ip_summed = CHECKSUM_UNNECESSARY; } skb->protocol = eth_type_trans(skb, dev); pr_debug("Receiving skb proto 0xx len %i type %i\n", ntohs(skb->protocol), skb->len, skb->pkt_type); /*根据后端填入virtio_net_hdr中的信息,设置gso的相关字段,说明收到的是大包*/ if (hdr->hdr.gso_type != VIRTIO_NET_HDR_GSO_NONE) { pr_debug("GSO!\n"); switch (hdr->hdr.gso_type & ~VIRTIO_NET_HDR_GSO_ECN) { case VIRTIO_NET_HDR_GSO_TCPV4: skb_shinfo(skb)->gso_type = SKB_GSO_TCPV4; break; case VIRTIO_NET_HDR_GSO_UDP: skb_shinfo(skb)->gso_type = SKB_GSO_UDP; break; case VIRTIO_NET_HDR_GSO_TCPV6: skb_shinfo(skb)->gso_type = SKB_GSO_TCPV6; break; default: net_warn_ratelimited("%s: bad gso type %u.\n", dev->name, hdr->hdr.gso_type); goto frame_err; } if (hdr->hdr.gso_type & VIRTIO_NET_HDR_GSO_ECN) skb_shinfo(skb)->gso_type |= SKB_GSO_TCP_ECN; skb_shinfo(skb)->gso_size = hdr->hdr.gso_size; if (skb_shinfo(skb)->gso_size == 0) { net_warn_ratelimited("%s: zero gso size.\n", dev->name); goto frame_err; } /* Header must be checked, and gso_segs computed. */ skb_shinfo(skb)->gso_type |= SKB_GSO_DODGY; skb_shinfo(skb)->gso_segs = 0; } /*发往协议栈*/ netif_receive_skb(skb); return; frame_err: dev->stats.rx_frame_errors++; dev_kfree_skb(skb); }

    如果mergeable开启,则vi->mergeable_rx_bufs会被置位,如果GUEST_TSO* 被打开,则 vi->big_packets会被置位。所以分析两种情况的接收处理就是看相应的调用函数,即receive_mergeable和receive_big。在分析这两个函数前,首先来看receive_buf的后半部分,根据后端填入virtio_net_hdr中的信息,设置gso(这里用于收方向,即gro)的相关字段。 所以要想guset能够接收大包(LRO)功能不但需要开启相关flag(GUEST_TSO*或mergeable),还依赖后端对virtio header的设置,如果后端处理了切割大包逻辑,以链表形式给前端,并设置相应virtio header,则guset就可以不用再分片,否则如果后端没有处理分片,仅仅把大包发给guset,则guset还需要进行GRO处理。 下面分析

    receive_mergeable

    static struct sk_buff *receive_mergeable(struct net_device *dev, struct receive_queue *rq, void *buf, unsigned int len) { /*从第一个page中获取到virtio header*/ struct skb_vnet_hdr *hdr = page_address(buf); /*从virtio header中获取这个数据包所用的desc chain个数*/ int num_buf = hdr->mhdr.num_buffers; struct page *page = buf; /*将page中的数据转换为skb*/ struct sk_buff *skb = page_to_skb(rq, page, len); int i; if (unlikely(!skb)) goto err_skb; while (--num_buf) { /*对应当前数据包使用的每个chain*/ i = skb_shinfo(skb)->nr_frags; if (i >= MAX_SKB_FRAGS) { pr_debug("%s: packet too long\n", skb->dev->name); skb->dev->stats.rx_length_errors++; goto err_frags; } /*对接下来的每个desc chain 再次调用virtqueue_get_buf */ page = virtqueue_get_buf(rq->vq, &len); if (!page) { pr_debug("%s: rx error: %d buffers %d missing\n", dev->name, hdr->mhdr.num_buffers, num_buf); dev->stats.rx_length_errors++; goto err_buf; } if (len > PAGE_SIZE) len = PAGE_SIZE; set_skb_frag(skb, page, 0, &len); --rq->num; } return skb; }

    可以看出mergeable的情况,由于一个数据包可能使用多个chain,则会对每个chain在此调用virtqueue_get_buf,获取对应page(mergeable的情况每个chain的长度为1,对应的也是一个page),然后通过set_skb_frag将之后的每个chain(desc)对应的page加入首个skb的skb_shinfo(skb)->frags[i]中。所以mergeable情况收到的大包,会有skb_shinfo(skb)->frags[],其对应的每个desc对应skb_shinfo(skb)->frags[]的一个page。下面看receive_big。

    receive_big

    static struct sk_buff *receive_big(struct net_device *dev, struct receive_queue *rq, void *buf) { struct page *page = buf; struct sk_buff *skb = page_to_skb(rq, page, 0); return skb; }

    直接调用page_to_skb,这个在receive_mergeable中也有调用。

    page_to_skb

    static struct sk_buff *page_to_skb(struct receive_queue *rq, struct page *page, unsigned int len) { struct virtnet_info *vi = rq->vq->vdev->priv; struct sk_buff *skb; struct skb_vnet_hdr *hdr; unsigned int copy, hdr_len, offset; char *p; p = page_address(page); /* copy small packet so we can reuse these pages for small data */ skb = netdev_alloc_skb_ip_align(vi->dev, GOOD_COPY_LEN); if (unlikely(!skb)) return NULL; hdr = skb_vnet_hdr(skb); if (vi->mergeable_rx_bufs) { hdr_len = sizeof hdr->mhdr; offset = hdr_len; } else { hdr_len = sizeof hdr->hdr; offset = sizeof(struct padded_vnet_hdr); } /*提取virtio header*/ memcpy(hdr, p, hdr_len); len -= hdr_len; p += offset; /*将剩余数据尽可能的拷贝到当前的skb线性区中*/ copy = len; if (copy > skb_tailroom(skb)) copy = skb_tailroom(skb); memcpy(skb_put(skb, copy), p, copy); len -= copy; offset += copy; /* * Verify that we can indeed put this data into a skb. * This is here to handle cases when the device erroneously * tries to receive more than is possible. This is usually * the case of a broken device. */ if (unlikely(len > MAX_SKB_FRAGS * PAGE_SIZE)) { net_dbg_ratelimited("%s: too much data\n", skb->dev->name); dev_kfree_skb(skb); return NULL; } /*如果第一个skb的线性区用完了,但是还有数据没拷贝出来,则添加到skb_shinfo(skb)->frags[]*/ while (len) { set_skb_frag(skb, page, offset, &len); page = (struct page *)page->private; offset = 0; } if (page) give_pages(rq, page); return skb; }

    从上面的过程总结一下:当开启GUEST_TSO*时,guest收大包会尽可能的填充skb的线性区,剩余数据填充skb_shinfo(skb)->frags[],而对于mergeable由于只有第一个chain(也就是一个desc)对应的page会填充skb线性区,其他数据都在skb_shinfo(skb)->frags[],所以mergeable可能会有更多frags。

    另外注意一点,当mergeable和GUEST_TSO*同时开启时,由于guest是优先判断mergeable的,所以就会走mergeable逻辑。

    在实现LRO时,建议使用mergeable特性,因为如果使用GUEST_TSO*,则接收小包也会是由长为17的desc chain,这会造成浪费。

    原文链接:http://blog.chinaunix.net/uid-8574039-id-5826459.html

    Processed: 0.012, SQL: 9