所谓前后端通知,必然涉及两个方向:前端通知后端,后端通知前端。而我们知道vhost有txq和rxq,对于每种queue都伴随有这两种通知。而通知方式又根据是否支持event_idx有着不同的实现,最后virtio1.1引入的packed ring后,通知相对split ring又有不同。下面我们以txq,rxq的两个方向共四种情况来分析前后端的通知实现。其中前端以kernel4.9 virtio_net实现为例分析,后端以dpdk 18.11 vhost_user实现分析。在展开前后端通知分析前,我们先了解两个背景知识:前端中断处理函数的注册和后端vhost_user的kick方式。
前端中断处理函数注册
后端对前端的通知,是以中断方式传递到前端的。分析通知的接收处理就少不了要了解这些中断处理函数。所以我们先看一下前端是怎么注册中断处理函数的。这些需要从virtio_net的加载函数virtnet_probe说起,具体如下图所示。 我们知道virtio设备分为morden和lagecy两种,我们以morden设备为例。对于morden设备,会调用virtio_pci_modern_probe初始化config ops:
vp_dev
->vdev
.config
= &virtio_pci_config_ops
;
其find_vqs函数对应为vp_modern_find_vqs。vp_modern_find_vqs 其中主要调用vq_find_vqs函数。vp_find_vqs函数完成队列中断处理函数的初始化,根据设备对中断的支持,分为以下三种情况:
所有txq,rxq以及ctrlq都共享一个中断处理;ctrlq单独使用一个中断处理,其他txq和rxq共享一个中断处理;可以每个queue(包含txq,rxq以及ctrlq)各一个中断处理;
virtnet_find_vqs(kernel 4.9)
static int virtnet_find_vqs(struct virtnet_info
*vi
)
{
vq_callback_t
**callbacks
;
struct virtqueue
**vqs
;
int ret
= -ENOMEM
;
int i
, total_vqs
;
const char **names
;
total_vqs
= vi
->max_queue_pairs
* 2 +
virtio_has_feature(vi
->vdev
, VIRTIO_NET_F_CTRL_VQ
);
vqs
= kzalloc(total_vqs
* sizeof(*vqs
), GFP_KERNEL
);
if (!vqs
)
goto err_vq
;
callbacks
= kmalloc(total_vqs
* sizeof(*callbacks
), GFP_KERNEL
);
if (!callbacks
)
goto err_callback
;
names
= kmalloc(total_vqs
* sizeof(*names
), GFP_KERNEL
);
if (!names
)
goto err_names
;
if (vi
->has_cvq
) {
callbacks
[total_vqs
- 1] = NULL;
names
[total_vqs
- 1] = "control";
}
for (i
= 0; i
< vi
->max_queue_pairs
; i
++) {
callbacks
[rxq2vq(i
)] = skb_recv_done
;
callbacks
[txq2vq(i
)] = skb_xmit_done
;
sprintf(vi
->rq
[i
].name
, "input.%d", i
);
sprintf(vi
->sq
[i
].name
, "output.%d", i
);
names
[rxq2vq(i
)] = vi
->rq
[i
].name
;
names
[txq2vq(i
)] = vi
->sq
[i
].name
;
}
ret
= vi
->vdev
->config
->find_vqs(vi
->vdev
, total_vqs
, vqs
, callbacks
,
names
);
if (ret
)
goto err_find
;
if (vi
->has_cvq
) {
vi
->cvq
= vqs
[total_vqs
- 1];
if (virtio_has_feature(vi
->vdev
, VIRTIO_NET_F_CTRL_VLAN
))
vi
->dev
->features
|= NETIF_F_HW_VLAN_CTAG_FILTER
;
}
for (i
= 0; i
< vi
->max_queue_pairs
; i
++) {
vi
->rq
[i
].vq
= vqs
[rxq2vq(i
)];
vi
->sq
[i
].vq
= vqs
[txq2vq(i
)];
}
kfree(names
);
kfree(callbacks
);
kfree(vqs
);
return 0;
err_find
:
kfree(names
);
err_names
:
kfree(callbacks
);
err_callback
:
kfree(vqs
);
err_vq
:
return ret
;
}
vp_find_vqs (kernel 4.9)
int vp_find_vqs(struct virtio_device
*vdev
, unsigned nvqs
,
struct virtqueue
*vqs
[],
vq_callback_t
*callbacks
[],
const char * const names
[])
{
int err
;
err
= vp_try_to_find_vqs(vdev
, nvqs
, vqs
, callbacks
, names
, true
, true
);
if (!err
)
return 0;
err
= vp_try_to_find_vqs(vdev
, nvqs
, vqs
, callbacks
, names
,
true
, false
);
if (!err
)
return 0;
return vp_try_to_find_vqs(vdev
, nvqs
, vqs
, callbacks
, names
,
false
, false
);
}
函数首先尝试方式3每个queue各一个中断处理,如果失败再尝试方式2,如果再失败就只能使用方式1了。
我们看到每种方式都是调用同一个函数vp_try_to_find_vqs,只是传入的参数不同。那我们就来看下这个函数的主要内容。
vp_try_to_find_vqs
static int vp_try_to_find_vqs(struct virtio_device
*vdev
, unsigned nvqs
,
struct virtqueue
*vqs
[],
vq_callback_t
*callbacks
[],
const char * const names
[],
bool use_msix
,
bool per_vq_vectors
)
{
struct virtio_pci_device
*vp_dev
= to_vp_device(vdev
);
u16 msix_vec
;
int i
, err
, nvectors
, allocated_vectors
;
vp_dev
->vqs
= kmalloc(nvqs
* sizeof *vp_dev
->vqs
, GFP_KERNEL
);
if (!vp_dev
->vqs
)
return -ENOMEM
;
if (!use_msix
) {
err
= vp_request_intx(vdev
);
if (err
)
goto error_find
;
} else {
if (per_vq_vectors
) {
nvectors
= 1;
for (i
= 0; i
< nvqs
; ++i
)
if (callbacks
[i
])
++nvectors
;
} else {
nvectors
= 2;
}
err
= vp_request_msix_vectors(vdev
, nvectors
, per_vq_vectors
);
if (err
)
goto error_find
;
}
vp_dev
->per_vq_vectors
= per_vq_vectors
;
allocated_vectors
= vp_dev
->msix_used_vectors
;
for (i
= 0; i
< nvqs
; ++i
) {
if (!names
[i
]) {
vqs
[i
] = NULL;
continue;
} else if (!callbacks
[i
] || !vp_dev
->msix_enabled
)
msix_vec
= VIRTIO_MSI_NO_VECTOR
;
else if (vp_dev
->per_vq_vectors
)
msix_vec
= allocated_vectors
++;
else
msix_vec
= VP_MSIX_VQ_VECTOR
;
vqs
[i
] = vp_setup_vq(vdev
, i
, callbacks
[i
], names
[i
], msix_vec
);
if (IS_ERR(vqs
[i
])) {
err
= PTR_ERR(vqs
[i
]);
goto error_find
;
}
if (!vp_dev
->per_vq_vectors
|| msix_vec
== VIRTIO_MSI_NO_VECTOR
)
continue;
snprintf(vp_dev
->msix_names
[msix_vec
],
sizeof *vp_dev
->msix_names
,
"%s-%s",
dev_name(&vp_dev
->vdev
.dev
), names
[i
]);
err
= request_irq(vp_dev
->msix_entries
[msix_vec
].vector
,
vring_interrupt
, 0,
vp_dev
->msix_names
[msix_vec
],
vqs
[i
]);
if (err
) {
vp_del_vq(vqs
[i
]);
goto error_find
;
}
}
return 0;
error_find
:
vp_del_vqs(vdev
);
return err
;
}
其中vp_request_intx函数完成方式1的处理,具体就是通过request_irq注册中断处理函数vp_interrupt;vp_request_msix_vectors函数完成方式2的处理,其中调用request_irq给ctrlq注册中断处理函数为vp_config_changed(方式3的ctrlq中断处理也是这里注册),调用request_irq给数据queue(txq,rxq)注册中断处理函数为vp_vring_interrupt;而方式3的剩余处理在本函数的后半部分,为每个数据queue调用request_irq注册中断处理函数vring_interrupt。如果查看代码会发现方式2数据queue共享的中断处理vp_vring_interrupt函数中也是通过遍历所有queue调用vring_interrupt实现的。所以我们重点关注vring_interrupt函数的实现,这是数据queue中断处理的核心。
vring_interrupt
irqreturn_t
vring_interrupt(int irq
, void *_vq
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
if (!more_used(vq
)) {
pr_debug("virtqueue interrupt with no work for %p\n", vq
);
return IRQ_NONE
;
}
if (unlikely(vq
->broken
))
return IRQ_HANDLED
;
pr_debug("virtqueue callback for %p (%p)\n", vq
, vq
->vq
.callback
);
if (vq
->vq
.callback
)
vq
->vq
.callback(&vq
->vq
);
return IRQ_HANDLED
;
}
static inline bool
more_used(const struct vring_virtqueue
*vq
)
{
return vq
->last_used_idx
!= virtio16_to_cpu(vq
->vq
.vdev
, vq
->vring
.used
->idx
);
}
如果more_used返回false表示vq->last_used_idx== vring.used->idx,这说明当前没有uesd desc需要更新处理,所以中断直接返回。否则就调用对应queue的callback函数。而callback函数在之前virtnet_find_vqs的调用中被设置。rxq和txq的callback分别注册为了skb_recv_done和skb_xmit_done。
callbacks
[rxq2vq(i
)] = skb_recv_done
;
callbacks
[txq2vq(i
)] = skb_xmit_done
;
所以接收队列和发送队列的中断处理主要就是分别调用skb_recv_done和skb_xmit_done函数。
后端vhost_user的kick方式
下面我们再看下后端vhost_user是如果kick前端的。首先是split ring的情况。
vhost_vring_call_split (dpdk 1811)
static __rte_always_inline
void
vhost_vring_call_split(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
)
{
rte_smp_mb();
if (dev
->features
& (1ULL << VIRTIO_RING_F_EVENT_IDX
)) {
uint16_t old
= vq
->signalled_used
;
uint16_t new
= vq
->last_used_idx
;
if (vhost_need_event(vhost_used_event(vq
), new
, old
)
&& (vq
->callfd
>= 0)) {
vq
->signalled_used
= vq
->last_used_idx
;
eventfd_write(vq
->callfd
, (eventfd_t
) 1);
}
} else {
if (!(vq
->avail
->flags
& VRING_AVAIL_F_NO_INTERRUPT
)
&& (vq
->callfd
>= 0))
eventfd_write(vq
->callfd
, (eventfd_t
)1);
}
}
split的方式比较简单,当开启event_idx的时候,就根据old,new以及前端消耗到的位置avail->ring[(vr)->size]来判断是否kick;如果没有开启event_idx时则只要前端没有设置VRING_AVAIL_F_NO_INTERRUPT就kick前端,注意不开启event_idx时,后端也不是无脑kick的,只有前端没设置VRING_AVAIL_F_NO_INTERRUPT时才会kick,至于前端什么时候设置VRING_AVAIL_F_NO_INTERRUPT,一会再分析。
下面我们看packed ring的kick前端处理方式。在介绍packed处理之前我们先看其相关结构。
struct vhost_virtqueue
{
union {
struct vring_desc
*desc
;
struct vring_packed_desc
*desc_packed
;
};
union {
struct vring_avail
*avail
;
struct vring_packed_desc_event
*driver_event
;
};
union {
struct vring_used
*used
;
struct vring_packed_desc_event
*device_event
;
};
……
};
在packed 方式中,由于uesd ring和avail ring不再需要,取而代之的是两个desc_event结构。在split方式中我们知道前后端控制是否相互通知是通过avail->flag和uesd->flag的设置来完成的,但packed中分别是通过driver_event和device_event来完成的。
其中driver_event是后端只读的,是前端控制后端更新uesd desc时是否发送通知的,对应于split 方式的avail->flag;
device_event是前端只读的,是后端控制前端更新avail desc时是否发送通知的,对应于split方式的uesd->flag。
driver_event和device_event都是vring_packed_desc_event结构,其具体结构如下。
struct vring_packed_desc_event
{
uint16_t off_wrap
;
uint16_t flags
;
};
其中flag可以取三个值分别为:
#define VRING_EVENT_F_ENABLE 0x0
#define VRING_EVENT_F_DISABLE 0x1
#define VRING_EVENT_F_DESC 0x2
driver_event取值VRING_EVENT_F_DISABLE相当于split方式avail->flag设置VRING_AVAIL_F_NO_INTERRUPT,device_event取值VRING_EVENT_F_DISABLE相当于uesd->flag设置VRING_USED_F_NO_NOTIFY。
剩下关键的就是最后这个VRING_EVENT_F_DESC flag了,官方spec是这么解释这个flag的: Enable events for a specific descriptor,(as specified by Descriptor Ring Change Event Offset/Wrap Counter),Only valid if VIRTIO_F_RING_EVENT_IDX has been negotiated. 可以看出这个flag的作用是指定某一个desc发生变化后触发通知,而这个flag生效的前提就是开启了event_idx。这个解释似乎还是不太直观,其实我们可以对比split 的event_idx处理方式来理解。我们知道split方式中,如果开启了event_idx,则前端需要通过avail->ring的最后一个desc告诉后端前端的uesd desc处理到哪里了,后端根据这个值来决定是否需要kick前端;而后端则使用uesd->ring的最后一个desc告诉前端后端的avail desc处理到哪里了,前端根据这个来决定是否来通知后端。但是在packed方式中没有了avail ring和uesd ring,那如果开启了event_idx前端后端如何才能告知对方自己处理到什么位置了呢?答案就是通过这里的off_wrap成员,可以看到这个成员是一个uint16_t,其中后15位指定了前后端处理到什么位置了,而最高位是为了解决翻转的Wrap Counter,而整个off_wrap字段有意义的前提就是flag被设置为了VRING_EVENT_F_DESC。
下面我们对比packed和split方式后端是否开启通知,来了解下vring_packed_desc_event这三个flag的作用。首先看split的开关中断处理,即vhost_enable_notify_split :
vhost_enable_notify_split(dpdk 1811)
static inline void
vhost_enable_notify_split(struct virtio_net
*dev
,
struct vhost_virtqueue
*vq
, int enable
)
{
if (!(dev
->features
& (1ULL << VIRTIO_RING_F_EVENT_IDX
))) {
if (enable
)
vq
->used
->flags
&= ~VRING_USED_F_NO_NOTIFY
;
else
vq
->used
->flags
|= VRING_USED_F_NO_NOTIFY
;
} else {
if (enable
)
vhost_avail_event(vq
) = vq
->last_avail_idx
;
}
}
可以看到在没有开启EVENT_IDX时,控制guest是否通知后端是用过控制vq->used->flags设置VRING_USED_F_NO_NOTIFY来实现的,但是如果开启了EVENT_IDX就不再使用used->flags,而是使用EVENT_IDX特有中断限速方式。
下面再看packed的guest通知开启关闭方式实现vhost_enable_notify_packed。
vhost_enable_notify_packed(dpdk 1811)
static inline void
vhost_enable_notify_packed(struct virtio_net
*dev
,
struct vhost_virtqueue
*vq
, int enable
)
{
uint16_t flags
;
if (!enable
) {
vq
->device_event
->flags
= VRING_EVENT_F_DISABLE
;
return;
}
flags
= VRING_EVENT_F_ENABLE
;
if (dev
->features
& (1ULL << VIRTIO_RING_F_EVENT_IDX
)) {
flags
= VRING_EVENT_F_DESC
;
vq
->device_event
->off_wrap
= vq
->last_avail_idx
|
vq
->avail_wrap_counter
<< 15;
}
rte_smp_wmb();
vq
->device_event
->flags
= flags
;
}
当不开启event_idx时,packed 方式使用device_event->flags是否设置VRING_EVENT_F_DISABLE代替split 方式设置的VRING_USED_F_NO_NOTIFY。如果开启了event_idx则device_event->flags一定会被设置为VRING_EVENT_F_DESC的,另外注意device_event->off_wrap的初始设置,低15位被设置为vq->last_avail_idx,高位被设置为vq->avail_wrap_counter。
最后我们看一下packed方式下后端是如何决定是否通知前端的,即vhost_vring_call_packed函数的实现。
vhost_vring_call_packed(dpdk 1811)
static __rte_always_inline
void
vhost_vring_call_packed(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
)
{
uint16_t old
, new
, off
, off_wrap
;
bool signalled_used_valid
, kick
= false
;
rte_smp_mb();
if (!(dev
->features
& (1ULL << VIRTIO_RING_F_EVENT_IDX
))) {
if (vq
->driver_event
->flags
!=
VRING_EVENT_F_DISABLE
)
kick
= true
;
goto kick
;
}
old
= vq
->signalled_used
;
new
= vq
->last_used_idx
;
vq
->signalled_used
= new
;
signalled_used_valid
= vq
->signalled_used_valid
;
vq
->signalled_used_valid
= true
;
if (vq
->driver_event
->flags
!= VRING_EVENT_F_DESC
) {
if (vq
->driver_event
->flags
!= VRING_EVENT_F_DISABLE
)
kick
= true
;
goto kick
;
}
if (unlikely(!signalled_used_valid
)) {
kick
= true
;
goto kick
;
}
rte_smp_rmb();
off_wrap
= vq
->driver_event
->off_wrap
;
off
= off_wrap
& ~(1 << 15);
if (new
<= old
)
old
-= vq
->size
;
if (vq
->used_wrap_counter
!= off_wrap
>> 15)
off
-= vq
->size
;
if (vhost_need_event(off
, new
, old
))
kick
= true
;
kick
:
if (kick
)
eventfd_write(vq
->callfd
, (eventfd_t
)1);
}
在不开启event_idx的时候,和开启event_idx但是driver_event->flags != VRING_EVENT_F_DESC时(正常情况不存在)都是按照前端是否设置VRING_EVENT_F_DISABLE来决定的。
下面一点和split略有不同,就是vq->signalled_used赋值的位置,split是kick前端后条件满足时才进行signalled_used赋值,而这里是直接赋值,并且引入了一个signalled_used_valid变量。首先明确一下,这个变化和packed本身无关,其实split在后续patch也采用了类似处理(详见:http://mails.dpdk.org/archives/dev/2019-March/126684.html )。其中vq->signalled_used_valid的引入是为了在热迁移后以及前端驱动reload后,将vq->signalled_used的值标记为无效,正常情况vq->signalled_used记录的是上次通知前端时后端处理到的uesd idx,但是当热迁移发生或者前端驱动reload后这个值将不再有意义。signalled_used_valid是用在第一次更新used ring、virtio-net driver reload和live migration之后,所以在ring初始化的时候、guest发送VHOST_USER_GET_VRING_BASE的时候才赋值为false。
另外将vq->signalled_used赋值位置前移,而不是只有满足kick条件才赋值,这个改动是为了一个优化。之前产生interrupt的条件是: last_used_idx – event_idx <= last_used_idx – signalled_used (event_idx为前端更新到的uesd idx)。以前的实现是产生interrupt的时候才会更新signalled_used,而现在是每次更新uesd_ring之后,不管是否有interrupt都更新它,现在的做法和kernel vhost_net的实现是一样的。这样做的好处是可以减少不必要的kick事件,以vm接收方向为例,如果guest kernel有NAPI,那么在guest以poll的方式收包的时候会停止更新event_idx。假设NAPI在Δt时间内都会poll(也就是不更新event_idx),那么对原来的实现而言,last_used_idx就是一直增长,而event_idx和signalled_used都是不变的;在这种情况下,产生Interrupt只有两种情况,一是guest NAPI停止poll、更新event_idx,另一种是last_used_idx超过2^16,比如(last_used_idx=13, signaled_used=65535, event_idx=10 )。而对现在的实现而言,last_used_idx也一直增长,event_idx也是固定不变的,不过(last_used_idx – signalled_used)是一个相对固定的值(其值等于这一次更新的used ring的descriptor数,我们称它为Δf);因此在这种情况下guest NAPI结束后,更新event_idx,但(last_used_idx – event_idx)的值未必小于等于Δf,所以不一定需要产生interrupt。所以,在guest如果没有NAPI,现在的实现与以前的相比可以减小(last_used_idx – signaled_used)的值,这样可以减小(last_used_idx – event_idx)小于等于这个值的的概率,从而减小产生interrupt的概率。
最后一点就是packed方式是如何获取前端当前的消耗位置的,也就是event_idx的,是通过off_wrap的低15位以及高位的warp counter来完成的。
有了以上背景知识,我们就开始分别分析前后端通知的四种情况。
发送队列前端通知后端
我们看发送队列(txq)通知后端的情况。首先明确对于发送队列,前端通知(kick)后端的作用是什么?发送队列kick后端就是为了告诉后端前端已经将数据放入了共享ring(具体就是avail desc)中,后端可以来取数据了。
所以我们看前端virtio_net驱动的代码实现,在发送函数start_xmit的结尾有如下调用:
if (kick
|| netif_xmit_stopped(txq
))
virtqueue_kick(sq
->vq
);
我们来看virtqueue_kick的实现。
virtqueue_kick (kernel4.9 )
bool
virtqueue_kick(struct virtqueue
*vq
)
{
if (virtqueue_kick_prepare(vq
))
return virtqueue_notify(vq
);
return true
;
}
virtqueue_notify(kernel4.9)
bool
virtqueue_notify(struct virtqueue
*_vq
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
if (unlikely(vq
->broken
))
return false
;
if (!vq
->notify(_vq
)) {
vq
->broken
= true
;
return false
;
}
return true
;
}
EXPORT_SYMBOL_GPL(virtqueue_notify
);
可以看到真正发送kick通知的函数是virtqueue_notify,其调用的条件是virtqueue_kick_prepare,只有其返回true的时候才会kick后端。我们看下virtqueue_kick_prepare的实现。
virtqueue_kick_prepare (kernel4.9 )
bool
virtqueue_kick_prepare(struct virtqueue
*_vq
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
u16 new
, old
;
bool needs_kick
;
START_USE(vq
);
virtio_mb(vq
->weak_barriers
);
old
= vq
->avail_idx_shadow
- vq
->num_added
;
new
= vq
->avail_idx_shadow
;
vq
->num_added
= 0;
if (vq
->event
) {
needs_kick
= vring_need_event(virtio16_to_cpu(_vq
->vdev
, vring_avail_event(&vq
->vring
)),
new
, old
);
} else {
needs_kick
= !(vq
->vring
.used
->flags
& cpu_to_virtio16(_vq
->vdev
, VRING_USED_F_NO_NOTIFY
));
}
END_USE(vq
);
return needs_kick
;
}
可以看到如果支持event_idx就更加old和new以及used->ring[(vr)->num]的范围来通知后端,详细过程可以参考之前写的event_idx相关文章。如果不支持event_idx就看used->flags是否设置了VRING_USED_F_NO_NOTIFY,如果没有设置就通知后端,否则就不通知。这里我们也可以看到,当开启event_idx后,VRING_USED_F_NO_NOTIFY也就失去了作用。此外VRING_USED_F_NO_NOTIFY这个flag是后端设置的,对前端是只读的,用来告诉前端是否需要通知后端。
然后我们看后端处理,这里我们因为使用的是dpdk,而dpdk一般采用的polling模式,设置VRING_USED_F_NO_NOTIFY,所以前端是不会kick后端的,但是如果开启了event_idx呢?我们知道开启event_idx时,前端kick后端需要后端通过used->ring[(vr)->num]告诉前端当前avail desc消耗到什么位置了,但是当前dpdk vhost_user并没有这个处理,因此当前如果打开event_idx,后端dpdk依然是无法收到中断的。
发送队列后端通知前端
我们再看发送队列后端通知前端的过程。发送队列为什么要通知前端呢?因为后端将前端放入avail ring中的数据取出后需要告诉前端对应的数据已经被取走了,你可以把相关数据buffer释放了。而究竟释放那些buffer是取决于uesd ring的,所以通知前端本质上是为了告诉前端uesd ring有更新了。
但是有一点要注意,我们知道uesd ring是前后端共享的,所以如果后端更新了uesd ring,即使不通知前端,前端应该也是可以感知到的。所以前端释放buffer不一定要依赖后端kick。事实上也的确如此。
我们还看发送start_xmit,
start_xmit (kernel4.9 )
static netdev_tx_t
start_xmit(struct sk_buff
*skb
, struct net_device
*dev
)
{
struct virtnet_info
*vi
= netdev_priv(dev
);
int qnum
= skb_get_queue_mapping(skb
);
struct send_queue
*sq
= &vi
->sq
[qnum
];
int err
;
struct netdev_queue
*txq
= netdev_get_tx_queue(dev
, qnum
);
bool kick
= !skb
->xmit_more
;
free_old_xmit_skbs(sq
);
skb_tx_timestamp(skb
);
err
= xmit_skb(sq
, skb
);
if (unlikely(err
)) {
dev
->stats
.tx_fifo_errors
++;
if (net_ratelimit())
dev_warn(&dev
->dev
,
"Unexpected TXQ (%d) queue failure: %d\n", qnum
, err
);
dev
->stats
.tx_dropped
++;
dev_kfree_skb_any(skb
);
return NETDEV_TX_OK
;
}
skb_orphan(skb
);
nf_reset(skb
);
if (sq
->vq
->num_free
< 2+MAX_SKB_FRAGS
) {
netif_stop_subqueue(dev
, qnum
);
if (unlikely(!virtqueue_enable_cb_delayed(sq
->vq
))) {
free_old_xmit_skbs(sq
);
if (sq
->vq
->num_free
>= 2+MAX_SKB_FRAGS
) {
netif_start_subqueue(dev
, qnum
);
virtqueue_disable_cb(sq
->vq
);
}
}
}
if (kick
|| netif_xmit_stopped(txq
))
virtqueue_kick(sq
->vq
);
return NETDEV_TX_OK
;
}
其中开头部分就首先调用free_old_xmit_skbs根据uesd ring将之前的buffer释放掉。具体流程如下图所示。 其中detach_buf负责更加uesd ring来将对应的desc释放掉,并将对应地址dma unmmap。另外要注意的是virtqueue_get_buf函数的最后有如下调用:
virtqueue_get_buf (kernel4.9 )
void *virtqueue_get_buf(struct virtqueue
*_vq
, unsigned int *len
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
void *ret
;
unsigned int i
;
u16 last_used
;
last_used
= (vq
->last_used_idx
& (vq
->vring
.num
- 1));
i
= virtio32_to_cpu(_vq
->vdev
, vq
->vring
.used
->ring
[last_used
].id
);
*len
= virtio32_to_cpu(_vq
->vdev
, vq
->vring
.used
->ring
[last_used
].len
);
……
ret
= vq
->desc_state
[i
].data
;
detach_buf(vq
, i
);
vq
->last_used_idx
++;
if (!(vq
->avail_flags_shadow
& VRING_AVAIL_F_NO_INTERRUPT
))
virtio_store_mb(vq
->weak_barriers
,
&vring_used_event(&vq
->vring
),
cpu_to_virtio16(_vq
->vdev
, vq
->last_used_idx
));
END_USE(vq
);
return ret
;
}
当前端没有设置VRING_AVAIL_F_NO_INTERRUPT时,会更新avail->ring[(vr)->num],这也是后端开启event_idx时kick前端的条件。VRING_AVAIL_F_NO_INTERRUPT是前端设置,后端只读的。为什么要更新avail->ring[(vr)->num]呢?avail->ring[(vr)->num]中记录的前端已经处理到那个uesd idx了,因为可以及时告诉后端前端处理到什么位置了,后端来根据情况决定是否需要kick前端。
那现在问题又回来了,对于txq既然前端不需要后端kick也能释放buffer,那后端kick有什么用呢?我们再回头看一下start_xmit发送函数,有一下逻辑。
start_xmit (kernel4.9 )
static netdev_tx_t
start_xmit(struct sk_buff
*skb
, struct net_device
*dev
)
{
struct virtnet_info
*vi
= netdev_priv(dev
);
int qnum
= skb_get_queue_mapping(skb
);
struct send_queue
*sq
= &vi
->sq
[qnum
];
int err
;
struct netdev_queue
*txq
= netdev_get_tx_queue(dev
, qnum
);
bool kick
= !skb
->xmit_more
;
free_old_xmit_skbs(sq
);
skb_tx_timestamp(skb
);
err
= xmit_skb(sq
, skb
);
if (unlikely(err
)) {
dev
->stats
.tx_fifo_errors
++;
if (net_ratelimit())
dev_warn(&dev
->dev
,
"Unexpected TXQ (%d) queue failure: %d\n", qnum
, err
);
dev
->stats
.tx_dropped
++;
dev_kfree_skb_any(skb
);
return NETDEV_TX_OK
;
}
skb_orphan(skb
);
nf_reset(skb
);
if (sq
->vq
->num_free
< 2+MAX_SKB_FRAGS
) {
netif_stop_subqueue(dev
, qnum
);
if (unlikely(!virtqueue_enable_cb_delayed(sq
->vq
))) {
free_old_xmit_skbs(sq
);
if (sq
->vq
->num_free
>= 2+MAX_SKB_FRAGS
) {
netif_start_subqueue(dev
, qnum
);
virtqueue_disable_cb(sq
->vq
);
}
}
}
if (kick
|| netif_xmit_stopped(txq
))
virtqueue_kick(sq
->vq
);
return NETDEV_TX_OK
;
}
当前端发送速率过快,从而vq->num_free较少时会调用netif_stop_subqueue(将队列状态设置为__QUEUE_STATE_DRV_XOFF),这样队列的start_xmit函数下次在__dev_queue_xmit中就不会被调用。要想打破这样一个状态,就需要后端的kick了。这里还有一个十分关键的函数,就这在stop_queue之后调用的virtqueue_enable_cb_delayed函数。这个函数中有着至关重要的一个操作,如下:
virtqueue_enable_cb_delayed
bool
virtqueue_enable_cb_delayed(struct virtqueue
*_vq
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
u16 bufs
;
START_USE(vq
);
if (vq
->avail_flags_shadow
& VRING_AVAIL_F_NO_INTERRUPT
) {
vq
->avail_flags_shadow
&= ~VRING_AVAIL_F_NO_INTERRUPT
;
if (!vq
->event
)
vq
->vring
.avail
->flags
= cpu_to_virtio16(_vq
->vdev
, vq
->avail_flags_shadow
);
}
bufs
= (u16
)(vq
->avail_idx_shadow
- vq
->last_used_idx
) * 3 / 4;
virtio_store_mb(vq
->weak_barriers
,
&vring_used_event(&vq
->vring
),
cpu_to_virtio16(_vq
->vdev
, vq
->last_used_idx
+ bufs
));
if (unlikely((u16
)(virtio16_to_cpu(_vq
->vdev
, vq
->vring
.used
->idx
) - vq
->last_used_idx
) > bufs
)) {
END_USE(vq
);
return false
;
}
END_USE(vq
);
return true
;
}
首先这个函数做了一个关键操作,就是取消设置VRING_AVAIL_F_NO_INTERRUPT,使后端可以发送中断上来。那么VRING_AVAIL_F_NO_INTERRUPT这个flag是什么时候被置上的呢?这个我们后面回答。另外一点就是会更新更新avail->ring[(vr)->num]告诉前端uesd desc当前消耗到哪里了,但注意这里不是更新为vq->last_used_idx,而是还加了一个bufs,为的就是让后端延时一会再kick前端。
下面我们看一下dpdk后端kick前端的时机,我们以split方式为例说明。
virtio_dev_tx_split(dpdk18.11)
static __rte_always_inline uint16_t
virtio_dev_tx_split(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
,
struct rte_mempool
*mbuf_pool
, struct rte_mbuf
**pkts
, uint16_t count
)
{
uint16_t i
;
uint16_t free_entries
;
if (unlikely(dev
->dequeue_zero_copy
)) {
struct zcopy_mbuf
*zmbuf
, *next
;
for (zmbuf
= TAILQ_FIRST(&vq
->zmbuf_list
);
zmbuf
!= NULL; zmbuf
= next
) {
next
= TAILQ_NEXT(zmbuf
, next
);
if (mbuf_is_consumed(zmbuf
->mbuf
)) {
update_shadow_used_ring_split(vq
,
zmbuf
->desc_idx
, 0);
TAILQ_REMOVE(&vq
->zmbuf_list
, zmbuf
, next
);
restore_mbuf(zmbuf
->mbuf
);
rte_pktmbuf_free(zmbuf
->mbuf
);
put_zmbuf(zmbuf
);
vq
->nr_zmbuf
-= 1;
}
}
if (likely(vq
->shadow_used_idx
)) {
flush_shadow_used_ring_split(dev
, vq
);
vhost_vring_call_split(dev
, vq
);
}
}
......
for (i
= 0; i
< count
; i
++) {
......
err
= copy_desc_to_mbuf(dev
, vq
, buf_vec
, nr_vec
, pkts
[i
],
mbuf_pool
);
......
}
vq
->last_avail_idx
+= i
;
if (likely(dev
->dequeue_zero_copy
== 0)) {
do_data_copy_dequeue(vq
);
if (unlikely(i
< count
))
vq
->shadow_used_idx
= i
;
if (likely(vq
->shadow_used_idx
)) {
flush_shadow_used_ring_split(dev
, vq
);
vhost_vring_call_split(dev
, vq
);
}
}
return i
;
}
如果是零拷贝,则在后端接受逻辑的开始,判断上一次接受的报文是否dma完成,如果完成则更新uesd ring,调用vhost_vring_call_split kick前端。如果不是零拷贝模式,则在将报文从desc拷贝出来后,更新uesd ring,调用vhost_vring_call_split kick前端。关于vhost_vring_call_split的实现在前面已经介绍过了,这里不再重复。
下面看前端收到后端的中断是如何处理的。前面已经介绍过了,对于发送队列,其注册的中断回调函数中会调用skb_xmit_done。
skb_xmit_done(kernel 4.9)
static void skb_xmit_done(struct virtqueue
*vq
)
{
struct virtnet_info
*vi
= vq
->vdev
->priv
;
virtqueue_disable_cb(vq
);
netif_wake_subqueue(vi
->dev
, vq2txq(vq
));
}
其中virtqueue_disable_cb会将vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT,这样后端就不会发生kick到前端了(不开启event_idx的情况),然后调用netif_wake_subqueue唤醒被stop的queue(清除__QUEUE_STATE_DRV_XOFF 的state)。
virtqueue_disable_cb(kernel)
void virtqueue_disable_cb(struct virtqueue
*_vq
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
if (!(vq
->avail_flags_shadow
& VRING_AVAIL_F_NO_INTERRUPT
)) {
vq
->avail_flags_shadow
|= VRING_AVAIL_F_NO_INTERRUPT
;
if (!vq
->event
)
vq
->vring
.avail
->flags
= cpu_to_virtio16(_vq
->vdev
, vq
->avail_flags_shadow
);
}
}
这里又有个疑问,唤醒stop 状态的queue容易理解,可以设置上了VRING_AVAIL_F_NO_INTERRUPT后,什么时候取消呢?其实前面我们已经介绍过了,就是下次stop queue时调用virtqueue_enable_cb_delayed的处理中。所以我们可以看到,即使不开启event_idx,也不是每次uesd 变化都会kick前端的,而是只有queue stop后才会kick,正常状态前端是会设置VRING_AVAIL_F_NO_INTERRUPT不让后端kick的。
接收队列后端通知前端
对于接收队列,后端会在将mbuf数据拷贝到avail desc中后,更新uesd ring,然后kick前端。kick前端的目的就是告诉前端,我有数据发送给你了(更新了uesd ring),你可以来取数据了。我们以split方式的接收方向为例。其接受逻辑在virtio_dev_rx_split中实现。
virtio_dev_rx_split(dpdk 18.11)
static __rte_always_inline uint32_t
virtio_dev_rx_split(struct virtio_net
*dev
, struct vhost_virtqueue
*vq
,
struct rte_mbuf
**pkts
, uint32_t count
)
{
uint32_t pkt_idx
= 0;
uint16_t num_buffers
;
struct buf_vector buf_vec
[BUF_VECTOR_MAX
];
uint16_t avail_head
;
rte_prefetch0(&vq
->avail
->ring
[vq
->last_avail_idx
& (vq
->size
- 1)]);
avail_head
= *((volatile uint16_t
*)&vq
->avail
->idx
);
for (pkt_idx
= 0; pkt_idx
< count
; pkt_idx
++) {
uint32_t pkt_len
= pkts
[pkt_idx
]->pkt_len
+ dev
->vhost_hlen
;
uint16_t nr_vec
= 0;
if (unlikely(reserve_avail_buf_split(dev
, vq
,
pkt_len
, buf_vec
, &num_buffers
,
avail_head
, &nr_vec
) < 0)) {
VHOST_LOG_DEBUG(VHOST_DATA
,
"(%d) failed to get enough desc from vring\n",
dev
->vid
);
vq
->shadow_used_idx
-= num_buffers
;
break;
}
rte_prefetch0((void *)(uintptr_t
)buf_vec
[0].buf_addr
);
VHOST_LOG_DEBUG(VHOST_DATA
, "(%d) current index %d | end index %d\n",
dev
->vid
, vq
->last_avail_idx
,
vq
->last_avail_idx
+ num_buffers
);
if (copy_mbuf_to_desc(dev
, vq
, pkts
[pkt_idx
],
buf_vec
, nr_vec
,
num_buffers
) < 0) {
vq
->shadow_used_idx
-= num_buffers
;
break;
}
vq
->last_avail_idx
+= num_buffers
;
}
do_data_copy_enqueue(dev
, vq
);
if (likely(vq
->shadow_used_idx
)) {
flush_shadow_used_ring_split(dev
, vq
);
vhost_vring_call_split(dev
, vq
);
}
return pkt_idx
;
}
其中kick前端的处理在最后的vhost_vring_call_split函数中,这个我们在“后端vhost_user的kick方式”中已经介绍过,这里就不再重复了。
然后我们看前端的通知处理。根据前面的介绍,接受队列前端注册的中断处理函数最终会调用到skb_recv_done。
skb_recv_done(kernel 4.9)
static void skb_recv_done(struct virtqueue
*rvq
)
{
struct virtnet_info
*vi
= rvq
->vdev
->priv
;
struct receive_queue
*rq
= &vi
->rq
[vq2rxq(rvq
)];
if (napi_schedule_prep(&rq
->napi
)) {
virtqueue_disable_cb(rvq
);
__napi_schedule(&rq
->napi
);
}
}
我们看到这个函数主要工作就是调用virtqueue_disable_cb给vring.avail->flags设置上VRING_AVAIL_F_NO_INTERRUPT从而禁止后端发送中断(不开启event_idx的情况),然后唤起NAPI处理。所以在NAPI的情况后端通知是被关闭的。那么这个flag什么时候会被打开呢?答案就是在virtio_net的NAPI处理逻辑中,即virtnet_poll函数。
virtnet_poll(kernel 4.9)
static int virtnet_poll(struct napi_struct
*napi
, int budget
)
{
struct receive_queue
*rq
=
container_of(napi
, struct receive_queue
, napi
);
unsigned int r
, received
;
received
= virtnet_receive(rq
, budget
);
if (received
< budget
) {
r
= virtqueue_enable_cb_prepare(rq
->vq
);
napi_complete_done(napi
, received
);
if (unlikely(virtqueue_poll(rq
->vq
, r
)) &&
napi_schedule_prep(napi
)) {
virtqueue_disable_cb(rq
->vq
);
__napi_schedule(napi
);
}
}
return received
;
}
在NAPI处理流程中如果received < budget,证明本轮数据接收已经比较少了,NAPI过程可能要退出了,这时调用virtqueue_enable_cb_prepare将之前的VRING_AVAIL_F_NO_INTERRUPT取消,从NAPI模式进入中断模式。
virtqueue_enable_cb_prepare(kernel 4.9)
unsigned virtqueue_enable_cb_prepare(struct virtqueue
*_vq
)
{
struct vring_virtqueue
*vq
= to_vvq(_vq
);
u16 last_used_idx
;
START_USE(vq
);
if (vq
->avail_flags_shadow
& VRING_AVAIL_F_NO_INTERRUPT
) {
vq
->avail_flags_shadow
&= ~VRING_AVAIL_F_NO_INTERRUPT
;
if (!vq
->event
)
vq
->vring
.avail
->flags
= cpu_to_virtio16(_vq
->vdev
, vq
->avail_flags_shadow
);
}
vring_used_event(&vq
->vring
) = cpu_to_virtio16(_vq
->vdev
, last_used_idx
= vq
->last_used_idx
);
END_USE(vq
);
return last_used_idx
;
}
注意这个函数除了取消VRING_AVAIL_F_NO_INTERRUPT设置之外,还会更新avail->ring[(vr)->num],以供开启event_idx时后端使用。
接收队列前端通知后端
下面看接收方向前端通知的过程。首先要清除接收方向前端通知后端的目的,那就是告诉后端avail ring已经更新了(有了更多空buffer),你可以继续放入更多数据了。从这里我们也可以看出,前端通知后端,无论发送还是接收方向,都是告诉后端有了更多的avail desc,而后端通知前端,都是告诉前端有了更多的uesd 的desc。然后我们看前端通知后端的时机,要想知道前端再何时通知后端,我们需要对前端的数据接收流程有个清晰的认识。下面这个图描述了前端vhost_net接收数据的过程。 而通知后端的时机就在try_fill_recv函数调用中。
try_fill_recv(kernel 4.9)
static bool
try_fill_recv(struct virtnet_info
*vi
, struct receive_queue
*rq
,
gfp_t gfp
)
{
int err
;
bool oom
;
gfp
|= __GFP_COLD
;
do {
if (vi
->mergeable_rx_bufs
)
err
= add_recvbuf_mergeable(rq
, gfp
);
else if (vi
->big_packets
)
err
= add_recvbuf_big(vi
, rq
, gfp
);
else
err
= add_recvbuf_small(vi
, rq
, gfp
);
oom
= err
== -ENOMEM
;
if (err
)
break;
} while (rq
->vq
->num_free
);
virtqueue_kick(rq
->vq
);
return !oom
;
}
这个函数首先会根据是否支持mergeable已经是否支持收大包来向avail desc注入对应的空buffer。每种情况下注入buffer产生的desc chain长度是不同的。这里不是重点,我们就不展开了。下面看关键的virtqueue_kick函数。
virtqueue_kick(kernel 4.9)
bool
virtqueue_kick(struct virtqueue
*vq
)
{
if (virtqueue_kick_prepare(vq
))
return virtqueue_notify(vq
);
return true
;
}
其中真正向后端发送通知的是virtqueue_notify,但是首先要通过virtqueue_kick_prepare的判断。virtqueue_kick_prepare这个函数我们之前已经介绍过了,如果不开启event_idx时,会根据vring.used->flags是否设置VRING_USED_F_NO_NOTIFY来决定是否kick后端,而开启event_idx时,则会根据后端填入(vr)->used->ring[(vr)->num]中的后端消耗位置来决定是否kick。
同样如果后端使用的是dpdk vhost_user,那么当前后端是不会写(vr)->used->ring[(vr)->num]告诉前端自己的avail使用位置的,如果开启了event_idx后端还是无法收到中断的。那如果dpdk使用中断模式,这里收不到中断是否会有问题呢?我们换位思考一下,对比一下“发送队列后端通知前端”的场景,在“发送队列后端通知前端”中,如果后端不通知前端,那么前端一旦感觉到没有可用的avail desc后就会stop queue,之后就无法被唤醒了。而这里的场景,vhost_user后端没有类似stop queue的操作,所以即使收不到前端的中断也没有问题。但是在“发送队列前端通知后端”的场景中,如果dpdk采用中断模式切开启了event_idx,那么vhost_user就会因为收不到中断而无法取出前端发送的报文。所以对应想使用中断模式,且开启event_idx的场景,vhost_user需要添加对(vr)->used->ring[(vr)->num]的处理。
原文链接:http://m.blog.chinaunix.net/uid-28541347-id-5819699.html