rCore OS Note - Chapter 7

第七章:进程间通信与 I/O 重定向, 阅读 rCore tutorial book v3 的笔记以及实践部分的实现与记录。

该章节尝试建立基于文件的统一I/O抽象, 将标准输入/标准输出的访问改造为基于文件描述符, 然后同样基于文件描述符实现一种父子进程之间的通信机制 —— 管道, 从而实现灵活的进程间通信, 并基于文件抽象和管道支持不同的独立进程之间的动态组合来实现复杂功能。

管道
一种数据交换的机制, 管道可被视为一种特殊的内存文件, 在进程被打开的 fd_table 中被管理。 管道通过复用文件读写的系统函数调用可以方便地进行数据交换。
信号
进程间通信的一种异步通知机制, 可以看成是一个应用发出某种信号, 希望另外一个应用能及时响应。
IPC OS details, rCore
IPC OS Details, rCore

0. 资料汇总

1. 文件抽象

文件 抽象统一了对 I/O 设备的管理, Everything is a file 的思想继承于 Multics OS 的 通用性 文件设计。 操作系统不关心文件的具体内容, 而只关心如何对文件按字节流进行读写。 rCore 对此进行了举例, 例如 键盘设备 对操作系统而言就是一个只读文件(读取键盘的输入数据), 屏幕设备 是只写文件(输出图片文字等信息) 等等。 在该章节基于文件的抽象, rCore 对标准输入和标准输出进行了重构, 为其实现了 File Trait, 使得进程能按照文件的接口与 I/O 外设交互。

一般来说, 文件的会涉及到, openclosereadwrite 这些基本操作。

另外, 进程被创建的时候内核会默认打开三个缺省的文件, 这个设计对于后续的 I/O 重定向而言非常重要。

  • fd: 0 标准输入
  • fd: 1 标准输出
  • fd: 2 标准错误输出

需要注意的是, 子进程需要继承其父进程的所有的文件描述符与父进程共享所有文件, 基于这个设计, pipe 管道机制能够让父子进程共用一套标准输入和输出接口, 这样就构成了进程之间通信的基本桥梁。

2. 管道

管道的设计就是基于以上文件抽象实现的几种机制, 其本质可被描述为一个 环形的缓冲队列, 具备 读端写端, 读写两端通过不同的文件描述符进行访问。 通过文件抽象, 我们能对读写进行权限控制, 控制数据的流向。 另外, 只有读写两端都关闭后管道的资源才能被回收, 这要求除了实现 sys_pipe 系统调用, 还需要实现 sys_close 对文件的关闭进行控制。

rCore 对管道的设计分为端口设计与缓存设计。

  • 端口设计 struct Pipe, 通过实现 File Trait 使得这两个端口能够通过文件描述符进行访问。
  • 缓存设计 struct PipeRingBuffer, 实际就是维护了一个环形的缓存队列。 值得注意的是 struct PipeArc<Mutex<PipeRingBuffer>> 维护对 PipeRingBuffer 的强引用计数, 这能保证在读写两端都被关闭后 PipeRingBuffer 中的资源能够被自动回收, 而 PipeRingBuffer::write_end 维护了一个弱引用计数, 这是为了后续检查写端是否全部被关闭的依据。

只有写端都被关闭后才能保证没有数据被遗漏, 此时读端的关闭才是安全的。 另外, 写端实际是对读端存在依赖的, 若仅有一个写端, pipe 的缓冲区一定会耗尽, 而只存在一个读端却没什么问题, 大不了每次读返回都是 0。

// os/src/fs/pipe.rs

pub struct Pipe {
    readable: bool,
    writable: bool,
    buffer: Arc<Mutex<PipeRingBuffer>>,
}

pub struct PipeRingBuffer {
    arr: [u8; RING_BUFFER_SIZE],
    head: usize,
    tail: usize,
    status: RingBufferStatus,
    write_end: Option<Weak<Pipe>>,
}

#[derive(Copy, Clone, PartialEq)]
enum RingBufferStatus {
    FULL,
    EMPTY,
    NORMAL,
}

