Async Programming in Rust: Channel

学习 CS110L Spring20 课程 Lecture 12: Channel, 总结相关原理及用法。

CS110L Spring20 课程的 Lecture 12 提及了 Rust 多线程间的数据共享的问题。 一般来说, 线程之间会通过一块共享内存传递数据, 并使用 Arc 以及 Mutex 避免数据竞争 (Race Condition)。

但是 使用锁的办法是开销极大的。 假设内核调度时线程 A 遇到了上锁的数据, 虽然处理这一小段数据可能只需要几纳秒的时间, 但是剩余的时间片因为锁的原因而被浪费了, 而此时该线程又被内核放在了调度队列的末尾, 这意味着线程 A 真正运行时需要经历多轮调度等待很长时间。 那么最好的办法就是使用一种不上锁的数据结构。

1. Channel 模型的提出

Do not communicate by sharing memory; instead, share memory by communicating

-- Effective Golang

早在 1978 年就已经提出了 Channel 的理论模型, 多个线程之间的通信以消息的方式传递, 由于没有使用共享内存, 因而不会产生数据竞争。 但消息传递并不意味着高效, 由于我们没有使用共享内存, 这需要我们将需要传递的数据拷贝进 Message 中, 这会造成极大的资源浪费和严重的效率问题。

在 Rust 中可以通过浅拷贝达到数据共享的目的, 我们需要在 Message 中传递指针。 用以描述指针的数据结构所占的资源是极小的, 这种 Partial Shared Memory 的安全性也能得到保证。 由于 Rust 语言的特性, 指针的所有权会在传递时转移至 Channel 中, 编译器会保证我们在传递指针后无法再在除 Channel 外的任何地方使用这个指针。

完美的 Channel 模型应当是 MPMC (multi-producer, multi-consumer) 类型, 简单的 Mutex<T> + CondVar 组合能够实现 MPMC 但正如前述所言, 使用锁会降低效率。 比如在 Go 语言中实现了基于锁的安全的 MPMC Channel, 虽然使用了 futex (fast userspace mutex) 但由于锁的存在而效率低下。 因而, 标准的 Rust 库中实现的是 MPSC (multi-producer, single-consumer), 这种实现方式有如下优缺点:

  • 优点
    • 适用于归约数据汇总的场合, 多个线程需要将结果统一发送给一个单独的线程。
    • 可以退化为 SPSC(single-producer, single-consumer) 进行线程与线程之间独立的通信。
  • 缺点
    • 当我们想分发数据的时候即一对多时, 情况就比较难办且复杂了。
    • Rust 标准库中的 Channel 比较古老, 一些术语和标准没有统一, 不仅难用而且很有可能会被替代。

crossbeam 是 Rust 官方指定的 Channel 的实现库, 实现了 MPMC 的特性, 改进了很多 API, 甚至比当前的标准库的实现还要快上不少。

2. crossbeam Channel

这里使用 CS110L 课程的 farm v3.0 这个例子对 Channel 的使用进行解释说明。

use crossbeam_channel;
use core::num;
use std::{thread, time};

fn factor_number(...) {...}

fn main() {
    // unbounded 表示使用的内存是无上限的
    let (sender, receiver) = crossbeam::channel::unbounded();

    let mut threads = Vec::new();
    for _ in 0..num_cpus::get {
        // 创建一个 receiver 的拷贝, 避免下一个线程因所有权转移而无法使用 receiver
        let receiver = receiver.clone();
        threads.push(thread::spawn(move || {
            while let Ok(next_num) = receiver.recv() {
                factor_number(next_num);
            }
        }));
    }

    let stdin = std::io::stdin();
    for line in stdin.lock().lines() {
        let num = line.unwrap().parse::<u32>().unwrap();
        sender
            .send(num)
            .expect("Tried writing to channel, but there are no receivers!");
    }

    // 如果没有调用 drop, 那么通道关闭的两个条件: 发送者全部 drop 或接收者被 drop 就无法满足。
    // 那么对于子线程而言, 如果它想从 while 循环中跳出必须要让 recv() 返回一个 Err(), 只有提早
    // 通过 drop 函数关闭了 sender 才能让 Channel 关闭, 让子线程完成处理。
    drop(sender);

    for thread in threads {
        thread.join().expect("Panic occurred in thread");
    }
}

3. 参考

Thanks for Roberto Huertas providing the background image, you can find some from his blog: Wallpapers for the Rust of us