tokio 学习

Rust Async 异步编程 简易教程

理解tokio核心(1): runtime

Rust 中的异步编程

Rust 中的 async 和线程

OS 线程

  • 适用于少量任务,有内存和 CPU 开销,且线程生成和线程间切换非常昂贵
  • 线程池可以降低一些成本
  • 允许重用同步代码,代码无需大概,无需特定编程模型
  • 有些系统支持修改线程优先级

Async

  • 显著降低内存和 CPU 开销
  • 同等条件下,支持比线程多几个数量级的任务(少数线程支撑大量任务)
  • 可执行文件大(需要生成状态机,每个可执行文件捆绑一个异步运行时)
  • async 把一段代码转化为一个实现了 Future trait 的状态机。虽然在同步方法中调用阻塞函数会阻塞整个线程,但阻塞的 Future 将放弃对线程的控制,从而允许其他 Future 来运行。

asnyc 并不是比线程好,只是不同而已!

异步执行环境

异步执行环境(Async Execution Context)是一种机制,用于管理和调度异步任务的执行。在 Rust 中,Tokio 提供了一种异步执行环境,它基于 Futures 模型,允许你编写和执行异步任务。
下面是异步执行环境的一些关键概念:

  • 事件循环(Event Loop):异步执行环境通常由一个事件循环驱动。事件循环不断地检查异步任务的状态,执行就绪的任务,并处理事件。它负责调度任务的执行,并确保异步任务按照适当的顺序执行。
  • 异步任务(Async Tasks):异步执行环境下的任务是基于 Future 的。Future 表示一个尚未完成的异步操作,并提供了一种方便的方式来等待异步操作完成并获取其结果。在 Tokio 中,你可以创建异步任务,然后将它们提交给 Tokio 运行时执行
  • 任务调度(Task Scheduling):异步执行环境负责调度异步任务的执行。它会根据任务的就绪状态和优先级,决定何时执行任务,并在需要时将任务挂起和恢复。
  • 非阻塞执行(Non-blocking Execution):异步执行环境通过非阻塞方式执行异步任务。这意味着当一个任务在等待某些操作完成时,事件循环可以继续处理其他任务和事件,而不必等待该任务完成
  • 并发性(Concurrency):异步执行环境可以管理多个任务的并发执行。通过使用异步编程模型,你可以编写高效的并发代码,同时处理多个任务,而不会受到线程阻塞的影响。

总之,异步执行环境提供了一个灵活的框架,用于管理和执行异步任务。它通过事件循环Future 模型,实现了高效的非阻塞并发执行,让你可以编写高性能的异步应用程序。

理解 async

异步编程,诀窍就是当 CPU 等待外部事件或动作时,异步运行时会安排其他可继续执行的任务在 CPU 上执行。而当从磁盘或 I/O 子系统的系统终端到达的时候,异步运行时会知道识别这事,并安排原来的任务继续执行。

一般来说,I/O 受限(I/O Bound)的程序(程序执行的速度依赖于 I/O 子系统的速度) CPU 受限(CPU Bound)的任务(程序执行的速度依赖于CPU的速度)可能更适合异步任务的执行。

async、.await 关键字是 Rust 标准库里拥有异步编程的内置核心原语集的代表,就是语法糖。

Future

Future 是 Rust 中的一个重要概念,用于表示一个尚未完成的异步操作。Future 代表了一个异步计算的结果,可能会在未来的某个时间点产生值或错误。

Future 可以表示:

  • 下一次网络数据包的到来
  • 下一次鼠标的移动
  • 或者仅仅是经过一段时间的时间点

Rust 的异步函数都会返回 Future,Future 基本上就是代表着延迟的计算。

以下是关于 Future 的一些重要概念和特点:

  • 表示异步计算:Future 是 Rust 中表示异步计算的一种方式。它表示了一个尚未完成的异步操作,并提供了一种方法来等待操作完成并获取结果。
  • 延迟计算:Future 具有延迟计算的特性,即它不会立即执行,而是在未来的某个时间点执行。Future 可以被等待(await),直到它产生值或错误为止。
  • 状态:Future 有三种可能的状态:Pending(等待中)、Ready(准备就绪)、Error(错误)。当 Future 处于 Pending 状态时,表示异步操作尚未完成;当 Future 处于 Ready 状态时,表示异步操作已经完成,并且产生了一个值;当 Future 处于 Error 状态时,表示异步操作产生了一个错误。
  • 组合和转换:Future 可以被组合和转换,以便实现复杂的异步操作。例如,可以使用 map、and_then、or_else 等方法来对 Future 进行转换和处理,以便处理异步操作的结果。
  • 异步编程:Future 是异步编程的基础,它与 async/await 语法一起使用,使得异步编程变得更加简单和直观。通过 async/await 语法,可以方便地编写和组合异步操作,而不必显式地使用 Future 的 API。

