Rust 在v1.39版本以后就引入了async关键字,用于支持异步编程。
async fn foo() {}
Rust中,async函数或块会被视作一个 Future 对象,async 关键字只是用来定义这个 Future 对象,定义好的这片异步代码并不会自动执行,而是需要和 async 配对的 .await 去驱动它才会执行。
用 async 定义异步代码,用 .await 驱动执行。
但是 .await 又只能在 async 块中调用。
Rust 明确规定了,main 函数前不能加 async 修饰。也就是说,只能写成这种形式。
fn main() {let a = async {};a.await;
}
但是前面又说过了,.await 只能写在 async 代码块或函数里。这里就需要引入异步运行时了。
异步运行时
异步运行时是一个库,这个库包含一个响应器(reactor)和一个或多个执行器(executor)。它需要处理下面的事:
异步代码的执行;
任务的暂停;
状态的缓存;
外部事件的监听注册;
外部信号来了后,唤醒对应的任务,恢复任务状态;
多个任务间的调度。
tokio第三方实现库
目前 Rust 标准库中还没有内置一个官方的异步 Runtime,tokio 在第三方异步 Runtime 的激烈竞争中胜出,可以说它现在已经成为了 Rust 生态中异步运行时事实上的标准。
1.引入依赖
在Cargo.toml中引入依赖
tokio = { version = "1", features = ["full"] }
2. main 函数
tokio提供的一个属性宏标注在main函数上面,这样main函数前就可以加async修饰了。解决了上面提到的问题。
#[tokio::main] // 这个是tokio库里面提供的一个属性宏标注
async fn main() { // 注意 main 函数前面有 async println!("Hello world");
}
tokio 还可以基于当前系统线程创建单线程的 Runtime,你可以看一下示例。
#[tokio::main(flavor = "current_thread")] // 属性标注里面配置参数
async fn main() {println!("Hello world");
}
3. tokio组件
tokio 发展到现在,已经是一个功能丰富、机制完善的 Runtime 框架了。它针对异步场景把 Rust 标准库里对应的类型和设施都重新实现了一遍。具体包含 6 个部分。
- Runtime 设施组件:你可以自由地配置创建基于系统单线程的 Runtime 和多线程的 Runtime。
- 轻量级任务 task:你可以把它理解成类似 Go 语言中的 Goroutine 这种轻量级线程,而不是操作系统层面的线程。
- 异步输入输出(I/O):网络模块 net、文件操作模块 fs、signal 模块、process 模块等。
- 时间模块:定时器 Interval 等。
- 异步场景下的同步原语:channel、Mutex 锁等等。
- 在异步环境下执行计算密集型任务的方案spawn_blocking等等。
4. tokio的一些知识
1) tokio 底层机制
tokio reactor:用来接收从操作系统的异步框架中传回的消息事件,然后通知tokio waker把对应的任务唤醒。
tokio waker:唤醒对应任务。
tokio executor: 执行对应唤醒任务。每个任务会被抽象成一个Future来独立处理,而每个Future在Rust中会被处理成一个结构体,用状态机的方式来管理。Tokio 中还实现了对这些任务的安排调度机制。
2) 轻量级线程
tokio 提供了一种合作式(而非抢占式)的任务模型:每个任务 task 都可以看作是一个轻量级的线程,与操作系统线程相对。操作系统默认的线程机制需要消耗比较多的资源,一台普通服务器上能启动的总线程数一般最多也就几千个。而 tokio 的轻量级线程可以在一台普通服务器上创建上百万个。
3))M:N模型
tokio 的这个模型是一种 M:N 模型,M 表示轻量级线程的数量,N 表示操作系统线程的数量。N最常用的默认配置是一个机器上有多少CPU逻辑处理核,N就等于多少。
4) 合作式
tokio 的轻量级线程之间的关系是一种合作式的。合作式的意思就是同一个 CPU 核上的任务大家是配合着执行(不同 CPU 核上的任务是并行执行的)。我们可以设想一个简单的场景,A 和 B 两个任务被分配到了同一个 CPU 核上,A 先执行,那么,只有在 A 异步代码中碰到 .await 而且不能立即得到返回值的时候,才会触发挂起,进而切换到任务 B 执行。也就是说,在一个 task 没有遇到 .await 之前,它是不会主动交出这个 CPU 核的,其他 task 也不能主动来抢占这个 CPU 核。
5)创建tokio task
创建 tokio task,这需要使用 task::spawn() 函数。
use tokio::task;#[tokio::main]
async fn main() {task::spawn(async {// 在这里执行异步任务});
}
在这个示例里,main 函数里面创建了一个新的 task,用来执行具体的任务。我们需要知道,tokio 管理下的 async fn main() {} 本身就是一个 task,相当于在 main task 中,创建了一个新的 task 来执行。这里,main task 就是父 task,新创建的这个 task 是子 task。
在 tokio 中,子 task 的生存期有可能超过父 task 的生存期,也就是父 task 执行结束了,但子 task 还在执行。如果在父 task 里要等待子 task 执行完,再结束自己,保险的做法是用 JoinHandler。
注意:如果 main 函数所在的 task 先结束了,会导致整个程序进程退出,有可能会强制杀掉那些新创建的子 task。
JoinHandler 是什么意思呢?这个新概念跟 task 的管理相关。我们在 main task 中里创建一个新 task 后,task::spawn() 函数实际有一个返回值,它返回一个 handler,这个 handler 可以让我们在 main task 里管理新创建的 task。这个 handler 也可以用来指代这个新的 task,相当于给这个 task 取了一个名字。比如示例里,我们就把这个新的任务命名为 task_a,它的类型是 JoinHandler。在用 spawn() 创建 task_a 后,这个新任务就立即执行。
有了 JoinHandler,我们可以方便地创建一批新任务,并等待它们的返回值。
use tokio::task;async fn my_background_op(id: i32) -> String {let s = format!("Starting background task {}.", id);println!("{}", s);s
}#[tokio::main]
async fn main() {let ops = vec![1, 2, 3];let mut tasks = Vec::with_capacity(ops.len());for op in ops.clone() {// 任务创建后,立即开始运行,我们用一个Vec来持有各个任务的handlertasks.push(tokio::spawn(my_background_op(op)));}let mut outputs = Vec::with_capacity(tasks.len());for task in tasks {outputs.push(task.await.unwrap());}println!("{:?}", outputs);
}
// 输出
Starting background task 1.
Starting background task 2.
Starting background task 3.
上面示例里,我们用 tasks 这个动态数组持有 3 个异步任务的 handler,它们是并发执行的。然后对 tasks 进行迭代,等待每个 task 执行完成,并且搜集任务的结果放到 outputs 动态数组里。最后打印出来。