vhost_user在收包时(将数据包发往vm内部)会调用rte_vhost_enqueue_burst函数,这个函数的实现如下:
rte_vhost_enqueue_burst
uint16_t
rte_vhost_enqueue_burst(int vid
, uint16_t queue_id
,
struct rte_mbuf
**pkts
, uint16_t count
)
{
struct virtio_net
*dev
= get_device(vid
);
if (!dev
)
return 0;
if (dev
->features
& (1 << VIRTIO_NET_F_MRG_RXBUF
))
return virtio_dev_merge_rx(dev
, queue_id
, pkts
, count
);
else
return virtio_dev_rx(dev
, queue_id
, pkts
, count
);
}
我们可以看到根据vhost_user后端设备是否支持VIRTIO_NET_F_MRG_RXBUF特性会调用不同函数,其中不支持时调用的virtio_dev_rx之前已经分析过,这里看下virtio_dev_merge_rx的具体实现。
virtio_dev_merge_rx
static inline uint32_t
__attribute__((always_inline
))
virtio_dev_merge_rx(struct virtio_net
*dev
, uint16_t queue_id
,
struct rte_mbuf
**pkts
, uint32_t count
)
{
struct vhost_virtqueue
*vq
;
uint32_t pkt_idx
= 0;
uint16_t num_buffers
;
struct buf_vector buf_vec
[BUF_VECTOR_MAX
];
uint16_t avail_head
;
vq
= dev
->virtqueue
[queue_id
];
if (unlikely(vq
->enabled
== 0))
return 0;
count
= RTE_MIN((uint32_t
)MAX_PKT_BURST
, count
);
if (count
== 0)
return 0;
rte_prefetch0(&vq
->avail
->ring
[vq
->last_avail_idx
& (vq
->size
- 1)]);
vq
->shadow_used_idx
= 0;
avail_head
= *((volatile uint16_t
*)&vq
->avail
->idx
);
for (pkt_idx
= 0; pkt_idx
< count
; pkt_idx
++) {
uint32_t pkt_len
= pkts
[pkt_idx
]->pkt_len
+ dev
->vhost_hlen
;
if (unlikely(reserve_avail_buf_mergeable(dev
, vq
,
pkt_len
, buf_vec
, &num_buffers
,
avail_head
) < 0)) {
LOG_DEBUG(VHOST_DATA
,
"(%d) failed to get enough desc from vring\n",
dev
->vid
);
vq
->shadow_used_idx
-= num_buffers
;
break;
}
LOG_DEBUG(VHOST_DATA
, "(%d) current index %d | end index %d\n",
dev
->vid
, vq
->last_avail_idx
,
vq
->last_avail_idx
+ num_buffers
);
if (copy_mbuf_to_desc_mergeable(dev
, pkts
[pkt_idx
],
buf_vec
, num_buffers
) < 0) {
vq
->shadow_used_idx
-= num_buffers
;
break;
}
vq
->last_avail_idx
+= num_buffers
;
}
if (likely(vq
->shadow_used_idx
)) {
flush_shadow_used_ring(dev
, vq
);
rte_mb();
if (!(vq
->avail
->flags
& VRING_AVAIL_F_NO_INTERRUPT
)
&& (vq
->callfd
>= 0))
eventfd_write(vq
->callfd
, (eventfd_t
)1);
}
return pkt_idx
;
}
这里主要引入了一个struct buf_vector的数组,这个数组和desc是一一对应的,其关系如下所示: 那么为什么要引入这个buf_vector数组呢?首先我们知道所谓merge_rx的功能,主要是为了vm接收大包(实现LRO的功能)。一般来说,一个mbuf会对应转换成为一个desc,但是当mbuf稍微大点,一个desc无法装下怎么办呢? 这时不要忘了desc中的next成员,即desc也是可以形成chain的,如果当前desc无法装下mbuf,那么就用vq->desc[desc->next]来继续存放。如下图: desc1,desc3,desc5形成一个chain,mbuf由desc1和desc3存放。那么如果mbuf再大点呢?达到desc1,desc3,desc5三个desc都无法存放呢?这时如果没有打开VIRTIO_NET_F_MRG_RXBUF调用virtio_dev_rx就会接收出错了(尽管还有其他可用的desc,但由于不在一个chain,所以也不会使用)。但是如果打开了VIRTIO_NET_F_MRG_RXBUF,则会尝试找其他其他的chain。
还以上图为例,desc数组中共有两个chain1:desc1->desc3->desc5; chain2:desc2->desc4->desc6,那么当chain1无法存下这个mbuf数据时,mbuf剩下的数据将由chain2存放。主要:一个mbuf绝不可能按照desc1àdesc2的顺序存放,因为desc1和desc2属于不同的chain,只有当前chain使用完才能使用另一个chain。
由于desc数组的顺序不一定是按照chain的顺序组织的(如上图),所以为了方便后续的mbuf到各个desc的拷贝操作,我们增加了buf_vector这个数组,用它来记录拷贝mbuf的desc顺序。如下图所示: 这样我们拷贝mbuf时就可以按照buf_vector1到buf_vector6依次拷贝到对应的desc中了。
另外我们注意一点,在函数的最后会对last_avail_idx进行更新:vq->last_avail_idx += num_buffers; 注意这里的num_buffers不是这个mbuf使用的desc个数,而是使用的desc chain个数。还以上面mbuf使用两个chain为例,last_avail_idx需要加2,如下图所示,如果之前last_avail_idx为1的话,这里就要更新为3了。 这里容易混淆的一点是:avail->ring中存放的index不是和desc一一对应的,而是和desc chain一一对应的,即只存放desc chain header的index。所以avail->idx-last_avail_id也不是可用desc的个数,而是可用desc chain的个数。
有了以上背景,我们再看reserve_avail_buf_mergeable这个函数就容易理解多了。这个函数的作用就是:预留足够的desc来存放mbuf,同时使用buf_vec来记录,每个buf_vec对应一个desc,如果当前所有可用的desc都无法装得下这个mbuf则返错。
reserve_avail_buf_mergeable
static inline int
reserve_avail_buf_mergeable(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
,
uint32_t size
, struct buf_vector
*buf_vec
,
uint16_t
*num_buffers
, uint16_t avail_head
)
{
uint16_t cur_idx
;
uint32_t vec_idx
= 0;
uint16_t tries
= 0;
uint16_t head_idx
= 0;
uint16_t len
= 0;
*num_buffers
= 0;
cur_idx
= vq
->last_avail_idx
;
while (size
> 0) {
if (unlikely(cur_idx
== avail_head
))
return -1;
if (unlikely(fill_vec_buf(dev
, vq
, cur_idx
, &vec_idx
, buf_vec
,
&head_idx
, &len
) < 0))
return -1;
len
= RTE_MIN(len
, size
);
update_shadow_used_ring(vq
, head_idx
, len
);
size
-= len
;
cur_idx
++;
tries
++;
*num_buffers
+= 1;
if (unlikely(tries
>= vq
->size
))
return -1;
}
return 0;
}
这里需要注意的一点就是num_buffers不是使用desc的个数,而是使用的desc chain个数。而fill_vec_buf负责每次挑选一个desc chain填入对应的buf_vector,注意传入参数head_idx每次随着被desc填充而被修改。下面看fill_vec_buf。
fill_vec_buf
static inline int __attribute__((always_inline
))
fill_vec_buf(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
,
uint32_t avail_idx
, uint32_t
*vec_idx
,
struct buf_vector
*buf_vec
, uint16_t
*desc_chain_head
,
uint16_t
*desc_chain_len
)
{
uint16_t idx
= vq
->avail
->ring
[avail_idx
& (vq
->size
- 1)];
uint32_t vec_id
= *vec_idx
;
uint32_t len
= 0;
struct vring_desc
*descs
= vq
->desc
;
*desc_chain_head
= idx
;
if (vq
->desc
[idx
].flags
& VRING_DESC_F_INDIRECT
) {
descs
= (struct vring_desc
*)(uintptr_t
)
gpa_to_vva(dev
, vq
->desc
[idx
].addr
);
if (unlikely(!descs
))
return -1;
idx
= 0;
}
while (1) {
if (unlikely(vec_id
>= BUF_VECTOR_MAX
|| idx
>= vq
->size
))
return -1;
len
+= descs
[idx
].len
;
buf_vec
[vec_id
].buf_addr
= descs
[idx
].addr
;
buf_vec
[vec_id
].buf_len
= descs
[idx
].len
;
buf_vec
[vec_id
].desc_idx
= idx
;
vec_id
++;
if ((descs
[idx
].flags
& VRING_DESC_F_NEXT
) == 0)
break;
idx
= descs
[idx
].next
;
}
*desc_chain_len
= len
;
*vec_idx
= vec_id
;
return 0;
}
这个函数比较简单,就是遍历一个desc chain到结束为止,然后将这个chain的信息存放在buf_vec中,将这个chain能存放的数据长度信息返回,以供上层判断是否还需要再找下一个chain填充。
这里有一个VRING_DESC_F_INDIRECT 的desc特性的判断,这里顺便说下direct desc和indirect desc的区别。通常的desc->addr指向的是存放数据的page,这样的desc叫做direct desc。但是indirect desc的desc->addr指向的是一个direct desc数组,如下图所示。(主要indirect desc指向的只能是direct desc,即不能再继续级联下去) 最后在开启mergeable后virtio_dev_merge_rx的调用和普通模式virtio_dev_rx的调用还有点不同。在virtio_dev_rx中,每次将mbuf中的数据存放在一个desc后都会更新vq->used->ring和vq->last_used_idx,即告诉前端那些desc中已经存放了数据。但在virtio_dev_merge_rx中却没有看到这个过程。其实这个过程是有的,我们注意到在reserve_avail_buf_mergeable中每次调用完fill_vec_buf就会调用一下update_shadow_used_ring,我们看一下其实现。
static inline void __attribute__((always_inline
))
update_shadow_used_ring(struct vhost_virtqueue
*vq
,
uint16_t desc_idx
, uint16_t len
)
{
uint16_t i
= vq
->shadow_used_idx
++;
vq
->shadow_used_ring
[i
].id
= desc_idx
;
vq
->shadow_used_ring
[i
].len
= len
;
}
这里会更新shadow_used_idx和shadow_used_ring。这里引入的shadow_used_idx和shadow_used_ring其实是为了最后批处理更新vq->used->ring,通常更新vq->used->ring要先找到对应的idx,在更新vq->used->ring[idx]。如果对于要使用多个desc chain的情况,这样每次更新就会造成较大的访存开销。那我们看看使用shadow_used_idx和shadow_used_ring会怎么更新。在virtio_dev_merge_rx中拷贝完mbuf数据后,最后会调用flush_shadow_used_ring函数。
flush_shadow_used_ring
static inline void __attribute__((always_inline
))
flush_shadow_used_ring(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
)
{
uint16_t used_idx
= vq
->last_used_idx
& (vq
->size
- 1);
if (used_idx
+ vq
->shadow_used_idx
<= vq
->size
) {
do_flush_shadow_used_ring(dev
, vq
, used_idx
, 0,
vq
->shadow_used_idx
);
} else {
uint16_t size
;
size
= vq
->size
- used_idx
;
do_flush_shadow_used_ring(dev
, vq
, used_idx
, 0, size
);
do_flush_shadow_used_ring(dev
, vq
, 0, size
,
vq
->shadow_used_idx
- size
);
}
vq
->last_used_idx
+= vq
->shadow_used_idx
;
rte_smp_wmb();
*(volatile uint16_t
*)&vq
->used
->idx
+= vq
->shadow_used_idx
;
vhost_log_used_vring(dev
, vq
, offsetof(struct vring_used
, idx
),
sizeof(vq
->used
->idx
));
}
我们再看下do_flush_shadow_used_ring的处理。
do_flush_shadow_used_ring
static inline void __attribute__((always_inline
))
do_flush_shadow_used_ring(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
,
uint16_t to
, uint16_t from
, uint16_t size
)
{
rte_memcpy(&vq
->used
->ring
[to
],
&vq
->shadow_used_ring
[from
],
size
* sizeof(struct vring_used_elem
));
vhost_log_used_vring(dev
, vq
,
offsetof(struct vring_used
, ring
[to
]),
size
* sizeof(struct vring_used_elem
));
}
可以看到由于之前shadow_used_ring中有相关记录,所以这里可以一次性拷贝到uesd_ring中,减少了内存访问次数。
原文链接:http://blog.chinaunix.net/uid-8574039-id-5826405.html