05-异步编程
Rust异步编程通过async/await提供零成本异步抽象,Tokio是最流行的异步运行时。
异步基础
async/await
// 异步函数
async fn hello() {
println!("hello");
}
// 返回Future
fn hello() -> impl Future<Output = ()> {
async {
println!("hello");
}
}
// 调用异步函数
#[tokio::main]
async fn main() {
hello().await; // await等待Future完成
}
Future trait
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
// Future定义
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T), // 完成
Pending, // 未完成
}
Tokio运行时
安装配置
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
基本使用
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
println!("Hello");
sleep(Duration::from_secs(1)).await;
println!("World");
}
// 多线程运行时(默认)
#[tokio::main]
// 单线程运行时
#[tokio::main(flavor = "current_thread")]
// 手动创建运行时
use tokio::runtime::Runtime;
fn main() {
let runtime = Runtime::new().unwrap();
runtime.block_on(async {
println!("Hello from async");
});
}
异步任务
创建任务
use tokio::task;
#[tokio::main]
async fn main() {
// 启动异步任务
let handle = task::spawn(async {
println!("async task");
42
});
// 等待任务完成
let result = handle.await.unwrap();
println!("Result: {}", result);
}
// 多个任务并发
#[tokio::main]
async fn main() {
let handle1 = task::spawn(async { 1 });
let handle2 = task::spawn(async { 2 });
let result1 = handle1.await.unwrap();
let result2 = handle2.await.unwrap();
println!("{} + {} = {}", result1, result2, result1 + result2);
}
join和select
use tokio::join;
async fn task1() -> u32 {
1
}
async fn task2() -> u32 {
2
}
#[tokio::main]
async fn main() {
// join:等待所有任务完成
let (r1, r2) = join!(task1(), task2());
println!("Results: {} {}", r1, r2);
}
// select:等待第一个完成
use tokio::select;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let task1 = async {
sleep(Duration::from_secs(1)).await;
"task1"
};
let task2 = async {
sleep(Duration::from_millis(500)).await;
"task2"
};
select! {
result = task1 => println!("{} finished first", result),
result = task2 => println!("{} finished first", result),
}
}
异步I/O
文件I/O
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 读文件
let mut file = File::open("foo.txt").await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
println!("{}", contents);
// 写文件
let mut file = File::create("bar.txt").await?;
file.write_all(b"hello world").await?;
Ok(())
}
// 一次性读写
use tokio::fs;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let contents = fs::read_to_string("foo.txt").await?;
fs::write("bar.txt", b"hello").await?;
Ok(())
}
网络I/O
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// TCP服务器
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
loop {
let (socket, _) = listener.accept().await?;
tokio::spawn(async move {
handle_client(socket).await;
});
}
}
async fn handle_client(mut socket: TcpStream) {
let mut buf = [0; 1024];
loop {
let n = match socket.read(&mut buf).await {
Ok(n) if n == 0 => return, // 连接关闭
Ok(n) => n,
Err(e) => {
eprintln!("failed to read from socket: {:?}", e);
return;
}
};
if let Err(e) = socket.write_all(&buf[0..n]).await {
eprintln!("failed to write to socket: {:?}", e);
return;
}
}
}
// TCP客户端
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
stream.write_all(b"hello").await?;
let mut buf = [0; 1024];
let n = stream.read(&mut buf).await?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
Ok(())
}
异步通道
mpsc
多生产者单消费者。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
for i in 0..10 {
if tx.send(i).await.is_err() {
println!("receiver dropped");
return;
}
}
});
while let Some(i) = rx.recv().await {
println!("got = {}", i);
}
}
oneshot
单次通信。
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
if let Err(_) = tx.send(3) {
println!("receiver dropped");
}
});
match rx.await {
Ok(v) => println!("got = {:?}", v),
Err(_) => println!("sender dropped"),
}
}
broadcast
广播。
use tokio::sync::broadcast;
#[tokio::main]
async fn main() {
let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
assert_eq!(rx1.recv().await.unwrap(), 10);
assert_eq!(rx1.recv().await.unwrap(), 20);
});
tokio::spawn(async move {
assert_eq!(rx2.recv().await.unwrap(), 10);
assert_eq!(rx2.recv().await.unwrap(), 20);
});
tx.send(10).unwrap();
tx.send(20).unwrap();
}
异步同步原语
Mutex
use tokio::sync::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let data = Arc::clone(&data);
handles.push(tokio::spawn(async move {
let mut lock = data.lock().await;
*lock += 1;
}));
}
for handle in handles {
handle.await.unwrap();
}
println!("Result: {}", *data.lock().await); // 10
}
RwLock
use tokio::sync::RwLock;
#[tokio::main]
async fn main() {
let lock = RwLock::new(5);
// 读锁
{
let r1 = lock.read().await;
let r2 = lock.read().await;
println!("{} {}", *r1, *r2);
}
// 写锁
{
let mut w = lock.write().await;
*w += 1;
}
}
Semaphore
use tokio::sync::Semaphore;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for i in 0..10 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
handles.push(tokio::spawn(async move {
println!("Task {} started", i);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
drop(permit); // 释放许可
println!("Task {} finished", i);
}));
}
for handle in handles {
handle.await.unwrap();
}
}
Stream
异步迭代器。
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let mut stream = stream::iter(vec![1, 2, 3]);
while let Some(v) = stream.next().await {
println!("got = {}", v);
}
}
// 从channel创建
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(10);
let mut stream = ReceiverStream::new(rx);
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
});
while let Some(v) = stream.next().await {
println!("got = {}", v);
}
}
// map、filter等操作
use tokio_stream::{self as stream, StreamExt};
#[tokio::main]
async fn main() {
let stream = stream::iter(1..=10)
.filter(|x| x % 2 == 0)
.map(|x| x * 2);
tokio::pin!(stream);
while let Some(v) = stream.next().await {
println!("got = {}", v); // 4, 8, 12, 16, 20
}
}
HTTP服务器(Axum)
use axum::{
routing::{get, post},
http::StatusCode,
Json, Router,
};
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
let app = Router::new()
.route("/", get(root))
.route("/users", post(create_user));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
.await
.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn root() -> &'static str {
"Hello, World!"
}
async fn create_user(
Json(payload): Json<CreateUser>,
) -> (StatusCode, Json<User>) {
let user = User {
id: 1337,
username: payload.username,
};
(StatusCode::CREATED, Json(user))
}
#[derive(Deserialize)]
struct CreateUser {
username: String,
}
#[derive(Serialize)]
struct User {
id: u64,
username: String,
}
异步与同步对比
特性 |
同步 |
异步 |
|---|---|---|
阻塞 |
阻塞线程 |
不阻塞,切换任务 |
并发 |
线程并发 |
任务并发 |
开销 |
线程栈(MB) |
任务(字节级) |
适用场景 |
CPU密集 |
I/O密集 |
复杂度 |
简单 |
较复杂 |
性能优化
减少分配
// ❌ 每次分配
async fn process(data: Vec<u8>) {
// ...
}
// ✓ 使用引用
async fn process(data: &[u8]) {
// ...
}
批量操作
// ❌ 逐个发送
for item in items {
tx.send(item).await?;
}
// ✓ 批量发送
tx.send_batch(items).await?;
使用缓冲
// ❌ 无缓冲channel
let (tx, rx) = mpsc::channel(1);
// ✓ 有缓冲
let (tx, rx) = mpsc::channel(1024);
常见陷阱
阻塞运行时
// ❌ 错误:阻塞异步运行时
#[tokio::main]
async fn main() {
tokio::spawn(async {
std::thread::sleep(Duration::from_secs(10)); // 阻塞整个线程!
});
}
// ✓ 正确:使用异步sleep
tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(10)).await;
});
// ✓ CPU密集任务用spawn_blocking
tokio::spawn(async {
tokio::task::spawn_blocking(|| {
// 耗时计算
}).await.unwrap();
});
忘记await
// ❌ 错误:忘记await
async fn process() {
do_something(); // Future未执行!
}
// ✓ 正确
async fn process() {
do_something().await;
}
无限Future
// ❌ 错误:无限递归
async fn infinite() {
infinite().await; // 栈溢出
}
// ✓ 使用循环
async fn process_loop() {
loop {
process_item().await;
}
}
最佳实践
选择运行时
// ✓ I/O密集:Tokio
#[tokio::main]
async fn main() {
// ...
}
// ✓ 嵌入式:async-std
use async_std::task;
fn main() {
task::block_on(async {
// ...
});
}
// ✓ 浏览器:wasm-bindgen-futures
错误处理
// ✓ 使用Result
async fn fetch() -> Result<String, reqwest::Error> {
let resp = reqwest::get("https://example.com").await?;
let body = resp.text().await?;
Ok(body)
}
// ✓ 组合Future
use futures::future;
async fn fetch_all() -> Result<Vec<String>, reqwest::Error> {
let urls = vec!["url1", "url2", "url3"];
let futures: Vec<_> = urls.iter().map(|url| fetch(url)).collect();
future::try_join_all(futures).await
}
超时和取消
use tokio::time::{timeout, Duration};
// 超时
async fn with_timeout() -> Result<(), Box<dyn std::error::Error>> {
let result = timeout(Duration::from_secs(5), slow_operation()).await?;
Ok(result)
}
// 取消(Drop Future)
let handle = tokio::spawn(async {
// ...
});
handle.abort(); // 取消任务
资源清理
// ✓ 使用Drop
struct Connection {
// ...
}
impl Drop for Connection {
fn drop(&mut self) {
// 清理资源
}
}
// ✓ 异步清理
async fn cleanup(conn: Connection) {
conn.close().await;
}
Pin和Unpin
异步函数内部状态不能移动。
use std::pin::Pin;
// 自动实现Unpin(可移动)
struct MyStruct;
// 手动标记!Unpin(不可移动)
use std::marker::PhantomPinned;
struct NotUnpin {
data: String,
_pin: PhantomPinned,
}
// Pin API
fn needs_pin(data: Pin<&mut MyStruct>) {
// ...
}
核心: 异步编程通过零成本抽象实现高效I/O。Tokio是主流异步运行时,提供完整的异步生态系统。