总的来说,Future 是 Rust 中表示异步计算的一种抽象,它提供了一种方法来等待异步操作完成并获取结果。通过组合和转换 Future,可以实现复杂的异步操作。Future 与 async/await 语法一起使用,可以使得异步编程变得更加简单和直观。

通俗的理解 Future Trait

Future 代表着一种你可以检验其是否完成的操作
Future 可以通过调用 poll 函数来取得进展

  • poll 函数会驱动 Future 尽可能接近完成
  • 如果 Future 完成了,就返回 poll::Ready(result),其中 result 就是最终的结果
  • 如果 Future 还无法完成,就返回 poll::Pending,并当 Future 准备好取得更多进展时调用一个 wakerwake() 函数

针对一个 Future,你唯一能做的就是使用 poll 来敲它,直到一个值掉出来

创建 future

1)使用 async/await 关键字定义异步函数或异步块。这是最常见和推荐的方式。
async fn 返回的是 Future(本质就是第二种的写法),这个 Future 需要一个执行者来运行。

async fn example_async_function() -> i32 {// Some async operations here42
}async {// Some async operations here42
};

2)使用 Future trait 的 async fn 方法。方法(1)是此方法的语法糖。

use std::future::Future;
//				返回一个实现了 Future trait 的类型,其 Output 类型为 i32
fn example_async_function() -> impl Future<Output = i32> {
// 创建一个异步任务并将其返回,并不会执行异步任务中的代码async {// Some async operations here42}
}

这个异步函数的定义中使用了 async 关键字,但它并不会立即执行。相反,调用者需要将这个异步函数返回的异步任务执行在一个异步执行上下文中,通常是通过 await 表达式或者将异步任务提交给异步运行时来执行。

3)使用异步库提供的函数或宏来创建 Future。比如在 Tokio 中,可以使用 tokio::spawn()创建一个异步任务的 Future

use tokio::task;
// task::spawn() 创建一个异步任务,其参数是一个异步闭包,都是Future
task::spawn(async {// Some async operations here42
});

4)使用 futures crate 中的方法创建 Future。比如使用 futures::future::ready() 创建一个立即返回某个值的 Future,或者使用 futures::future::pending() 创建一个永远不会完成的 Future。

use futures::future;future::ready(42);
future::pending();

5)使用 async_stdtokio 等异步运行时提供的方法创建 Future。

async_std::future::ready(42);
tokio::time::delay_for(Duration::from_secs(1)).then(|_| async { 42 });

future 的定义

pub trait Future {/// The type of value produced on completion.#[stable(feature = "futures_api", since = "1.36.0")]#[rustc_diagnostic_item = "FutureOutput"]type Output; // future执行成功返回的类型/// Attempt to resolve the future to a final value, registering/// the current task for wakeup if the value is not yet available.////// # Return value////// This function returns:////// - [`Poll::Pending`] if the future is not ready yet/// - [`Poll::Ready(val)`] with the result `val` of this future if it///   finished successfully.////// Once a future has finished, clients should not `poll` it again.////// When a future is not ready yet, `poll` returns `Poll::Pending` and/// stores a clone of the [`Waker`] copied from the current [`Context`]./// This [`Waker`] is then woken once the future can make progress./// For example, a future waiting for a socket to become/// readable would call `.clone()` on the [`Waker`] and store it./// When a signal arrives elsewhere indicating that the socket is readable,/// [`Waker::wake`] is called and the socket future's task is awoken./// Once a task has been woken up, it should attempt to `poll` the future/// again, which may or may not produce a final value.////// Note that on multiple calls to `poll`, only the [`Waker`] from the/// [`Context`] passed to the most recent call should be scheduled to/// receive a wakeup.////// # Runtime characteristics////// Futures alone are *inert*; they must be *actively* `poll`ed to make/// progress, meaning that each time the current task is woken up, it should/// actively re-`poll` pending futures that it still has an interest in.////// The `poll` function is not called repeatedly in a tight loop -- instead,/// it should only be called when the future indicates that it is ready to/// make progress (by calling `wake()`). If you're familiar with the/// `poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures/// typically do *not* suffer the same problems of "all wakeups must poll/// all events"; they are more like `epoll(4)`.////// An implementation of `poll` should strive to return quickly, and should/// not block. Returning quickly prevents unnecessarily clogging up/// threads or event loops. If it is known ahead of time that a call to/// `poll` may end up taking awhile, the work should be offloaded to a/// thread pool (or something similar) to ensure that `poll` can return/// quickly.////// # Panics////// Once a future has completed (returned `Ready` from `poll`), calling its/// `poll` method again may panic, block forever, or cause other kinds of/// problems; the `Future` trait places no requirements on the effects of/// such a call. However, as the `poll` method is not marked `unsafe`,/// Rust's usual rules apply: calls must never cause undefined behavior/// (memory corruption, incorrect use of `unsafe` functions, or the like),/// regardless of the future's state.////// [`Poll::Ready(val)`]: Poll::Ready/// [`Waker`]: crate::task::Waker/// [`Waker::wake`]: crate::task::Waker::wake#[lang = "poll"]#[stable(feature = "futures_api", since = "1.36.0")]fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;// poll方法被异步运行时调用,用来检查异步任务是否已经完成// 返回的Poll是一个枚举,其中Poll::Pending表示还未完成,Poll::Ready(value)表示已经完成
}