3. 标准 I/O 重定向

重定向的想法在于对 fd 文件描述符进行替换。 由前述内容可知, 新建进程默认打开三个缺省的文件描述符分别描述标准输入(0), 标准输出(1), 标准错误输出(2), 文件描述符分配有一条重要的性质: 必定分配可用描述符中编号最小的一个, 而 sys_dup 做的工作就是将当前的文件拷贝到一个新的文件描述符。 所以, 例如我们先通过 sys_close 关闭标准输入, 那么再调用 sys_dup(current_fd) 就能让 fd=1 的位置的内容被替换为当前的目标文件。

// os/src/syscall/fs.rs

/// 功能:将进程中一个已经打开的文件复制一份并分配到一个新的文件描述符中。
/// 参数:fd 表示进程中一个已经打开的文件的文件描述符。
/// 返回值:如果出现了错误则返回 -1,否则能够访问已打开文件的新文件描述符。
/// 可能的错误原因是:传入的 fd 并不对应一个合法的已打开文件。
/// syscall ID:24
pub fn sys_dup(fd: usize) -> isize {
    let task = current_task().unwrap();
    let mut inner = task.acquire_inner_lock();
    if fd >= inner.fd_table.len() {
        return -1;
    }
    if inner.fd_table[fd].is_none() {
        return -1;
    }
    let new_fd = inner.alloc_fd();
    inner.fd_table[new_fd] = Some(Arc::clone(inner.fd_table[fd].as_ref().unwrap()));
    new_fd as isize
}

为了能让用户端使用重定向, rCore 对 user_shell 进行了改进使其支持用户参数输入。 另外 sys_exec 需要取出这些传入的命令行参数, 放入 args_vec: Vec<String> 中作为参数传入 TaskControlBlock::exec 中, 这些参数会被压入用户栈中。 压栈的主体代码如下代码所示, 为了更清晰的表示压栈的过程, 我绘制了压栈的过程图, 假设压入了两个命令行参数 aabb

// os/src/task/task.rs

impl TaskControlBlock {
    pub fn exec(&self, elf_data: &[u8], args: Vec<String>) {
        ...
        // push arguments on user stack
        user_sp -= (args.len() + 1) * core::mem::size_of::<usize>();
        let argv_base = user_sp;
        let mut argv: Vec<_> = (0..=args.len())
            .map(|arg| {
                translated_refmut(
                    memory_set.token(),
                    (argv_base + arg * core::mem::size_of::<usize>()) as *mut usize
                )
            })
            .collect();
        *argv[args.len()] = 0;
        for i in 0..args.len() {
            user_sp -= args[i].len() + 1;
            *argv[i] = user_sp;
            let mut p = user_sp;
            for c in args[i].as_bytes() {
                *translated_refmut(memory_set.token(), p as *mut u8) = *c;
                p += 1;
            }
            *translated_refmut(memory_set.token(), p as *mut u8) = 0;
        }
        // make the user_sp aligned to 8B for k210 platform
        user_sp -= user_sp % core::mem::size_of::<usize>();
        ...
        trap_cx.x[10] = args.len();
        trap_cx.x[11] = argv_base;
        ...
    }
}
Push stack process 1, HangX-Ma
Push stack process 1, HangX-Ma
Push stack process 2, HangX-Ma
Push stack process 2, HangX-Ma

压栈完成后 rCore 将 TrapContext中的 a0a1 两个寄存器的值分别设置为输入参数的数量以及前述的 argv_base。 这样, 在应用第一次进入用户态的时候, 放在 Trap 上下文 a0a1 两个寄存器中的内容可以被用户库中的入口函数以参数的形式接收。 rCore 在 _start 入口函数对这些起始地址做了转化, 将其转化为编写应用的时候看到的 &[&str] 的形式。

4. 信号

