目录
- 一、一些概念
- 二、进程和线程
- 2.1 概念
- 2.2 多线程导致的问题
- 2.3 使用spawn创建新线程
- 2.4 线程与move闭包
- 三、消息传递
- 3.1 概念
- 3.2 创建通道
- 3.3 示例
- 3.4 其它测试
- 四、共享状态并发
- 4.1 互斥器
- 4.2 Mutex的API
- 4.3 多线程共享Mutex
- 1)在多线程之间共享值;
- 2)多线程和多所有权
- 3) 原子引用计数
- 4)RefCell/Rc与 Mutex/Arc的相似性
- 五、使用Sync和Send Trait的可扩展并发
- 5.1 send
- 5.2 Sync
一、一些概念
- 并发编程(Concurrent programming):程序的不同部分相互独立运行;
- 并行编译(parallel programming):程序不同部分同时运行;
- 这里将以上两个概念统称为并发,而不做更加细致的区分;
二、进程和线程
2.1 概念
- 进程(process):在大部分现代OS中,已执行的代码运行在一个进程中,OS管理多个进程;
- 线程(Thread):在程序内部同时运行的多个独立部分;
2.2 多线程导致的问题
- 线程是同时运行的,所以无法预先线程的执行顺序,因此导致以下问题:
- 竞争状态(Race conditions):多个线程以不一致的顺序访问数据或资源;
- 死锁(Deadlocks):两个线程相互等待对方所持有的资源;
- 只在某些情况下发生BUG,很难复现和修复;
2.3 使用spawn创建新线程
- 调用
thread::spawn
函数并传递一个新线程运行的代码闭包;
use std::thread;
use std::time::Duration;fn main() {thread::spawn(|| {for i in 1..10 {println!("hi number {} from the spawned thread!", i);thread::sleep(Duration::from_millis(1));}});for i in 1..5 {println!("hi number {} from the main thread!", i);thread::sleep(Duration::from_millis(1));}
}
程序运行结果如下
- 主线程和子线程一起运行,当主线程结束之后无论子线程是否结束,子线程都会被强制结束;
- 使用
thread::spawn
的返回值的join()
方法可以保证线程执行完成;
fn main() {let handle = thread::spawn(|| {for i in 1..10 {println!("hi number {} from the spawned thread!", i);thread::sleep(Duration::from_millis(1));}});for i in 1..5 {println!("hi number {} from the main thread!", i);thread::sleep(Duration::from_millis(1));}handle.join().unwrap();
}
- 通过handle的
join
方法会阻止当前运行的线程,直到handle所代表的线程运行结束;
- 把join()方法移动到最后一个for之前,像下面这样
fn main(){……handle.join().unwrap();for i in 1..5 {println!("hi number {} from the main thread!", i);thread::sleep(Duration::from_millis(1));}
}
则打印如下
- 可见,这是等待子线程运行结束后才会开始运行主线程;
2.4 线程与move闭包
- move闭包通过与
thread::spawn
一起使用,它允许在一个线程中使用另一个线程的数据; - 创建新线程时,可以使用move将值的所有权在线程之间进行转移;
use std::thread;
fn main() {let v = vec![1, 2, 3];let handle = thread::spawn(move || {println!("Here's a vector: {:?}", v);});// drop(v); //所有权被转移handle.join().unwrap();
}
- 通过move关键字,将变量v的所有权进行转移;
- 转移之后主线程里的v就不能再使用了;
三、消息传递
3.1 概念
- 消息传递(message passing) 是一个日益流行且安全的并发的方式;
- 通道(channel) 是Rust中实现消息传递并发的主要工具;
- 通道由两部分组成:一个发送者(transmitter)和一个接收者(receiver);
3.2 创建通道
- 使用
mpsc::channel
函数创建新通道,函数返回一个元组,分别表示发送端和接收端; - mpsc是多个生产者,单个消费者(multiple producer,single consumer)的缩写;
3.3 示例
use std::thread;
use std::sync::mpsc;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();});let received = rx.recv().unwrap();println!("Got: {}", received);
}
- 使用
mpsc::channel()
创建通道; - 使用move将发送者tx的所有权转移到子线程中;
- 发送端:
- 发送端的send方法用来获取需要放入通道的值;
- send方法返回一个
Result<T, E>
类型,这里发送失败时调用unwrap产生panic;
- 接收端:
- 接收端有两个接收方法:recv和try_recv;
- recv会阻塞主线程执行直到从通道中接收一个值,返回
Result<T, E>
,发送端关闭时收到一个错误; - try_recv不会阻塞,立即
Result<T, E>
;
3.4 其它测试
通道与所有权转移
use std::thread;
use std::sync::mpsc;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();println!("val is {}", val);});let received = rx.recv().unwrap();println!("Got: {}", received);
}
- 代码尝试在发送端的send之后将值打印出来,提示所有权已经发生了移动;
发送多个值并观察接收者在等待
use std::thread;
use std::sync::mpsc;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();thread::spawn(move || {let vals = vec![String::from("hi"),String::from("from"),String::from("the"),String::from("thread"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("Got: {}", received);}
}
- 子线程循环发送一些字符串;
- 主线程不再显式调用recv函数,而是将rx当作迭代器使用,循环输出接收到的值;
- 主线程的for循环里没有任何暂停的代码,因此它一直在等待从子线程发送的值;
通过克隆创建多个发送者
use std::thread;
use std::sync::mpsc;
use std::time::Duration;fn main() {let (tx, rx) = mpsc::channel();let tx1 = tx.clone();thread::spawn(move || {let vals = vec![String::from("tx1: hi"),String::from("tx1: from"),String::from("tx1: the"),String::from("tx1: thread"),];for val in vals {tx1.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});thread::spawn(move || {let vals = vec![String::from("tx: more"),String::from("tx: messages"),String::from("tx: for"),String::from("tx: you"),];for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});for received in rx {println!("Got: {}", received);}
}
- 发送之前调用发送端的
clone
方法再创建一个发送端; - 两个发送端者往通道里写入,接收端接收,从下面的结果中可以看到两个发送端的数据被交替接收到了;
四、共享状态并发
- Rust支持通过共享状态实现并发;
- 通道类似于单所有权:一旦将值传送到通道中,就无法再次使用这个值;
- 共享内存并发类似多所有权:多个线程可以同时访问同一块内存;
4.1 互斥器
- 互斥器(mutex)只允许一个线程访问某些数据;
- 使用互斥器:
- 在使用数据之前尝试获取互斥锁;
- 处理完被互斥器所保护的数据之后,必须解锁;
4.2 Mutex的API
- 通过
Mutex::new
创建Mutex<T>
,这是一个智能指针; - 使用
lock
方法获取锁,此方法会阻塞当前线程,直接获取锁为止; - 当拥有锁的线程panic了,
lock
会失败; lock
的返回值是MutexGuard
的智能指针,离开作用域时,自动解锁;
use std::sync::Mutex;fn main() {let m = Mutex::new(5);{let mut num = m.lock().unwrap();*num = 6;}println!("m = {:?}", m);
}
- 调用
Mutex::new
创建一个Mutex,其中的5是要保护的数据; - 调用
lock
获取锁,使用 unwrap()处理panic的情况; - 离开内部作用域后自动解锁;
4.3 多线程共享Mutex
1)在多线程之间共享值;
use std::sync::Mutex;
use std::thread;fn main() {let counter = Mutex::new(0);let mut handles = vec![];for _ in 0..10 {let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("Result: {}", *counter.lock().unwrap());
}
- 创建一个要保护的计数器counter,初始值是0和空的Vector;
- 创建10个子线程,将每个线程句柄放到Vector中,线程内部获取要保护的值并加1;
- 离开子线程作用域后自动释放锁资源;
- 最后在主线程中打印counter的值;
- value moved into closure here, in previous iteration of loop表示所有权已经在上一次循环中移动过了,因此这里不能再移动;
2)多线程和多所有权
Rc<T>
智能指针可以拥有多所有者;- 将
Mutex<T>
封装进Rc<T>
;
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;fn main() {let counter = Rc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Rc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("Result: {}", *counter.lock().unwrap());
}
编译报错
- 编译器提示:线程里
Rc<T>
不能被安全的发送; - 之前提过
Rc<T>
只能用在单线程中; - 多线程中使用原子引用计数;
3) 原子引用计数
- 原子引用计数
Arc<T>
是一个类似Rc<T>
并可以安全的用于并发环境的类型; - 封装在
std::sync::atomic
中; Arc<T>
与Rc<T>
的API相同;
use std::sync::{Mutex, Arc};
use std::thread;fn main() {let counter = Arc::new(Mutex::new(0));let mut handles = vec![];for _ in 0..10 {let counter = Arc::clone(&counter);let handle = thread::spawn(move || {let mut num = counter.lock().unwrap();*num += 1;});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("Result: {}", *counter.lock().unwrap());
}
- 输出正确结果:10
4)RefCell/Rc与 Mutex/Arc的相似性
- 和Cell家族一样,
Mutex<T>
提供了内部可变性; - 可以使用
RefCell<T>
改变Rc<T>
里的内容; - 可以使用
Mutex<T>
改变Arc<T>
里的内容; Mutex<T>
有死锁风险 ;
五、使用Sync和Send Trait的可扩展并发
- Rust语言本身的并发性较少,目前的并发特性都来自于标准库;
- 无需局限于标准库的并发,可以自己实现;
- Rust语言中有两个并发概念:
std::marker::Sync
和std::marker::Send
两个trait;
5.1 send
- Send标记的trait表明所有权可以在线程间传递;
- 几乎所有的Rust类型都是Send的;
Rc<T>
没有实现Send,它只适合单线程;- 任何完全由Send类型组成的类型也被标记为Send;
- 除了原始指针之外,几乎所有的基础类型都是Send;
5.2 Sync
- Sync标记的trait可以安全的被多个线程引用;
- 如果T是Sync,那么&T就是Send,这意味着引用可以安全的发送到另一个线程;
- 基础类型都是Sync
- 完全由Sync类型组成的类型也是Sync,但
Rc<T>
不是Sync的,RefCell<T>
和Cell<T>
家族也不是Sync的;
手动实现Send和Sync是不安全的