谁执行 poll 方法?

是异步执行器来,它是异步运行时的一部分。异步执行器会管理一个 Future 的集合,并通过调用 Future 上的 poll 方法来驱动他们完成。所以函数或代码块在前面加上 async 关键字后,就相当于告诉异步执行器它会返回 Future,这个 Future 需要被驱动直到完成。

利用 Tokio 库来理解 Future

图取自杨旭老师的视频讲解

自定义 Future

异步任务 h1 中的 Future 一直返回 PollPending,这意味这个 Future 将永远无法执行。

use std::future::Future;
use std::pin::Pin;
// Context包含异步任务的上下文,可以用来唤醒当前的任务
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;struct ReadFileFuture {}impl Future for ReadFileFuture {type Output = String;// 这个Future会被异步运行时反复地poll,把这个Future给固定住(pin住),固定到内存的某一特定位置,对异步代码的安全性是必要的fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {println!("tokio stop polling me", );Poll::Pending}
}#[tokio::main]
async fn main() {println!("hello before reading file!", );
//  这里的Future一直返回 `PollPending`,这意味这个Future将永远无法执行let h1 = tokio::spawn(async {let future1 = ReadFileFuture {};future1.await;});
// 异步任务h1被poll一次之后执行完毕,就不会再被poll和执行了let h2 = tokio::spawn(async {let file2_contents = read_from_file2().await;println!("{:?}", file2_contents);});let _ = tokio::join!(h1, h2);
}fn read_from_file2() -> impl Future<Output = String> {async {sleep(Duration::new(2, 0));println!("{:?}", "processing file 2");String::from("hello, there from file 2")}
}

Tokio 执行器如何知道何时再次 poll 第一个 Future?

Tokio 执行器肯定不会一直 poll,Tokio使用一个 Weaker 组件来处理这件事。当被异步执行器 poll 过的任务还没有准备好产生值的时候,这个任务就被注册到一个 Weaker。Weaker 会有一个处理程序(handle),它会被存储在任务关联的 Context 对象中。

Weaker 有一个 wake() 方法,可以用来告诉异步执行器关联的任务应该被唤醒了。当 wake() 方法被调用了,Tokio 执行器就会被通知是时候再次 poll 这个异步的任务了,具体方式是调用任务上的 poll() 函数。简单来说,就是通过 wake() 函数,执行器就确切的知道哪些 Future 已准备好进行 poll 的调用。

Tokio 库组件

在这里插入图片描述

Tokio 组件的简化工作流程

在这里插入图片描述

存储或传递 Future

通常,async 的函数在调用后会立即 .await,这么做就没有什么问题,但是如果存储 Future或将其传递给其他任务或线程,就会出现生命周期的问题。

一种变通解决办法:把使用引用作为参数的 async fn转为一个 'static future。在 async 块里,将参数和 async fn 的调用捆绑到一起(延长参数的生命周期来匹配 future)。

