Rust多线程编程
文章目录
- Rust多线程编程
- 使用线程模块
- 创建线程
- 线程传参
- 闭包(匿名函数)
- 值捕获
- 不可变引用捕获
- 可变引用捕获
- 线程闭包传参
- 更优雅地传参
- 回收线程
- 线程同步和通信
- channel 通道
- mutex 互斥锁
- Barrier 栅栏
- Atomic Types 原子类型
使用线程模块
rust标准库中提供了线程相关支持,直接引用即可使用:
use std::thread;
创建线程
使用spawn方法创建一个线程。如:
use std::thread;/* 引用线程模块 */
use std::time::Duration;
fn main() {std::thread::spawn(thread_function);loop {thread::sleep(Duration::from_secs(1));/* sleep 1s */println!("main thread running..");}
}fn thread_function(){loop {thread::sleep(Duration::from_secs(1));/* sleep 1s */println!("demo thread running..");}
}
创建后线程自动运行:
boys@server:~/rust_study/demo$ cargo runCompiling demo v0.1.0 (/home/boys/rust_study/demo)Finished dev [unoptimized + debuginfo] target(s) in 0.39sRunning `target/debug/demo`
main thread running..
demo thread running..
main thread running..
demo thread running..
main thread running..
demo thread running..
^C
boys@server:~/rust_study/demo$
线程传参
默认的spawn方法传递不了参数。如果要为线程传递参数,需要使用匿名函数作为spawn的参数,匿名函数也称为闭包。
闭包(匿名函数)
闭包的基本语法:
|param1, param2, ...| {// 函数体
}
使用闭包的好处是可以捕获函数体外的变量,传递到函数体内。
按捕获变量方式可以分为:
- 值捕获
- 普通引用捕获
- 可变引用捕获
值捕获
值捕获的方式会变量的所有权会转移到闭包内,外部无法再使用。如:
fn main(){let str = String::from("hello");let closure_print_string = move ||{/* move表明函数体内捕获的变量使用值捕获方式 */println!("number = {}", str);/* 使用值捕获方式捕获外部的str */};closure_print_string();println!("test for str: {}", str);/* 值捕获方式,str的所有权已经被转移到闭包内,这里无法再使用 */
}
不可变引用捕获
闭包会自动识别捕获的变量类型。根据函数体内捕获的变量的类型,确认捕获方式。
不可变变量按照不可变引用方式捕获使用,因此无法修改原变量值,只能访问变量值。如:
fn main(){let str = String::from("hello");/* str是不可变 */let closure = ||{println!("str: {}", str);// str.push_str("world");/* 不可变的变量按不可变引用方式捕获,此处修改了变量值会报错 */};closure();println!("test for str: {}", str);
}
可变引用捕获
将外部变量变为可变变量,即可在闭包函数体内修改变量的值。
fn main(){let mut str = String::from("hello");/* str是不可变 */let mut closure = ||{println!("before push, str: {}", str);str.push_str(" world");/* 不可变的变量按不可变引用方式捕获,此处修改了变量值会报错 */};closure();println!("test for str: {}", str);
}
同时闭包的类型也要定义为mut可变的。只要捕获外部的可变变量,都是定义为mut的。如:
fn main(){let num = 123;let mut str = String::from("hello");/* str是不可变 */let mut closure = ||{println!("num: {}", num);/* num不可变 */println!("before push, str: {}", str);str.push_str(" world");/* 不可变的变量按不可变引用方式捕获,此处修改了变量值会报错 */};closure();println!("test for str: {}", str);
}
线程闭包传参
现在就可以使用闭包的方法,将外部参数传递给线程了。
如:
fn main(){let word = "what are words";let thread_handle = std::thread::spawn(move ||{println!("just print 'word': {}", word);});thread_handle.join().unwrap();
}
这里必须使用值捕获方式,将字符串引用变量str的所有权转移到闭包内。因为编译时会检查闭包和当前函数的声明周期,发现闭包可能会在当前的函数结束后运行,因为闭包传递到了另一个线程。
closure may outlive the current function, but it borrows `word`, which is owned by the current function
更优雅地传参
通过闭包虽然可以传递外部参数到另一个线程,但闭包一般都用来实现比较简单的功能。对于线程来说,会有比较复杂的功能,所以更优雅的方式还是使用函数对一个线程做封装后,再闭包传递参数创建线程。如:
use std::time::Duration;
struct ThreadParam{thread_name: String
}
fn demo_thread(param: ThreadParam){for _ in 0..3{/* 使用下划线作为迭代变量,避免编译警告 */std::thread::sleep(Duration::from_secs(1));println!("thread's name: {}", param.thread_name);}
}
fn main(){let thread_arguments = ThreadParam{thread_name: String::from("demo_thread")};let thread_handle = std::thread::spawn(move ||{demo_thread(thread_arguments);});thread_handle.join().unwrap();/* 回收线程,防止主线程退出看不到线程效果 */
}
回收线程
使用join方法回收线程资源。如:
use std::thread;/* 引用线程模块 */
use std::time::Duration;
fn main() {let thread_handle = std::thread::spawn(thread_function);thread_handle.join().unwrap();println!("thread exit.");
}fn thread_function(){let mut count = 1;loop {thread::sleep(Duration::from_secs(1));/* sleep 1s */println!("demo thread run {} time..", count);count = count + 1;if count > 3 {break;}}
}
boys@server:~/rust_study/demo$ cargo runFinished dev [unoptimized + debuginfo] target(s) in 0.00sRunning `target/debug/demo`
demo thread run 1 time..
demo thread run 2 time..
demo thread run 3 time..
thread exit.
线程同步和通信
rust线程同步和通信提供了几种自带的机制:
- channel(通道)
- Mutex(互斥锁)
- 原子类型(Atomic Types)
- 条件变量(Condition Variable)
- 原子引用计数(Arc)
- 栅栏(Barrier)
channel 通道
通道只能使用在多生产者、单消费者的场景下,所在模块是 std::sync::mpsc
,mpsc是"Multi-Producer, Single-Consumer" 的缩写,即多生产者单消费者。
use std::sync::mpsc;
使用示例:
fn main(){let (sender, receiver) = std::sync::mpsc::channel();let sender1 = sender.clone();/* 通过克隆2个sender传递到线程中 */let sender2 = sender.clone();let sender1_thread_handle = std::thread::spawn(move||{std::thread::sleep(std::time::Duration::from_secs(1));sender1.send("hello").unwrap();});let sender2_thread_handle = std::thread::spawn(move||{std::thread::sleep(std::time::Duration::from_secs(2));sender2.send("bye").unwrap();});let mut is_connected = false;loop {let recv_data = receiver.recv().unwrap();if recv_data == "hello"{println!("recv 'hello', connected");is_connected = true;}if is_connected == true{if recv_data == "bye"{println!("recv 'bye', disconnected");break;}}}sender1_thread_handle.join().unwrap();sender2_thread_handle.join().unwrap();
}
mutex 互斥锁
导入模块:
use std::sync::Mutex;
Rust 中的互斥锁(Mutex)是一个泛型类型。它被定义为 std::sync::Mutex<T>
,其中的 T
是要保护的共享数据的类型。初始化时会自动判断数据类型。
下面是使用Mutex对整形变量做互斥访问的示例:
use std::sync::{Mutex, Arc};
use std::thread;fn main() {let data = Arc::new(Mutex::new(42));let thread1_data = Arc::clone(&data);let handle1 = thread::spawn(move || {// 使用 thread1_data 进行操作let mut value = thread1_data.lock().unwrap();*value += 1;println!("thread1 Value: {}", *value);});let thread2_data = Arc::clone(&data);let handle2 = thread::spawn(move || {// 使用 thread2_data 进行操作let value = thread2_data.lock().unwrap();println!("thread2 Value: {}", *value);});handle1.join().unwrap();handle2.join().unwrap();
}
这里使用了Arc原子引用计数类型,实现多个线程共享互斥锁的所有权。
Barrier 栅栏
在 Rust 中,你可以使用 Barrier
类型来实现线程栅栏。 Barrier
允许一组线程都到达一个点后再同时继续执行。
示例:
use std::sync::{Arc, Barrier};
use std::thread;fn main() {let barrier = Arc::new(Barrier::new(3)); // 创建一个包含3个参与者的栅栏for i in 0..3 {/* 创建3个线程 *//* 使用Arc原子引用计数将barrier共享给所有线程 */let barrier_clone = Arc::clone(&barrier);thread::spawn(move || {println!("Thread {} before barrier", i);barrier_clone.wait(); // 所有线程都到达此处后会同时继续执行println!("Thread {} after barrier", i);});}thread::sleep(std::time::Duration::from_secs(2)); // 等待足够的时间以确保所有线程完成println!("Main thread");
}
Atomic Types 原子类型
Rust中常用的原子类型有:
- AtomicBool 原子布尔类型
- AtomicI32 原子整数类型
- AtomicPtr 原子指针类型
原子类型是不可分割的类型,对原子类型变量的操作不可分割,因此,常用在多线程并发场景,避免出现竞态问题。
简单的使用示例:
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;fn main() {let atomic_counter = Arc::new(AtomicI32::new(0));let mut handles = vec![];/* 使用vec保存线程句柄,回收资源使用 */for _ in 0..5 {let counter = Arc::clone(&atomic_counter);let handle = thread::spawn(move || {for _ in 0..1_0 {counter.fetch_add(1, Ordering::SeqCst);}});handles.push(handle);}for handle in handles {handle.join().unwrap();}println!("Final value: {}", atomic_counter.load(Ordering::SeqCst));
}
创建了5个线程对原子变量进行递增10次,输出结果为50。若未使用原子变量且不加互斥保护,得到的结果是未知的。
boys@server:~/rust_study/demo$ cargo run Compiling demo v0.1.0 (/home/boys/rust_study/demo)Finished dev [unoptimized + debuginfo] target(s) in 0.39sRunning `target/debug/demo`
Final value: 50