信号机制补足了单向事件通知的机制, 这是一种类似于硬件中断的软件级异步通知机制, 使得进程在接收到特定事件的时候能够暂停当前的工作并及时响应事件, 并在响应事件之后可以恢复当前工作继续执行。 如果进程没有接收到任何事件, 它可以执行自己的任务。 这里的暂停与恢复的工作, 都由操作系统来完成, 应用程序只需设置好响应某事件的事件处理例程就够了。

信号的 接收方 是一个进程。 当某进程或操作系统发出信号时,会指定信号响应的对象, 即某个进程的 pid, 并由该进程预设的信号处理例程来进行具体的信号响应。 接收到信号有多种处理方式, 最常见的三种如下:

  • 忽略:就像信号没有发生过一样。

  • 捕获:进程会调用相应的处理函数进行处理。

  • 终止:终止进程。

而信号的 发送方 有两个:

  • 异步信号: 进程在正常执行, 此时可能内核或者其他进程给它发送了一个信号, 这些就属于异步信号。
  • 同步信号: 由进程自身的执行触发, 在处理 Trap 的时候内核会将相应的信号直接附加到进程控制块中, 这种属于同步信号。
Signal process, rCore
Signal process, rCore

内核会在 Trap 处理完成即将返回用户态之前检查要返回到的进程是否还有信号待处理。 如果需要处理的话,取决于进程是否提供该种信号的处理函数, 若没有提供该函数内核会接管进行默认处理, 整体流程如上图所示。

4.1 信号相关的数据结构

进程间发送的信号是某种事件, UNIX 采用了整数来对信号进行编号, 这些整数编号都定义了对应的信号的宏名, 宏名都是以 SIG 开头, 比如 SIGABRT, SIGKILL, SIGSTOP, SIGCONT。 rCore 定义了一个 SignalFlags 类型用以描述这些信号量的集合, 这个数据结构会大量应用到信号处理相关的功能上。

bitflags! {
    pub struct SignalFlags: i32 {
        const SIGDEF = 1; // Default signal handling
        const SIGHUP = 1 << 1;
        const SIGINT = 1 << 2;
        ...
        const SIGSYS = 1 << 31;
    }
}

另外为了处理收到的信号, rCore 定义了 SignalAction 结构, 其中 handler 记录了信号处理的回调函数的入口地址, mask 则表示在处理该函数时需要屏蔽的其他信号的掩码, 这样内核就不会对这些信号进行处理而是直接回到用户态继续执行回调函数, 但这些被屏蔽的信号会被记录在 TCB 中延后处理。

/// Action for a signal
#[repr(C, align(16))]
#[derive(Debug, Clone, Copy)]
pub struct SignalAction {
    pub handler: usize,
    pub mask: SignalFlags,
}

4.2 信号的产生

如前所述, 信号产生有异步信号和同步信号, rCore 将其分作三类, 前两类是异步信号, 最后一类是同步信号。

  1. 进程通过 kill 系统调用给自己或者其他进程发送信号。
  2. 内核检测到某些事件给某个进程发送信号, 但这个事件与接收信号的进程的执行无关。
  3. 进程执行的时候触发了某些条件, 于是在 Trap 到内核处理的时候, 内核给该进程发送相应的信号。 典型的就是 SIGSEGV 段错误和 SIGILL 非法指令。

kill 系统调用

实现 kill 系统调用发送信号, 需要现在 TCB 中加入 SignalFlags 用以记录当前进程收到了哪些信号还未处理。 该系统调用的实现不算复杂, 我们需要给 manager.rs 中增添 PID2TCB 全局变量, 这个全局变量是一个 BtreeMap 数据结构, 在每次 add_task 的时候建立当前进程 pid 与当前进程的 TCB 的映射。 这样在实现 sys_kill 的时候能通过 pid2task 获取到对应的 TCB。

// os/src/task/task.rs

pub struct TaskControlBlockInner {
    ...
    pub signals: SignalFlags,
    ...
}