fn bad() -> impl Future<Output = u8> {let x = 5;borrow_x(&x)  // x的生命周期是不够长的,活不到这个Future未来使用的时候
}fn good() -> impl Future<Output = u8> {async {   // 套一个async块,它返回的是一个 Future
// 这个Future 会记住里边的一些变量,以保证 x 的生命周期和 borrow_x(&x).await 的生命周期是匹配的let x = 5;borrow_x(&x).await}
}

Waker

使用 Waker 实现一个简单的计时器 Future

依赖

futures = "0.3.21"
tokio = { version = "1", features = ["full"] }

src\main.rs

use std::{future::Future,pin::Pin,sync::{Arc, Mutex},task::{Context, Poll, Waker},thread,time::Duration,
};pub struct TimerFuture {shared_state: Arc<Mutex<SharedState>>, 
}// TimerFutre 是要运行在一个任务上,这个任务拥有一个 Waker,// 在Future和等待的线程共享状态
struct SharedState {// 睡眠时间是否已经都过完completed: bool,// TimerFutre 所运行的任务的 Waker 在设置 completed = true 之后,// 线程可以使用它来告诉 TimerFuture 的任务可以唤醒,看到 completed = true 并前进waker: Option<Waker>,  // 线程可以用这个 waker 来告诉 TimerFuture 这个任务可以唤醒了
}// 为 TimerFuture 这个结构体添加 Future trait
impl Future for TimerFuture {type Output = ();// 查看 shared state,看下 timer 是否已经结束fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {let mut shared_state = self.shared_state.lock().unwrap();if shared_state.completed {Poll::Ready(())} else {// 设置 waker 以便当 timer 结束时线程可以唤醒当前任务// 保证 Future 可以再次被 poll,并看到 completed = true// 相比每次克隆 waker,如果只做一次显然更有诱惑力// 但是TimerFuture 可在执行者的任务间移动,这会导致过期的 waker 指向错误的任务,// 从而阻止了 TimerFuture 正确的唤醒// 注意:可以使用 Waker::will_wake 函数来检查这一点// 但为了简单起见,我们就省略了这一点shared_state.waker = Some(cx.waker().clone());Poll::Pending}}
}// 创建一个关联函数,用来创建一个新的 TimerFuture,并将在提供的时限过后完成
impl TimerFuture {pub fn new(duration: Duration) -> Self {let shared_state = Arc::new(Mutex::new(SharedState{completed: false,waker: None,}));// 生成新线程let thread_shared_state = shared_state.clone();thread::spawn(move || {thread::sleep(duration);let mut shared_state = thread_shared_state.lock().unwrap();// 发出信号:计时器已停止并唤醒 Future 被 poll 的最后一个任务// 如果存在的话shared_state.completed = true;if let Some(waker) = shared_state.waker.take() {waker.wake();}});TimerFuture {shared_state}}
}#[tokio::main]  // 使用 tokio 的宏来启动异步 main 函数
async fn main() {println!("Timer starts now!");TimerFuture::new(Duration::from_secs(3)).await;println!("Timer ended!");
}

await 函数

在 Rust 中,await 关键字是用于等待异步操作完成的关键字。它只能在异步函数中使用,并且用于暂停当前异步函数的执行,直到一个异步操作完成并返回结果。

换句话说,在发生阻塞时,它让放弃当前线程的控制权成为可能,这就允许在等待操作完成的时候,允许其他代码取得进展。

当程序执行到 await 关键字时,以下是发生的情况:

如果 await 关键字之前的表达式产生了一个 Future,那么它会开始执行这个 Future,并将当前的异步函数挂起。在 await 关键字之后的代码会暂时被挂起,等待异步操作的结果。当异步操作完成并返回结果时,异步函数会恢复执行,并继续执行 await 关键字之后的代码。

await 关键字的返回值是异步操作的结果,它的类型通常是 Future 所产生的值类型。

await 关键字的使用使得异步代码能够以同步的方式编写,不需要显式地处理回调函数或者使用其他复杂的异步编程模式。这极大地提高了代码的可读性和可维护性。

在多线程执行者上进行 .await

当使用多线程 future 执行者时,future 就可以在线程间移动:

  • 所以 async 体里面用的变量都必须能够在线程间移动
  • 因为任何的 .await 都可能导致切换到一个新的线程

这意味着使用以下类型是不安全的:

  • Rc&RefCell 和任何其他没有实现 Send trait 的类型,包括没实现 Sync trait 的引用
  • 注意:调用 .await 时,只要这些类型不在作用域内,就可以使用它们。

在跨越一个 .await 期间,持有传统的、对 future 无感知的锁,也不是好主意:

  • 可导致线程池锁定
  • 为此,可使用 future::lock 里的 Mutex 而不是 std::sync 里的

异步任务是lazy的

异步任务在被创建后并不会立即执行,而是在被提交给执行环境并开始执行上下文中的事件循环时才会被执行。具体来说,异步任务的执行是由事件循环驱动的,当事件循环开始运行时,它会轮询所有待执行的异步任务,并根据任务的状态和条件来决定是否执行或者继续等待。

一般来说,异步任务会在以下几种情况下被执行:

调用 .await 方法:
当一个异步任务被等待时(例如使用 await 表达式),它会被唤醒并开始执行。
在执行到一个 await 表达式时,事件循环会暂停当前任务的执行,并切换到其他待执行的任务。

async fn example() {// 异步任务逻辑
}async fn main() {let result = example().await;
}

被提交给执行环境:
当一个异步任务被提交给执行环境(如 Tokio 运行时)时,它会被加入到执行队列中等待执行。
当执行环境开始运行事件循环时,它会轮询执行队列中的任务,并执行准备就绪的任务。

use tokio::task;async fn example() {// 异步任务逻辑
}fn main() {let rt = tokio::runtime::Runtime::new().unwrap();rt.spawn(example());
}

总之,异步任务的执行是由事件循环控制的,在事件循环开始运行并开始轮询执行队列时,待执行的异步任务才会被执行。

tokio创建的线程进行切换时,需要上下文切换吗

在 Tokio 中,当线程执行异步任务时,通常不需要进行上下文切换。这是因为 Tokio 的异步执行模型是非阻塞的,它使用事件驱动的方式执行任务,而不是依赖于线程的阻塞和切换。

在 Tokio 中,异步任务通过 Future 和异步函数来表示,并由事件循环调度执行。当一个异步任务需要等待某些事件发生时(例如等待 I/O 操作完成、等待定时器触发等),它会将自己挂起,并注册一个回调函数来处理事件发生时的通知。这样,当事件发生时,事件循环会唤醒相应的任务,并继续执行。

由于 Tokio 的异步执行模型是非阻塞的,并且通过事件循环驱动任务的执行,因此不需要在任务之间进行显式的上下文切换。相反,任务的执行是事件驱动的,只有在等待事件时才会挂起,而不会占用线程的执行时间。这样可以避免传统多线程并发模型中因上下文切换而导致的性能损失。

总之,在 Tokio 中,异步任务的执行是通过事件驱动的方式实现的,通常不需要进行显式的上下文切换。这是异步编程模型的一个优势,能够提供高效的并发执行和良好的性能。

在实践中,即使使用Tokio,仍然可能存在上下文切换。这是因为某些情况下,任务可能需要等待I/O操作的完成,这可能会导致线程被阻塞。当一个线程被阻塞时,Tokio可以在另一个线程上调度其他任务,以确保整体的执行效率。虽然在 Tokio 运行时上运行的任务可能会涉及到上下文切换,但相比于传统的线程池模型,它大大减少了上下文切换的频率和成本。

tokio::Runtime

Tokio Runtime Builer

tokio::runtime::Builder 构建器用于用于构建一个 Tokio 运行时,构建一个 Tokio 运行时意味着创建一个异步执行环境,以便异步任务能够被调度和执行。

use anyhow::Result;fn main() -> Result<()> {let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();runtime.block_on(async {println!("Hello");});Ok(())
}

Tokio 运行时支持多线程与单线程,分别使用 Builder::new_multi_thread()Builder::new_current_thread() 去设置

enable_all 同时启用了IO和时间驱动程序,可通过 enable_ioenable_time 分别去启用它们

如果在不启用时间驱动程序的运行时中调用tokio::time::sleep将发生 panic

可以通过Builder::new_current_thread().worker_threads(num)去指定使用的工作线程数,不调用 worker_threads() 线程个数默认设置为系统的 cpu 核心数量,可以通过环境变量 TOKIO_WORKER_THREADS 去覆盖这个默认值

tokio::main 宏

#[tokio::main]
async fn main() {println!("Hello world");
}

等于以下代码

fn main() {tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on(async {println!("Hello world");})
}

可通过 flavor 去指定使用单线程还是多线程,通过 worker_threads 去指定线程数

#[tokio::main(flavor = "current_thread")]#[tokio::main(worker_threads = 2)]

block_on(在同步上下文中执行异步代码)

Runtime::block_on 是 Tokio 运行时提供的一个方法,它允许在同步上下文中执行异步代码,并且会一直阻塞当前线程直到传递给它的 Future 完成

不能在 Tokio 异步上下文中调用,示例:

use anyhow::Result;fn main() -> Result<()> {let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build()?;runtime.block_on(async {let temp_runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();temp_runtime.block_on(async {}); // ERROR! 此方法不能在异步上下文中调用!!tokio::spawn(async{let temp_runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();temp_runtime.block_on(async {}); // ERROR! 此方法不能在异步上下文中调用!!}).await;tokio::task::spawn_blocking(||{let temp_runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();temp_runtime.block_on(async {}); // OK!});});Ok(())
}

tokio::spawn、Runtime::spawn、enter

tokio::spawn全局函数,可以在任何地方直接调用)生成一个新的异步任务,它将立即在后台运行,返回一个 JoinHandle,通过 .await 返回的JoinHandle 来等待它执行完成。生成的任务可与其他任务并发执行。可能在当前线程上执行,也可能被发送到不同的线程执行。具体情况取决于当前的 Runtime 配置。

Runtime::spawn 则直接在 self 运行时上生成任务,不需要在上下文上调用

tokio::spawn 函数必须在 Tokio 运行时上下文中调用
错误示例:

use std::thread;
use std::time::Duration;use anyhow::Result;fn main() -> Result<()> {let _ = tokio::spawn(async {tokio::time::sleep(Duration::from_secs(1)).await;println!("Hello World");});thread::sleep(Duration::from_secs(3));Ok(())
}

创建运行时并在 block_on 内部 spawn

use std::time::Duration;
use anyhow::Result;fn main() -> Result<()> {let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;runtime.block_on(async{  // block_on在内部调用了Runtime::enter方法,进入了上下文let _ = tokio::spawn(async {tokio::time::sleep(Duration::from_secs(1)).await;println!("Hello World");}).await;});Ok(())
}

也可以手动调用enter()进入上下文,示例:

use std::thread;
use std::time::Duration;
use anyhow::Result;fn main() -> Result<()> {let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;let enter_guard = runtime.enter();let _ = tokio::spawn(async {tokio::time::sleep(Duration::from_secs(1)).await;println!("Hello World");});thread::sleep(Duration::from_secs(3));Ok(())
}

如果有多个Tokio 运行时,就可以使用runtime.enter() 选择进入哪个运行时上下文了,tokio::spawn 会在你进入的运行时中运行

enter() 返回了一个 enter_guard 对象,当 enter_guard 销毁时,也就退出了这个上下文

use std::thread;
use std::time::Duration;
use anyhow::Result;fn main() -> Result<()> {let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;let enter_guard = runtime.enter();drop(enter_guard); // 退出运行时上下文// 这里的tokio::spawn没有在运行时上下文中执行,因此报错let _ = tokio::spawn(async {tokio::time::sleep(Duration::from_secs(1)).await;println!("Hello World");});thread::sleep(Duration::from_secs(3));Ok(())
}

tokio 的两种线程

Tokio能够在少量线程上同时运行许多任务,通过在每个线程上重复交换当前运行的任务来实现。然而,这种交换只能在 .await 点发生时进行,所以长时间没有达到 .await 的代码将阻止其他任务的运行。为了解决这个问题,Tokio提供了两种类型的线程:核心线程和阻塞线程。

核心线程

核心线程是所有异步代码运行的地方,默认情况下Tokio会为每个CPU核心生成一个核心线程。可以使用环境变量 TOKIO_WORKER_THREADS 来覆盖默认值。这些线程会在运行时创建时立刻创建。

阻塞线程

阻塞线程是根据需要生成的,专用于运行阻塞任务,并在一定时间内保持活动状态,这个时间可以通过 thread_keep_alive 进行配置,默认值为 10 秒。由于Tokio无法像处理异步代码那样交换阻塞任务,因此阻塞线程的数量上限非常大。,可以通过 Buildermax_blocking_threads 去设置它的数量,默认数量为 512,这些线程是按需创建的,如果达到了最大数量,就会在队列里排队。

在阻塞线程中执行的代码不会像异步线程中的任务那样在 await 点换出,它会独占一个线程去执行,所以生成一个堵塞任务接收闭包类型而不是 Future 类型。

在阻塞线程中发生阻塞,不会阻碍异步任务的运行,因为异步任务只会被调度到核心线程中交替运行。

Runtime::spawn_blocking

要生成一个阻塞任务,应该使用 spawn_blocking 函数,接收一个闭包,并在阻塞线程中运行它

#[tokio::main]
async fn main() {// This is running on a core thread.let blocking_task = tokio::task::spawn_blocking(|| {// This is running on a blocking thread.// Blocking here is ok.});// We can wait for the blocking task like this:// If the blocking task panics, the unwrap below will propagate the// panic.blocking_task.await.unwrap();
}

spawn一样返回JoinHandle类型,可通过.await它来等待它执行完成并获取到返回值

thread::spawn创建的线程与堵阻塞线程一样都是独占的,但是spawn_blocking返回tokio::task::JoinHandle类型实现了Future,你可以在一个异步任务中等待它,而thread::spawn做不到这点,它返回的std::thread::JoinHandle没有实现 Future

Runtime::block_in_place

block_in_place 接收一个闭包,并在当前线程阻塞运行,并返回闭包的返回值

为了防止当前线程的其他异步任务被堵塞,Tokio 会把它们移交到其他异步线程中

此函数不能在 Tokio 使用单线程运行时时使用,因为在这种情况下没有其他异步线程,无法移交

tokio::task

tokio::task::spawn 与 tokio::task::JoinHandle

通常情况下,如果只使用默认的 Tokio 运行时,tokio::task::spawntokio::spawn的功能基本相同,tokio::task::spawn 可以选择某一 Tokio 运行时执行异步任务。

它接受一个 返回Future的异步函数 作为参数,并返回一个 JoinHandle

use tokio::task;async fn async_task() {// 异步任务的逻辑println!("Running async task");
}#[tokio::main]
async fn main() {let handle = tokio::task::spawn(async_task());// 等待异步任务完成// JoinHandlehandle.await.unwrap();
}

tokio::task::spawn_blocking

用于将一个阻塞的操作转换为异步任务,并在 Tokio 的线程池中执行。这对于需要执行阻塞操作(如CPU密集型计算)的场景非常有用,以避免阻塞整个Tokio运行时。

use tokio::task;fn blocking_task() -> u32 {// 阻塞操作的逻辑42
}#[tokio::main]
async fn main() {let handle = tokio::task::spawn_blocking(blocking_task);let result = handle.await.unwrap();println!("Blocking task result: {}", result);
}

异步运行实例

use std::{thread::sleep, time::Duration};#[tokio::main]
async fn main() {let h1 = tokio::spawn(async {let _file1_contents = read_file1().await;});let h2 = tokio::spawn(async {let _file2_contents = read_file2().await;});let _ = tokio::join!(h1, h2);
}async fn read_file1() -> String {sleep(Duration::new(4, 0));println!("processing file 1");String::from("file 1")
}async fn read_file2() -> String {sleep(Duration::new(2, 0));println!("processing file 2");String::from("file 2")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/92.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新手做抖音小店,最易爆单的几大类目分享,抓紧收藏!

大家好&#xff0c;我是电商糖果 新手做抖店没有经验&#xff0c;不了解市场&#xff0c;很多人都担心类目选错了&#xff0c;很难起店。 毕竟电商行业有一句话&#xff0c;类目大于一切&#xff0c;选择大于努力。 类目没有选对&#xff0c;再折腾也没用。 糖果做抖音小店…

eclipse配置SVN和Maven插件

3、 安装SVN插件 使用如下方法安装 Help–Install New Software 注意&#xff1a;目前只能安装1.8.x这个版本的SVN&#xff0c;如果使用高版本的SVN&#xff0c;在安装SVN和maven整合插件的时候就会报错&#xff0c;这应该是插件的bug。 点击Add name: subclipse location…

五款3dmax常用插件推荐(含云渲染工具)

在三维建模和动画设计领域&#xff0c;3ds Max软件因其强大功能和灵活性而广受欢迎。为了进一步提升工作效率和创作质量&#xff0c;有许多插件可供选择。本文推荐五款常用3ds Max插件&#xff0c;帮助你更好实现复杂的模型和动效创作。 五款3dmax常用插件推荐 1、Kitchen Cab…

Linux gcc 6

本章开始学习工具 什么是工具&#xff1f; 本质也是指令 yum 命令 小火车 sudo yum install sl&#xff08;安装sl&#xff09; sudo yum install -y sl //直接yes就不提示了 yum list //将yum源上的软件都穷举出来 yum search sl //结果不友好&#xff0c;不推荐 yum lis…

活动报名 | 如何进行全增量一体的异构数据库实时同步

伴随着新技术的不断涌现&#xff0c;市场竞争也在不断开辟新的角斗场——新的业务需求&#xff0c;新的应用设想都在这里迸发。 面对如此日新月异的竞争环境&#xff0c;企业的当务之急&#xff0c;是为新应用扎根准备好随时可取、准确一致的高质量数据土壤。在这样的背景下&a…

(二十八)Flask之wtforms库【上手使用篇】

目录&#xff1a; 每篇前言&#xff1a;用户登录验证&#xff1a;用户注册验证&#xff1a;使用示例&#xff1a; 抽象解读使用wtforms编写的类&#xff1a;简单谈一嘴&#xff1a;开始抽象&#xff1a; 每篇前言&#xff1a; &#x1f3c6;&#x1f3c6;作者介绍&#xff1a;【…

Docker 磁盘占用过多问题处理过程记录

一、问题描述 突然发现服务器磁盘使用超过95%了&#xff08;截图时2.1 和 2.2 已经执行过了&#xff09; 二、问题分析与解决 2.1&#xff0c;docker 无用镜像占用磁盘 # 使用 docker images 查看服务的镜像 docker images# 可以手动删除一些很大不用的 docker rmi ***## 也…

一秒内传输50万对纠缠光子?!纽约市量子网络刷新纪录

量子网络技术行业的领军企业Qunnect宣布&#xff0c;在纽约市的GothamQ网络上&#xff0c;其偏振量子比特的传输性能刷新了纪录。Qunnect利用现有的商用光缆实现了每秒传输50万对高保真度纠缠光子的速率&#xff0c;且该网络的正常运行时间超过了99%。 纽约34公里长的GothamQ量…

服务器数据恢复—RAID5故障导致SAP+oracle数据丢失的数据恢复案例

服务器存储数据恢复环境&#xff1a; 某品牌服务器存储中有一组由6块SAS硬盘组建的RAID5阵列&#xff0c;其中有1块硬盘作为热备盘使用。上层划分若干lun&#xff0c;存放Oracle数据库数据。 服务器存储故障&分析&#xff1a; 该RAID5阵列中一块硬盘出现故障离线&#xff0…

开启Three.js之旅(会持续完善)

文章目录 Three.js必备构建项目场景Scene相机CameraPerspectiveCamera 渲染器WebGLRendererCSS3DRenderer 灯光LightAmbientLightDirectionalLight 平行光PointLight 加载器CacheFileLoaderLoaderGLTFLoaderRGBELoaderTextureLoader 材质MetarialMeshBasicMaterialMeshLambertM…

k8s集群资源编排清单文件解读

1、YAML 文件概述 k8s集群中对资源管理和资源对象编排部署都可以通过声明样式&#xff08;YAML&#xff09;文件来解决&#xff0c;也就是可以把需要对资源对象操作编辑到 YAML 格式文件中&#xff0c;我们把这种文件叫做资源清单文件&#xff0c;通过 kubectl 命令直接使用资源…

原子的内部结构

原子非常神奇&#xff0c;花时间思考它是非常有价值的。尽管传统的太阳系示意图存在致命的缺点&#xff0c;但我们还是可以局部应用于原子。 首先&#xff0c;原子与太阳系具有相似性一原子的中心质量大&#xff0c;外部质量小。我们用最简单的氢原子做分析&#xff0c;氢原子…

辽宁梵宁教育课程概览:打造职场新人的设计技能利器

随着数字化时代的快速发展&#xff0c;设计技能在职场中的重要性日益凸显。对于职场新人而言&#xff0c;掌握优秀的设计能力不仅有助于个人职业发展&#xff0c;更能为企业创造更多价值。辽宁梵宁教育&#xff0c;作为一所致力于培养职场新人设计技能的培训机构&#xff0c;以…

决策树分类器(保姆级教学) 定义+特性+原理及公式+鸢尾花分类经典问题示例(完整Python代码带详细注释、保姆级分部代码解释及结果说明、决策树可视化及解释)

文章目录 引言定义特性基本原理和公式理解信息增益&#xff08;ID3算法&#xff09;熵的定义条件熵信息增益的计算 基尼不纯度&#xff08;CART算法&#xff09;基尼不纯度的定义基尼不纯度的计算例子 实现步骤解决鸢尾花分类问题&#xff08;机器学习入门中的经典案例Python代…

makefile第七讲

更多精彩内容在公众号。 当make执行完后&#xff0c;我们期望将最终的可执行文件安装到系统目录下&#xff0c;这样在不同的目录下都可以执行编译的可执行文件&#xff0c;相当于做成了个命令。这个就需要用到make install。 源文件如下&#xff1a;用于判断系统是小端还是大端…

性能分析与调优

性能分析方法 自底向上&#xff1a;通过监控硬件及操作系统性能指标&#xff08;cpu、内存、磁盘、网络等硬件资源的性能指标&#xff09;来分析性能问题&#xff08;配置、程序问题&#xff09; 先检查&#xff0c;再下药 自顶向下&#xff1a;通过生成负载来观察被测试的系…

【ROS2笔记七】ROS中的参数通信

7.ROS中的参数通信 文章目录 7.ROS中的参数通信7.1使用CLI工具调整参数7.2参数通信之rclcpp实现7.2.1创建节点7.2.2rclcpp参数API Reference ROS2中的参数是由键值对组成的&#xff0c;参数可以实现动态调整。 7.1使用CLI工具调整参数 启动turtlesim功能包的环境 ros2 run …

如何在本地创建一个贪吃蛇小游戏node.js服务并实现无公网IP远程游玩

文章目录 前言1.安装Node.js环境2.创建node.js服务3. 访问node.js 服务4.内网穿透4.1 安装配置cpolar内网穿透4.2 创建隧道映射本地端口 5.固定公网地址 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽…

OSPF笔记+大实验

OSPF综合大实验---实验报告 配置IP地址 R1&#xff1a; [R1]int g0/0/0 [R1-GigabitEthernet0/0/0]ip add 172.16.33.1 24 [R1-GigabitEthernet0/0/0]int l0 [R1-LoopBack0]ip add 172.168.34.1 24 R2: [R2]int g0/0/0 [R2-GigabitEthernet0/0/0]ip add 172.16.33.2 24…

Jmeter接口测试:使用教程(下)

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号&#xff1a;互联网杂货铺&#xff0c;回复1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 上一篇我给大家讲了jmeter的基本介绍跟参数化和jmeter脚…