异步编程是一种并发编程模型,通过在任务执行期间不阻塞线程的方式,提高系统的并发能力和响应性。相比于传统的同步编程,异步编程可以更好地处理 I/O 密集型任务和并发请求,提高系统的吞吐量和性能。
异步编程具有以下优势:
异步编程广泛应用于以下场景:
Rust 作为一门现代的系统级编程语言,旨在提供高效、安全和可靠的异步编程能力。Rust 异步编程的目标是实现高性能、无安全漏洞的异步应用程序,同时提供简洁的语法和丰富的异步库。
最值得一读的是 Rust 官方的Rust 异步编程书[1]
中文版: Rust 异步编程指南[2]
由于并发编程在现代社会非常重要,因此每个主流语言都对自己的并发模型进行过权衡取舍和精心设计,Rust 语言也不例外。下面的列表可以帮助大家理解不同并发模型的取舍:
Go
语言的协程设计就非常优秀,这也是 Go
语言能够迅速火遍全球的杀手锏之一。协程跟线程类似,无需改变编程模型,同时,它也跟 async
类似,可以支持大量的任务并发运行。但协程抽象层次过高,导致用户无法接触到底层的细节,这对于系统编程语言和自定义异步运行时是难以接受的actor
, 单元之间通过消息传递的方式进行通信和数据传递,跟分布式系统的设计理念非常相像。由于 actor
模型跟现实很贴近,因此它相对来说更容易实现,但是一旦遇到流控制、失败重试等场景时,就会变得不太好用async
模型的问题就是内部实现机制过于复杂,对于用户来说,理解和使用起来也没有线程和协程简单,好在前者的复杂性开发者们已经帮我们封装好,而理解和使用起来不够简单,正是本章试图解决的问题。总之,Rust 经过权衡取舍后,最终选择了同时提供多线程编程和 async 编程:
I/O
时,选择它就对了异步运行时是 Rust 中支持异步编程的运行时环境,负责管理异步任务的执行和调度。它提供了任务队列、线程池和事件循环等基础设施,支持异步任务的并发执行和事件驱动的编程模型。Rust 没有内置异步调用所必须的运行时,主要的 Rust 异步运行时包括:
还有 futuresust 异步编程的基础抽象库。大多数运行时都依赖 futures 提供异步原语。
今日头条是国内使用 Rust 语言的知名公司之一,他们也开源了一个他们的运行时bytedance/monoio[4]
Rust 异步编程模型包含了一些关键的组件和概念,包括:
异步函数和异步块:使用 async 关键字定义的异步函数和异步代码块。
// `foo()`返回一个`Future<Output = u8>`,
// 当调用`foo().await`时,该`Future`将被运行,当调用结束后我们将获取到一个`u8`值
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// 下面的`async`语句块返回`Future<Output = u8>`
async {
let x: u8 = foo().await;
x + 5
}
}
async 语句块和 async fn 最大的区别就是前者无法显式的声明返回值,在大多数时候这都不是问题,但是当配合 ? 一起使用时,问题就有所不同:
async fn foo() -> Result<u8, String> {
Ok(1)
}
async fn bar() -> Result<u8, String> {
Ok(1)
}
pub fn main() {
let fut = async {
foo().await?;
bar().await?;
Ok(())
};
}
以上代码编译后会报错:
error[E0282]: type annotations needed
--> src/main.rs:14:9
|
11 | let fut = async {
| --- consider giving `fut` a type
...
14 | Ok(1)
| ^^ cannot infer type for type parameter `E` declared on the enum `Result`
原因在于编译器无法推断出 Result<T, E>
中的 E
的类型, 而且编译器的提示consider giving fut a type
你也别傻乎乎的相信,然后尝试半天,最后无奈放弃:目前还没有办法为 async
语句块指定返回类型。
既然编译器无法推断出类型,那咱就给它更多提示,可以使用 ::< ... >
的方式来增加类型注释:
let fut = async {
foo().await?;
bar().await?;
Ok::<(), String>(()) // 在这一行进行显式的类型注释
};
await 关键字:在异步函数内部使用 await 关键字等待异步操作完成。
async/.await
是 Rust 语法的一部分,它在遇到阻塞操作时( 例如 IO )会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
async
是懒惰的,直到被执行器 poll
或者 .await
后才会开始运行,其中后者是最常用的运行 Future
的方法。 当 .await
被调用时,它会尝试运行 Future
直到完成,但是若该 Future
进入阻塞,那就会让出当前线程的控制权。当 Future
后面准备再一次被运行时(例如从 socket
中读取到了数据),执行器会得到通知,并再次运行该 Future
,如此循环,直到完成。
Future Trait:表示异步任务的 Future Trait,提供异步任务的执行和状态管理。
pub trait Future {
type Output;
// Required method
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
async 和 await 是 Rust 中用于异步编程的关键字。async 用于定义异步函数,表示函数体中包含异步代码。await 用于等待异步操作完成,并返回异步操作的结果。
异步函数使用 async 关键字定义,并返回实现了 Future Trait 的类型。异步函数可以在其他异步函数中使用 await 关键字等待异步操作完成。调用异步函数时,会返回一个实现了 Future Trait 的对象,可以通过调用 .await 方法等待结果。
异步块是一种在异步函数内部创建的临时异步上下文,可以使用 async 关键字创建。异步闭包是一种将异步代码封装在闭包中的方式,可以使用 async 关键字创建。异步块和异步闭包允许在同步上下文中使用 await 关键字等待异步操作。
异步函数的返回类型通常是实现了 Future Trait 的类型。Future Trait 表示一个异步任务,提供异步任务的执行和状态管理。Rust 标准库和第三方库中提供了许多实现了 Future Trait 的类型,用于表示各种异步操作。
举一个例子,下面这个例子是一个传统的并发下载网页的例子:
fn get_two_sites() {
// 创建两个新线程执行任务
let thread_one = thread::spawn(|| download("https://course.rs"));
let thread_two = thread::spawn(|| download("https://fancy.rs"));
// 等待两个线程的完成
thread_one.join().expect("thread one panicked");
thread_two.join().expect("thread two panicked");
}
如果是在一个小项目中简单的去下载文件,这么写没有任何问题,但是一旦下载文件的并发请求多起来,那一个下载任务占用一个线程的模式就太重了,会很容易成为程序的瓶颈。好在,我们可以使用async
的方式来解决:
async fn get_two_sites_async() {
// 创建两个不同的`future`,你可以把`future`理解为未来某个时刻会被执行的计划任务
// 当两个`future`被同时执行后,它们将并发的去下载目标页面
let future_one = download_async("https://www.foo.com");
let future_two = download_async("https://www.bar.com");
// 同时运行两个`future`,直至完成
join!(future_one, future_two);
}
注意上面的代码必须在一个异步运行时在运行,以便异步运行时使用一定数量的线程来调度这些代码的运行。
接下来我们就学习各种异步运行时库和异步运行时的方法。
Tokio 是 Rust 异步编程最重要的运行时库,提供了异步 IO、异步任务调度、同步原语等功能。
Tokio 的主要组件包括:
可以看到 Tokio 库包含了很多的功能,包括异步网络编程、并发原语等,之后我们会花整个一章专门介绍它,这一节我们值介绍它的异步运行时的使用。
你可以如下定义 main 函数,它自动支持运行时的启动:
#[tokio::main]
async fn main() {
// 在运行时中异步执行任务
tokio::spawn(async {
// do work
});
// 等待任务完成
other_task.await;
}
这个例子 main 函数前必须加 async 关键字,并且加#[tokio::main]
属性,那么这个 main 就会在异步运行时运行。
你也可以使用显示创建运行时的方法:
pub fn tokio_async() {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("Hello from tokio!");
rt.spawn(async {
println!("Hello from a tokio task!");
println!("in spawn")
})
.await
.unwrap();
});
rt.spawn_blocking(|| println!("in spawn_blocking"));
}
首先它创建了一个 Tokio 运行时rt
。block_on
方法在运行时上下文中执行一个异步任务,这里我们简单地打印了一句话。
然后使用rt.spawn
在运行时中异步执行另一个任务。这个任务也打印了几句话。spawn
返回一个JoinHandle
,所以这里调用.await
来等待任务结束。
最后,使用spawn_blocking
在运行时中执行一个普通的阻塞任务。这个任务会在线程池中运行,而不会阻塞运行时。
总结一下这个例子展示的要点:
block_on
执行异步任务spawn
在运行时中异步执行任务spawn_blocking
在线程池中执行阻塞任务await
JoinHandle 来等待异步任务结束Tokio 运行时提供了执行和调度异步任务所需的全部功能。通过正确地组合block_on
、spawn
和spawn_blocking
,可以发挥 Tokio 的强大能力,实现各种异步场景。
futures 库 futures 是 Rust 异步编程的基础抽象库,为编写异步代码提供了核心的 trait 和类型。
主要提供了以下功能:
.await
获取其结果。.await
迭代获取其元素。pub fn futures_async() {
let pool = ThreadPool::new().expect("Failed to build pool");
let (tx, rx) = mpsc::unbounded::<i32>();
let fut_values = async {
let fut_tx_result = async move {
(0..100).for_each(|v| {
tx.unbounded_send(v).expect("Failed to send");
})
};
pool.spawn_ok(fut_tx_result);
let fut_values = rx.map(|v| v * 2).collect();
fut_values.await
};
let values: Vec<i32> = executor::block_on(fut_values);
println!("Values={:?}", values);
}
这个例子展示了如何使用 futures 和线程池进行异步编程:
pool
tx
和rx
用来在任务间传递数据fut_values
,里面首先用spawn_ok
在线程池中异步执行一个任务,这个任务会通过通道发送 0-99 的数字。rx
用map
创建一个 Stream,它会将收到的数字乘 2。collect
收集 Stream 的结果到一个 Vec。block_on
在主线程中执行这个异步任务并获取结果。这段代码展示了 futures 和通道的组合使用 - 通过线程池并发地处理数据流。
block_on
运行 future 而不需要显式运行时也很方便。
futures 通过异步处理数据流,可以实现非阻塞并发程序,这在诸如网络服务端编程中很有用。与线程相比,futures 的抽象通常更轻量和高效。
这个库是 futures 的一个子集,它的编译速度快了一个数量级,修复了 futures API 中的一些小问题,补充了一些明显的空白,并移除了绝大部分不安全的代码。
简而言之,这个库的目标是比 futures 更可易用,同时仍然与其完全兼容。
让我们从创建一个简单的 Future 开始。在 Rust 中,Future 是一种表示异步计算的 trait。以下是一个示例:
use futures_lite::future;
async fn hello_async() {
println!("Hello, async world!");
}
fn main() {
future::block_on(hello_async());
}
在这个例子中,我们使用 futures-lite
中的 future::block_on
函数来运行异步函数 hello_async
。
async-std
是一个为 Rust 提供异步标准库的库。它扩展了标准库,使得在异步上下文中进行文件 I/O、网络操作和任务管理等操作更加便捷。
它提供了你所习惯的所有接口,但以异步的形式,并且准备好用于 Rust 的async
/await
语法。
特性
std::future
和async/await
构建,编译速度极快。use async_std::task;
async fn hello_async() {
println!("Hello, async world!");
}
fn main() {
task::block_on(hello_async());
}
这个例子首先导入 async_std::task。
然后定义一个异步函数 hello_async,其中只是简单打印一句话。
在 main 函数中,使用 task::block_on 来执行这个异步函数。block_on 会阻塞当前线程,直到传入的 future 运行完成。
这样的效果就是,尽管 hello_async 函数是异步的,但我们可以用同步的方式调用它,不需要手动处理 future。
async/await 语法隐藏了 future 的细节,给异步编程带来了极大的便利。借助 async_std,我们可以非常轻松地使用 async/await 来编写异步 Rust 代码。
smol
是一个超轻量级的异步运行时(async runtime)库,专为简化异步 Rust 代码的编写而设计。它提供了一个简洁而高效的方式来管理异步任务。
特性
轻量级: smol
的设计目标之一是轻量级,以便快速启动和低资源开销。
简洁 API: 提供简洁的 API,使得异步任务的创建、组合和运行变得直观和简单。
零配置: 无需复杂的配置,可以直接在现有的 Rust 项目中使用。
异步 I/O 操作: 支持异步文件 I/O、网络操作等,使得异步编程更加灵活。
下面这个例子演示了使用 smol 异步运行时执行异步代码块的例子:
pub fn smol_async() {
smol::block_on(async { println!("Hello from smol") });
}
在 Rust 中,有两个常见的宏可以用于同时等待多个 future:select 和 join。
select! 宏可以同时等待多个 future,并只处理最先完成的那个 future:
use futures::future::{select, FutureExt};
let future1 = async { /* future 1 */ };
let future2 = async { /* future 2 */ };
let result = select! {
res1 = future1 => { /* handle result of future1 */ },
res2 = future2 => { /* handle result of future2 */ },
};
join! 宏可以同时等待多个 future,并处理所有 future 的结果:
use futures::future::{join, FutureExt};
let future1 = async { /* future 1 */ };
let future2 = async { /* future 2 */ };
let (res1, res2) = join!(future1, future2);
join! 返回一个元组,包含所有 future 的结果。
这两个宏都需要 futures crate,使代码更加简洁。不使用宏的话,需要手动创建一个 Poll
所以 select 和 join 在处理多个 future 时非常方便。select 用于只处理最先完成的,join 可以同时处理所有 future。
try_join!宏也可以用于同时等待多个 future,它与 join!类似,但是有一点不同:
try_join!在任何一个 future 返回错误时,就会提前返回错误,而不会等待其他 future。
例如:
use futures::try_join;
let future1 = async {
Ok::<(), Error>(/*...*/)
};
let future2 = async {
Err(Error::SomethingBad)
};
let result = try_join!(future1, future2);
这里因为 future2 返回了错误,所以 try_join!也会返回这个错误,不会等待 future1 完成。
这不同于 join!,join!会等待所有 future 完成。
所以 try_join!的用途是同时启动多个 future,但是遇到任何一个错误就立即返回,避免不必要的等待。这在需要并发但不能容忍任何失败的场景很有用。
而当需要等待所有 future 无论成功失败,获取所有结果的时候,再使用 join!。
所以 try_join!和 join!都可以组合多个 future,但错误处理策略不同。选择哪个要根据实际需要决定。
zip
函数会 join 两个 future,并等待他们完成。而try_zip
函数会 join 两个函数,但是会等待两个 future 都完成或者其中一个 Err 则返回:
pub fn smol_zip() {
smol::block_on(async {
use smol::future::{try_zip, zip, FutureExt};
let future1 = async { 1 };
let future2 = async { 2 };
let result = zip(future1, future2);
println!("smol_zip: {:?}", result.await);
let future1 = async { Ok::<i32, i32>(1) };
let future2 = async { Err::<i32, i32>(2) };
let result = try_zip(future1, future2).await;
println!("smol_try_zip: {:?}", result);
});
}
Rust 异步编程书: https://rust-lang.github.io/async-book/
[2]Rust 异步编程指南: https://github.com/rustlang-cn/async-book
[3]多线程章节: https://github.com/rustlang-cn/async-book/blob/master/advance/concurrency-with-threads/concurrency-parallelism.md
[4]bytedance/monoio: https://github.com/bytedance/monoio