《深入Rust系统编程》5.4 线程与同步原语

线程是操作系统中最基本的并发执行单元,它允许多个任务在同一进程中并发执行。线程的并发执行带来了性能的提升,但也引入了数据竞争、死锁等问题。为了解决这些问题,Rust 提供了多种同步原语(如互斥锁、信号量、条件变量等),用于协调线程的执行顺序和共享资源的访问。

5.4 线程与同步原语

线程是操作系统中最基本的并发执行单元,它允许多个任务在同一进程中并发执行。线程的并发执行带来了性能的提升,但也引入了数据竞争、死锁等问题。为了解决这些问题,Rust 提供了多种同步原语(如互斥锁、信号量、条件变量等),用于协调线程的执行顺序和共享资源的访问。

5.4.1 线程的基本概念

线程是操作系统调度的基本单位,它是进程中的一个执行流。每个线程都有自己的栈和寄存器状态,但共享进程的地址空间和资源。

1. 线程的创建与销毁

Rust 通过 std::thread 模块提供了线程管理的功能。以下是一个创建线程的示例:

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Thread: {}", i);
            thread::sleep(Duration::from_millis(500));
        }
    });

    for i in 1..5 {
        println!("Main: {}", i);
        thread::sleep(Duration::from_millis(1000));
    }

    handle.join().unwrap();
}

在这个示例中,thread::spawn 函数用于创建一个新线程,handle.join() 用于等待线程执行完毕。

2. 线程的局部变量

每个线程都有自己的栈,因此线程之间无法直接共享局部变量。如果需要在线程之间共享数据,可以使用同步原语(如互斥锁、原子变量等)。

3. 线程的优先级

线程的优先级决定了线程获得 CPU 时间片的顺序。Rust 标准库没有直接提供设置线程优先级的功能,但可以通过操作系统 API 实现。

5.4.2 同步原语的基本概念

同步原语是用于协调线程执行顺序和共享资源访问的工具。常见的同步原语包括:

1. 互斥锁(Mutex)

互斥锁用于保护共享资源,确保同一时间只有一个线程可以访问资源。以下是一个使用互斥锁的示例:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap()); // 输出 "Result: 10"
}

在这个示例中,Mutex 用于保护 counter,确保多个线程可以安全地修改 counter

2. 信号量(Semaphore)

信号量用于控制对共享资源的访问数量。Rust 标准库没有直接提供信号量的实现,但可以通过条件变量和互斥锁实现。

以下是一个使用条件变量和互斥锁实现信号量的示例:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Semaphore {
    count: Mutex<usize>,
    condvar: Condvar,
}

impl Semaphore {
    fn new(count: usize) -> Self {
        Semaphore {
            count: Mutex::new(count),
            condvar: Condvar::new(),
        }
    }

    fn acquire(&self) {
        let mut count = self.count.lock().unwrap();
        while *count == 0 {
            count = self.condvar.wait(count).unwrap();
        }
        *count -= 1;
    }

    fn release(&self) {
        let mut count = self.count.lock().unwrap();
        *count += 1;
        self.condvar.notify_one();
    }
}

fn main() {
    let semaphore = Arc::new(Semaphore::new(2));
    let mut handles = vec![];

    for i in 0..5 {
        let semaphore = Arc::clone(&semaphore);
        let handle = thread::spawn(move || {
            semaphore.acquire();
            println!("Thread {} is running", i);
            thread::sleep(std::time::Duration::from_secs(1));
            println!("Thread {} is done", i);
            semaphore.release();
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

3. 条件变量(Condvar)

条件变量用于线程之间的条件等待和通知。以下是一个使用条件变量的示例:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }

    println!("Thread started!");
}

在这个示例中,主线程等待子线程设置 startedtrue,然后继续执行。

4. 屏障(Barrier)

屏障用于同步多个线程的执行进度。以下是一个使用屏障的示例:

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = vec![];

    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        let handle = thread::spawn(move || {
            println!("Thread {} is waiting", i);
            barrier.wait();
            println!("Thread {} is running", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

在这个示例中,三个线程在 barrier.wait() 处同步,然后同时继续执行。

5.4.3 线程与同步原语的应用

线程与同步原语在 Rust 中有广泛的应用场景,以下是一些常见的应用示例:

1. 线程池

线程池是一种常见的并发编程模式,它通过预先创建一组线程来执行任务,避免频繁创建和销毁线程的开销。以下是一个简单的线程池实现:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<Worker>,
    sender: std::sync::mpsc::Sender<Job>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = std::sync::mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) -> Self {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
            println!("Worker {} got a job; executing.", id);
            job();
        });

        Worker { id, thread }
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Task {} is running", i);
            thread::sleep(std::time::Duration::from_secs(1));
            println!("Task {} is done", i);
        });
    }

    thread::sleep(std::time::Duration::from_secs(5));
}

2. 生产者-消费者模型

生产者-消费者模型是一种常见的并发编程模式,它通过消息队列协调生产者和消费者的执行顺序。以下是一个使用互斥锁和条件变量实现的生产者-消费者模型:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;

fn main() {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let not_empty = Arc::new(Condvar::new());

    let producer_queue = Arc::clone(&queue);
    let producer_not_empty = Arc::clone(&not_empty);
    let producer = thread::spawn(move || {
        for i in 0..10 {
            let mut queue = producer_queue.lock().unwrap();
            queue.push_back(i);
            producer_not_empty.notify_one();
            println!("Produced: {}", i);
            thread::sleep(std::time::Duration::from_millis(500));
        }
    });

    let consumer_queue = Arc::clone(&queue);
    let consumer_not_empty = Arc::clone(&not_empty);
    let consumer = thread::spawn(move || {
        loop {
            let mut queue = consumer_queue.lock().unwrap();
            while queue.is_empty() {
                queue = consumer_not_empty.wait(queue).unwrap();
            }
            let item = queue.pop_front().unwrap();
            println!("Consumed: {}", item);
            if item == 9 {
                break;
            }
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

5.4.4 线程与同步原语的性能考虑

线程与同步原语虽然提供了强大的并发编程功能,但也带来了一定的性能开销。以下是一些性能优化的建议:

  1. 减少锁的粒度: 使用更细粒度的锁,减少锁竞争。
  2. 使用无锁数据结构: 使用无锁数据结构(如原子变量)避免锁的开销。
  3. 避免过度同步: 尽量减少同步原语的使用,避免不必要的性能开销。

5.4.5 总结

线程与同步原语是并发编程中的核心工具,它们通过协调线程的执行顺序和共享资源的访问,确保程序的正确性和性能。Rust 提供了多种同步原语(如互斥锁、信号量、条件变量等),使得开发者能够轻松地实现并发程序。理解线程与同步原语的工作原理,对于编写高效、安全的并发程序至关重要。通过合理地使用线程与同步原语,可以构建出高性能、高可靠性的并发应用程序。

继续阅读

探索更多技术文章

浏览归档,发现更多关于系统设计、工具链和工程实践的内容。

全部文章 返回首页