04-并发与智能指针
Rust通过所有权系统实现无数据竞争的并发,智能指针提供灵活内存管理。
智能指针
智能指针是实现了Deref和Drop trait的数据结构,拥有所指数据。
Box
堆分配,最简单的智能指针。
// 基本使用
let b = Box::new(5);
println!("{}", b);
// 应用场景1:递归类型
enum List {
Cons(i32, Box<List>),
Nil,
}
use List::{Cons, Nil};
let list = Cons(1, Box::new(Cons(2, Box::new(Cons(3, Box::new(Nil))))));
// 应用场景2:大数据转移所有权(避免栈拷贝)
let large_data = Box::new([0; 1000000]);
// 应用场景3:Trait对象
trait Draw {
fn draw(&self);
}
let objects: Vec<Box<dyn Draw>> = vec![/* ... */];
Deref trait
解引用强制转换。
use std::ops::Deref;
struct MyBox<T>(T);
impl<T> MyBox<T> {
fn new(x: T) -> MyBox<T> {
MyBox(x)
}
}
impl<T> Deref for MyBox<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0
}
}
// 使用
let x = 5;
let y = MyBox::new(x);
assert_eq!(5, *y); // *y -> *(y.deref())
// 解引用强制转换
fn hello(name: &str) {
println!("Hello, {}!", name);
}
let m = MyBox::new(String::from("Rust"));
hello(&m); // &MyBox<String> -> &String -> &str
Drop trait
自动清理资源。
struct CustomSmartPointer {
data: String,
}
impl Drop for CustomSmartPointer {
fn drop(&mut self) {
println!("Dropping CustomSmartPointer with data `{}`!", self.data);
}
}
fn main() {
let c = CustomSmartPointer {
data: String::from("my stuff"),
};
let d = CustomSmartPointer {
data: String::from("other stuff"),
};
println!("Created CustomSmartPointers.");
} // d先drop,然后c
// 提前drop
let c = CustomSmartPointer { data: String::from("some data") };
println!("Created.");
drop(c); // 显式drop
println!("Dropped before end.");
Rc
引用计数,单线程多所有者。
use std::rc::Rc;
enum List {
Cons(i32, Rc<List>),
Nil,
}
use List::{Cons, Nil};
let a = Rc::new(Cons(5, Rc::new(Cons(10, Rc::new(Nil)))));
println!("count after creating a = {}", Rc::strong_count(&a)); // 1
let b = Cons(3, Rc::clone(&a)); // 引用计数+1
println!("count after creating b = {}", Rc::strong_count(&a)); // 2
{
let c = Cons(4, Rc::clone(&a));
println!("count after creating c = {}", Rc::strong_count(&a)); // 3
}
println!("count after c goes out of scope = {}", Rc::strong_count(&a)); // 2
// ❌ Rc不能修改
// let value = Rc::get_mut(&mut a).unwrap();
RefCell
内部可变性,运行时借用检查。
use std::cell::RefCell;
let data = RefCell::new(5);
*data.borrow_mut() += 1; // 可变借用
println!("{}", *data.borrow()); // 6
// 违反借用规则会panic(运行时)
let a = RefCell::new(5);
let b = a.borrow_mut();
// let c = a.borrow_mut(); // panic!:已有可变借用
Rc<RefCell>
多所有者+可变性。
use std::cell::RefCell;
use std::rc::Rc;
#[derive(Debug)]
enum List {
Cons(Rc<RefCell<i32>>, Rc<List>),
Nil,
}
use List::{Cons, Nil};
let value = Rc::new(RefCell::new(5));
let a = Rc::new(Cons(Rc::clone(&value), Rc::new(Nil)));
let b = Cons(Rc::new(RefCell::new(3)), Rc::clone(&a));
let c = Cons(Rc::new(RefCell::new(4)), Rc::clone(&a));
*value.borrow_mut() += 10; // 修改共享值
println!("a = {:?}", a);
println!("b = {:?}", b);
println!("c = {:?}", c);
循环引用和Weak
use std::cell::RefCell;
use std::rc::{Rc, Weak};
#[derive(Debug)]
struct Node {
value: i32,
parent: RefCell<Weak<Node>>,
children: RefCell<Vec<Rc<Node>>>,
}
let leaf = Rc::new(Node {
value: 3,
parent: RefCell::new(Weak::new()),
children: RefCell::new(vec![]),
});
println!(
"leaf strong = {}, weak = {}",
Rc::strong_count(&leaf),
Rc::weak_count(&leaf),
);
{
let branch = Rc::new(Node {
value: 5,
parent: RefCell::new(Weak::new()),
children: RefCell::new(vec![Rc::clone(&leaf)]),
});
*leaf.parent.borrow_mut() = Rc::downgrade(&branch);
println!(
"branch strong = {}, weak = {}",
Rc::strong_count(&branch),
Rc::weak_count(&branch),
);
println!(
"leaf strong = {}, weak = {}",
Rc::strong_count(&leaf),
Rc::weak_count(&leaf),
);
}
println!("leaf parent = {:?}", leaf.parent.borrow().upgrade());
并发
线程
use std::thread;
use std::time::Duration;
// 创建线程
let handle = thread::spawn(|| {
for i in 1..10 {
println!("spawned thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("main thread: {}", i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap(); // 等待线程结束
// move闭包
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("vector: {:?}", v); // v所有权移入
});
handle.join().unwrap();
消息传递
Channel:多生产者单消费者。
use std::sync::mpsc;
use std::thread;
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
// println!("{}", val); // ❌ val已被move
});
let received = rx.recv().unwrap(); // 阻塞接收
println!("Got: {}", received);
// 尝试接收(非阻塞)
match rx.try_recv() {
Ok(val) => println!("Got: {}", val),
Err(_) => println!("No message"),
}
// 多生产者
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
tx.send(String::from("hi from thread")).unwrap();
});
thread::spawn(move || {
tx1.send(String::from("hi from another thread")).unwrap();
});
for received in rx {
println!("Got: {}", received);
}
共享状态
Mutex:互斥锁。
use std::sync::Mutex;
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap(); // 获取锁
*num = 6;
} // 离开作用域,自动释放锁
println!("m = {:?}", m);
// 多线程共享
use std::sync::{Arc, Mutex};
use std::thread;
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap()); // 10
Arc
原子引用计数,线程安全的Rc。
use std::sync::Arc;
use std::thread;
let data = Arc::new(vec![1, 2, 3]);
let mut handles = vec![];
for _ in 0..10 {
let data = Arc::clone(&data);
let handle = thread::spawn(move || {
println!("{:?}", data);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
Send和Sync trait
编译器自动实现,标记线程安全。
// Send:可以在线程间转移所有权
// Sync:可以在线程间共享引用
// 几乎所有类型都是Send
// Rc<T>不是Send(单线程)
// RefCell<T>不是Send(单线程)
// 几乎所有类型都是Sync
// RefCell<T>不是Sync
// Rc<T>不是Sync
// 手动实现(不安全,谨慎)
// unsafe impl Send for MyType {}
// unsafe impl Sync for MyType {}
无畏并发
Rust通过类型系统在编译期消除数据竞争。
数据竞争
三个条件同时满足才会发生:
两个或多个指针同时访问同一数据
至少一个指针写数据
没有同步机制
Rust编译器阻止同时满足这三个条件。
// ❌ 编译错误:数据竞争
let mut data = vec![1, 2, 3];
thread::spawn(|| {
data.push(4); // 错误:闭包借用了可变引用
});
data.push(5); // 错误:主线程也在用
// ✅ 正确:用Mutex同步
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
let data_clone = Arc::clone(&data);
thread::spawn(move || {
let mut d = data_clone.lock().unwrap();
d.push(4);
});
let mut d = data.lock().unwrap();
d.push(5);
并发模式
// 模式1:消息传递
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
// 生产者
thread::spawn(move || {
for i in 0..10 {
tx.send(i).unwrap();
}
});
// 消费者
for received in rx {
println!("{}", received);
}
// 模式2:共享状态
use std::sync::{Arc, Mutex};
let data = Arc::new(Mutex::new(0));
// Worker线程
let mut handles = vec![];
for _ in 0..10 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut num = data.lock().unwrap();
*num += 1;
}));
}
// 等待所有线程
for h in handles {
h.join().unwrap();
}
// 模式3:线程池
use std::sync::{mpsc, Arc, Mutex};
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job", id);
job();
});
Worker { id, thread }
}
}
智能指针对比
类型 |
所有权 |
线程安全 |
可变性 |
用途 |
|---|---|---|---|---|
Box |
单一 |
是 |
可变 |
堆分配 |
Rc |
多个 |
否 |
不可变 |
单线程共享 |
Arc |
多个 |
是 |
不可变 |
多线程共享 |
RefCell |
单一 |
否 |
可变 |
内部可变性 |
Mutex |
多个 |
是 |
可变 |
线程安全可变 |
RwLock |
多个 |
是 |
可变 |
读写锁 |
常见模式
单例模式
use std::sync::OnceLock;
static INSTANCE: OnceLock<Config> = OnceLock::new();
struct Config {
value: String,
}
fn get_config() -> &'static Config {
INSTANCE.get_or_init(|| Config {
value: String::from("config value"),
})
}
线程安全的计数器
use std::sync::atomic::{AtomicUsize, Ordering};
static COUNTER: AtomicUsize = AtomicUsize::new(0);
fn increment() {
COUNTER.fetch_add(1, Ordering::SeqCst);
}
fn get_count() -> usize {
COUNTER.load(Ordering::SeqCst);
}
读写锁
use std::sync::RwLock;
let lock = RwLock::new(5);
// 读锁(可多个)
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
println!("{}, {}", r1, r2);
}
// 写锁(独占)
{
let mut w = lock.write().unwrap();
*w += 1;
}
最佳实践
智能指针选择
// ✓ 默认用Box
let data = Box::new(MyStruct::new());
// ✓ 单线程多所有者用Rc
let shared = Rc::new(data);
// ✓ 多线程共享用Arc
let shared = Arc::new(data);
// ✓ 内部可变性用RefCell(单线程)
let cell = RefCell::new(data);
// ✓ 多线程可变用Mutex
let mutex = Arc::new(Mutex::new(data));
并发选择
// ✓ 任务间独立:消息传递
let (tx, rx) = mpsc::channel();
// ✓ 共享状态:Mutex
let counter = Arc::new(Mutex::new(0));
// ✓ 读多写少:RwLock
let data = Arc::new(RwLock::new(vec![]));
// ✓ 简单标志:AtomicBool
use std::sync::atomic::AtomicBool;
let flag = Arc::new(AtomicBool::new(false));
避免死锁
// ❌ 死锁:互相等待
let mutex1 = Arc::new(Mutex::new(0));
let mutex2 = Arc::new(Mutex::new(0));
// 线程1
let m1 = mutex1.clone();
let m2 = mutex2.clone();
thread::spawn(move || {
let _g1 = m1.lock().unwrap();
thread::sleep(Duration::from_millis(10));
let _g2 = m2.lock().unwrap(); // 等待mutex2
});
// 线程2
let m1 = mutex1.clone();
let m2 = mutex2.clone();
thread::spawn(move || {
let _g2 = m2.lock().unwrap();
thread::sleep(Duration::from_millis(10));
let _g1 = m1.lock().unwrap(); // 等待mutex1(死锁)
});
// ✓ 解决:固定加锁顺序
// 总是先锁mutex1,再锁mutex2
错误处理
// ✓ 处理lock失败
match mutex.lock() {
Ok(guard) => { /* use guard */ },
Err(poisoned) => {
// Mutex中毒(持有者panic)
let guard = poisoned.into_inner(); // 恢复
}
}
// ✓ 处理join失败
match handle.join() {
Ok(_) => println!("线程正常结束"),
Err(e) => eprintln!("线程panic: {:?}", e),
}
核心: 所有权系统+类型系统实现无畏并发,编译期消除数据竞争。智能指针灵活管理内存,结合并发原语构建安全并发程序。