- CS110L Spring 2020 仓库: https://github.com/HangX-Ma/cs110l-sp20
- CS110L Spring 2020 课程网站: https://reberhardt.com/cs110l/spring-2020/
CS110L Spring20 课程的 Lecture 12 提及了 Rust 多线程间的数据共享的问题。 一般来说, 线程之间会通过一块共享内存传递数据, 并使用 Arc
以及 Mutex
避免数据竞争 (Race Condition)。
但是 使用锁的办法是开销极大的。 假设内核调度时线程 A 遇到了上锁的数据, 虽然处理这一小段数据可能只需要几纳秒的时间, 但是剩余的时间片因为锁的原因而被浪费了, 而此时该线程又被内核放在了调度队列的末尾, 这意味着线程 A 真正运行时需要经历多轮调度等待很长时间。 那么最好的办法就是使用一种不上锁的数据结构。
1. Channel 模型的提出
-- Effective Golang
早在 1978 年就已经提出了 Channel 的理论模型, 多个线程之间的通信以消息的方式传递, 由于没有使用共享内存, 因而不会产生数据竞争。 但消息传递并不意味着高效, 由于我们没有使用共享内存, 这需要我们将需要传递的数据拷贝进 Message
中, 这会造成极大的资源浪费和严重的效率问题。
在 Rust 中可以通过浅拷贝达到数据共享的目的, 我们需要在 Message
中传递指针。 用以描述指针的数据结构所占的资源是极小的, 这种 Partial Shared Memory 的安全性也能得到保证。 由于 Rust 语言的特性, 指针的所有权会在传递时转移至 Channel 中, 编译器会保证我们在传递指针后无法再在除 Channel 外的任何地方使用这个指针。
完美的 Channel 模型应当是 MPMC (multi-producer, multi-consumer) 类型, 简单的 Mutex<T> + CondVar
组合能够实现 MPMC 但正如前述所言, 使用锁会降低效率。 比如在 Go 语言中实现了基于锁的安全的 MPMC Channel, 虽然使用了 futex (fast userspace mutex) 但由于锁的存在而效率低下。 因而, 标准的 Rust 库中实现的是 MPSC (multi-producer, single-consumer), 这种实现方式有如下优缺点:
- 优点
- 适用于归约数据汇总的场合, 多个线程需要将结果统一发送给一个单独的线程。
- 可以退化为 SPSC(single-producer, single-consumer) 进行线程与线程之间独立的通信。
- 缺点
- 当我们想分发数据的时候即一对多时, 情况就比较难办且复杂了。
- Rust 标准库中的 Channel 比较古老, 一些术语和标准没有统一, 不仅难用而且很有可能会被替代。
crossbeam 是 Rust 官方指定的 Channel 的实现库, 实现了 MPMC 的特性, 改进了很多 API, 甚至比当前的标准库的实现还要快上不少。
2. crossbeam Channel
这里使用 CS110L 课程的 farm v3.0 这个例子对 Channel 的使用进行解释说明。
use crossbeam_channel;
use core::num;
use std::{thread, time};
fn factor_number(...) {...}
fn main() {
// unbounded 表示使用的内存是无上限的
let (sender, receiver) = crossbeam::channel::unbounded();
let mut threads = Vec::new();
for _ in 0..num_cpus::get {
// 创建一个 receiver 的拷贝, 避免下一个线程因所有权转移而无法使用 receiver
let receiver = receiver.clone();
threads.push(thread::spawn(move || {
while let Ok(next_num) = receiver.recv() {
factor_number(next_num);
}
}));
}
let stdin = std::io::stdin();
for line in stdin.lock().lines() {
let num = line.unwrap().parse::<u32>().unwrap();
sender
.send(num)
.expect("Tried writing to channel, but there are no receivers!");
}
// 如果没有调用 drop, 那么通道关闭的两个条件: 发送者全部 drop 或接收者被 drop 就无法满足。
// 那么对于子线程而言, 如果它想从 while 循环中跳出必须要让 recv() 返回一个 Err(), 只有提早
// 通过 drop 函数关闭了 sender 才能让 Channel 关闭, 让子线程完成处理。
drop(sender);
for thread in threads {
thread.join().expect("Panic occurred in thread");
}
}
3. 参考
- CS110L, Spring 2020: Channels
- CS110L, Spring 2020: Synchronization
- CS110L, Spring 2020: Synchronization in Rust
- Rust Course: Channel
Thanks for Roberto Huertas providing the background image, you can find some from his blog: Wallpapers for the Rust of us