pub fn sys_kill(pid: usize, signum: i32) -> isize {
    if let Some(task) = pid2task(pid) {
        if let Some(flag) = SignalFlags::from_bits(1 << signum) {
            // insert the signal if legal
            let mut task_ref = task.inner_exclusive_access();
            if task_ref.signals.contains(flag) {
                return -1;
            }
            task_ref.signals.insert(flag);
            0
        } else {
            -1
        }
    } else {
        -1
    }
}

需要注意的是, 如果 TaskControlBlockInner::signals 中已经记录了未处理的信号, 在这个信号被处理之前不能加入同一个信号的待处理量。

内核信号生成

就是将之前说的 SIGSEGV 以及 SIGILL, 在捕获到相应的 trap 之后, 内核主动对 TaskControlBlockInner::signals 进行修改补充。

4.3 信号的处理

与信号处理相关的系统调用则有三个:

  • sys_sigaction:设置信号处理例程
  • sys_procmask: 设置进程的信号屏蔽掩码
  • sys_sigreturn: 清除栈帧,从信号处理例程返回

需要在 TaskControlBlock 中增加 signal_mask 以及 signal_actions 这两个变量用以屏蔽其他信号量以及记录如何对前述 TaskControlBlockInner::signals 记录的信号进行响应。 因而可以看到, SignalActions 实际上是个信号数量匹配的数组, 数组类型是 SignalAction 这样就能将信号量与待处理函数进行一一对应。 需要说明的是 TaskControlBlockInner::signal_mask 声明了在该进程内会屏蔽的信号, 而 SignalAction::mask 是该回调函数会屏蔽的信号, 二者需要做 操作。

// os/src/task/task.rs
pub struct TaskControlBlockInner {
    ...
    pub signal_mask: SignalFlags,
    pub signal_actions: SignalActions,
    ...
}

// os/src/task/action.rs
#[derive(Clone)]
pub struct SignalActions {
    pub table: [SignalAction; MAX_SIG + 1],
}
  • 进程可以通过 sys_sigaction 系统调用捕获某种信号, 暂停进程当前的执行, 调用进程为该种信号提供的函数对信号进行处理, 处理完成之后再恢复进程原先的执行。 可以看到 prev_action 中存储着之前的处理例程(回调函数), 需要将这部分内容存放在 old_action 中, 而将 action 中的内容替换进 table 中。

      pub fn sys_sigaction(
          signum: i32,
          action: *const SignalAction,
          old_action: *mut SignalAction,
      ) -> isize {
          ...
          if let Some(flag) = SignalFlags::from_bits(1 << signum) {
              if check_sigaction_error(flag, action as usize, old_action as usize) {
                  return -1;
              }
              let prev_action = inner.signal_actions.table[signum as usize];
              *translated_refmut(token, old_action) = prev_action;
              inner.signal_actions.table[signum as usize] = *translated_ref(token, action);
              0
          } else {
              -1
          }
      }
    
  • sys_procmask 就没啥好说的, 把传入进来的 flags 设置好就行。

  • sys_sigreturn 的实现在逻辑上也不复杂, 更新 TaskControlBlockInner::handling_sig 并恢复保存在 trap_ctx_backup 中的上下文, 需要注意的是返回的时候是 trap_ctx.x[10], 这是为了避免上下文返回时原来的 a0 返回值被覆盖而出现返回值变更的问题。

      // os/src/syscall/process.rs
    
      pub fn sys_sigreturn() -> isize {
          if let Some(task) = current_task() {
              let mut inner = task.inner_exclusive_access();
              inner.handling_sig = -1;
              // restore the trap context
              let trap_ctx = inner.get_trap_cx();
              *trap_ctx = inner.trap_ctx_backup.unwrap();
              trap_ctx.x[10] as isize
          } else {
              -1
          }
      }
    

关于 handle_signals 包括的一系列内容详见 rCore 的指导书, 并没有太多疑问。

5. 课后练习

5.1 编程练习

  • TODO: 这部分回头再说

5.2 实验练习

进程通信:邮箱

没有测例也不知道写的对不对, 基本上就是魔改一下 Pipe 的环形缓冲队列就整完了。

具体可以参考 commit#e9b19e5