Blocking 本质上是一个executor。可以阻塞并等待 future 异步代码的执行直至完成。大致的原理是启用一个线程池,将阻塞的代码放到线程池中执行,通过一个channel返回执行的结果。
之所以要学习这个库的原因是这里提供了异步框架 executor 实现思路。这个库的作者在 smol 这个轻量级的异步运行库实现了完整的 executor 功能,而 smol 实际上就是 async-std 的基础。
To convert async to blocking, block on async code with block_on(), block_on!, or BlockOn. To convert blocking to async, unblock blocking code with unblock(), unblock!, or Unblock.
库本身的依赖:
futures-channel: std, sinkfutures-util: std, io, sinkonce_cell,一个一次性的cell。可以用来完成全局变量的惰性初始化parking,使用条件变量来实现操作系统线程的挂起和重新执行waker-fn,用闭包实现唤醒功能。这个实际上就是用来唤醒future首先,将准备执行的future封装进一个闭包,再打包生成一个新的对象 runnable,放入一个全局队列。然后通知线程池。准备执行。闭包执行的结果将使用channel发送回来。
impl Executor { /// Spawns a future onto this executor. /// /// Returns a [`Task`] handle for the spawned task. fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> { // Wrap the future into one that sends the output into a channel. let (s, r) = oneshot::channel(); let future = async move { let _ = s.send(future.await); }; // Create a task and schedule it for execution. let runnable = Arc::new(Runnable { state: AtomicUsize::new(0), future: Mutex::new(Box::pin(future)), }); // Execute是一个全局的单例。维护线程池的状态。 EXECUTOR.schedule(runnable); // Return a handle that retrieves the output of the future. Box::pin(async { r.await.expect("future has panicked") }) } /// Runs the main loop on the current thread. /// /// This function runs blocking tasks until it becomes idle and times out. // 这是线程运行的主循环。首先加锁获取全局的队列,从队列里取出一个runnable对象。并运行。 fn main_loop(&'static self) { let mut inner = self.inner.lock().unwrap(); loop { // This thread is not idle anymore because it's going to run tasks. inner.idle_count -= 1; // Run tasks in the queue. while let Some(runnable) = inner.queue.pop_front() { // We have found a task - grow the pool if needed. //这里需要注意。innner 被转移进入grow_pool,并随之析构。也就是说,这之后已经解锁的Mutex self.grow_pool(inner); // Run the task. // runnable 维护一个闭包运行的简单状态机。run() let _ = panic::catch_unwind(|| runnable.run()); // Re-lock the inner state and continue. // 因为之前Mutex已经释放了,所以这里需要重新加锁 inner = self.inner.lock().unwrap(); } // This thread is now becoming idle. inner.idle_count += 1; // Put the thread to sleep until another task is scheduled. let timeout = Duration::from_millis(500); let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap(); inner = lock; // If there are no tasks after a while, stop this thread. if res.timed_out() && inner.queue.is_empty() { inner.idle_count -= 1; inner.thread_count -= 1; break; } } } /// Schedules a runnable task for execution. // 这里将包含了闭包的对象。放入全局单例的队列中,并通知线程池。在必要时会增加线程池的大小。 fn schedule(&'static self, runnable: Arc<Runnable>) { let mut inner = self.inner.lock().unwrap(); inner.queue.push_back(runnable); // Notify a sleeping thread and spawn more threads if needed. self.cvar.notify_one(); self.grow_pool(inner); } /// Spawns more blocking threads if the pool is overloaded with work. fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) { // If runnable tasks greatly outnumber idle threads and there aren't too many threads // already, then be aggressive: wake all idle threads and spawn one more thread. while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < 500 { // The new thread starts in idle state. inner.idle_count += 1; inner.thread_count += 1; // Notify all existing idle threads because we need to hurry up. self.cvar.notify_all(); // Generate a new thread ID. static ID: AtomicUsize = AtomicUsize::new(1); let id = ID.fetch_add(1, Ordering::Relaxed); // Spawn the new thread. thread::Builder::new() .name(format!("blocking-{}", id)) .spawn(move || self.main_loop()) .unwrap(); } } }Block<T> 数据结构实现将异步操作包装为同步的模式。暂时没发现有什么用处。 This type implements traits [Iterator], [Read], [Write], or [Seek] if the inner type implements [Stream], [AsyncRead], [AsyncWrite], or [AsyncSeek], respectively.
将一个同步操作的闭包做异步封装在线程池上执行。
pub async fn unblock<T, F>(f: F) -> T where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { let (sender, receiver) = oneshot::channel(); let task = Executor::spawn(async move { let _ = sender.send(f()); }); task.await; receiver.await.expect("future has panicked") }将同步IO操作的句柄封装为异步接口。实现了几个重要的异步操作的 Trait。 This type implements traits [Stream], [AsyncRead], [AsyncWrite], or [AsyncSeek] if the inner type implements [Iterator], [Read], [Write], or [Seek], respectively.
而在 futures-io 中定义的 AsyncReadExt 等 Trait 指定了所有实现 AsyncRead 的类型自动实现 AsyncReadExt ,因此 Unblock<T> 即实现了AsyncReadExt ,拥有AsyncReadExt 的 read 相关方法。因此,可以对Unblock<T> 执行 read,返回一个 Future,await 执行后获得 read 结果。注意,Stream 则稍有不同。 如同步操作 T 返回一个 Iterator,则对应异步 Unblock<T> 返回 Stream。