5.4 线程与同步原语
线程是操作系统中最基本的并发执行单元,它允许多个任务在同一进程中并发执行。线程的并发执行带来了性能的提升,但也引入了数据竞争、死锁等问题。为了解决这些问题,Rust 提供了多种同步原语(如互斥锁、信号量、条件变量等),用于协调线程的执行顺序和共享资源的访问。
5.4.1 线程的基本概念
线程是操作系统调度的基本单位,它是进程中的一个执行流。每个线程都有自己的栈和寄存器状态,但共享进程的地址空间和资源。
1. 线程的创建与销毁
Rust 通过 std::thread
模块提供了线程管理的功能。以下是一个创建线程的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("Thread: {}", i);
thread::sleep(Duration::from_millis(500));
}
});
for i in 1..5 {
println!("Main: {}", i);
thread::sleep(Duration::from_millis(1000));
}
handle.join().unwrap();
}
|
在这个示例中,thread::spawn
函数用于创建一个新线程,handle.join()
用于等待线程执行完毕。
2. 线程的局部变量
每个线程都有自己的栈,因此线程之间无法直接共享局部变量。如果需要在线程之间共享数据,可以使用同步原语(如互斥锁、原子变量等)。
3. 线程的优先级
线程的优先级决定了线程获得 CPU 时间片的顺序。Rust 标准库没有直接提供设置线程优先级的功能,但可以通过操作系统 API 实现。
5.4.2 同步原语的基本概念
同步原语是用于协调线程执行顺序和共享资源访问的工具。常见的同步原语包括:
1. 互斥锁(Mutex)
互斥锁用于保护共享资源,确保同一时间只有一个线程可以访问资源。以下是一个使用互斥锁的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap()); // 输出 "Result: 10"
}
|
在这个示例中,Mutex
用于保护 counter
,确保多个线程可以安全地修改 counter
。
2. 信号量(Semaphore)
信号量用于控制对共享资源的访问数量。Rust 标准库没有直接提供信号量的实现,但可以通过条件变量和互斥锁实现。
以下是一个使用条件变量和互斥锁实现信号量的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
struct Semaphore {
count: Mutex<usize>,
condvar: Condvar,
}
impl Semaphore {
fn new(count: usize) -> Self {
Semaphore {
count: Mutex::new(count),
condvar: Condvar::new(),
}
}
fn acquire(&self) {
let mut count = self.count.lock().unwrap();
while *count == 0 {
count = self.condvar.wait(count).unwrap();
}
*count -= 1;
}
fn release(&self) {
let mut count = self.count.lock().unwrap();
*count += 1;
self.condvar.notify_one();
}
}
fn main() {
let semaphore = Arc::new(Semaphore::new(2));
let mut handles = vec![];
for i in 0..5 {
let semaphore = Arc::clone(&semaphore);
let handle = thread::spawn(move || {
semaphore.acquire();
println!("Thread {} is running", i);
thread::sleep(std::time::Duration::from_secs(1));
println!("Thread {} is done", i);
semaphore.release();
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
|
3. 条件变量(Condvar)
条件变量用于线程之间的条件等待和通知。以下是一个使用条件变量的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
println!("Thread started!");
}
|
在这个示例中,主线程等待子线程设置 started
为 true
,然后继续执行。
4. 屏障(Barrier)
屏障用于同步多个线程的执行进度。以下是一个使用屏障的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
let barrier = Arc::new(Barrier::new(3));
let mut handles = vec![];
for i in 0..3 {
let barrier = Arc::clone(&barrier);
let handle = thread::spawn(move || {
println!("Thread {} is waiting", i);
barrier.wait();
println!("Thread {} is running", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
|
在这个示例中,三个线程在 barrier.wait()
处同步,然后同时继续执行。
5.4.3 线程与同步原语的应用
线程与同步原语在 Rust 中有广泛的应用场景,以下是一些常见的应用示例:
1. 线程池
线程池是一种常见的并发编程模式,它通过预先创建一组线程来执行任务,避免频繁创建和销毁线程的开销。以下是一个简单的线程池实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: std::sync::mpsc::Sender<Job>,
}
impl ThreadPool {
fn new(size: usize) -> Self {
let (sender, receiver) = std::sync::mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> Self {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
});
Worker { id, thread }
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Task {} is running", i);
thread::sleep(std::time::Duration::from_secs(1));
println!("Task {} is done", i);
});
}
thread::sleep(std::time::Duration::from_secs(5));
}
|
2. 生产者-消费者模型
生产者-消费者模型是一种常见的并发编程模式,它通过消息队列协调生产者和消费者的执行顺序。以下是一个使用互斥锁和条件变量实现的生产者-消费者模型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
fn main() {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let not_empty = Arc::new(Condvar::new());
let producer_queue = Arc::clone(&queue);
let producer_not_empty = Arc::clone(¬_empty);
let producer = thread::spawn(move || {
for i in 0..10 {
let mut queue = producer_queue.lock().unwrap();
queue.push_back(i);
producer_not_empty.notify_one();
println!("Produced: {}", i);
thread::sleep(std::time::Duration::from_millis(500));
}
});
let consumer_queue = Arc::clone(&queue);
let consumer_not_empty = Arc::clone(¬_empty);
let consumer = thread::spawn(move || {
loop {
let mut queue = consumer_queue.lock().unwrap();
while queue.is_empty() {
queue = consumer_not_empty.wait(queue).unwrap();
}
let item = queue.pop_front().unwrap();
println!("Consumed: {}", item);
if item == 9 {
break;
}
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
|
5.4.4 线程与同步原语的性能考虑
线程与同步原语虽然提供了强大的并发编程功能,但也带来了一定的性能开销。以下是一些性能优化的建议:
- 减少锁的粒度: 使用更细粒度的锁,减少锁竞争。
- 使用无锁数据结构: 使用无锁数据结构(如原子变量)避免锁的开销。
- 避免过度同步: 尽量减少同步原语的使用,避免不必要的性能开销。
5.4.5 总结
线程与同步原语是并发编程中的核心工具,它们通过协调线程的执行顺序和共享资源的访问,确保程序的正确性和性能。Rust 提供了多种同步原语(如互斥锁、信号量、条件变量等),使得开发者能够轻松地实现并发程序。理解线程与同步原语的工作原理,对于编写高效、安全的并发程序至关重要。通过合理地使用线程与同步原语,可以构建出高性能、高可靠性的并发应用程序。