はじめに

初投稿です。
Rustの勉強のためにstd::sync::mpsc::channel擬きをpipe(2)を使って実装してみたい!
もちろんstd::sync:mpsc::channelを使った方が便利です。
生ポインタやファイルディスクリプタを扱っていきます。
子プロセスとの通信についてもやってみる。

std::sync::mpsc::channel

Rustの非同期処理で便利に使えます。
2つのエンドポイントsenderreciverの2つをつくりsender -> receiverに情報を送れます。
Rust By Exampleでもっと詳しくみれます。
Golangにもchannelがありますがあれは双方向で可能です。
Rust Playground

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

thread::spawn(move|| {
    sender.send("foo").unwrap();
    sender.send("bar").unwrap();
});

println!("{:?}", receiver.recv().unwrap()); // => "foo"
println!("{:?}", receiver.recv().unwrap()); // => "bar"

pipe(2)を用いた自作channel

方針

  • 生のfdを扱ってみる
  • pipe(2)/read(2)/write(2)を使い送受信
  • スタックにあるString構造体(head, capacity, len)を送る

String構造体

実装

rustc 1.26.0-nightly
全ソースコード

  • Sender
    std::mem::forgetを使ってstack領域のString構造体のdrop処理を無効にしています。
    drop処理で生fdをcloseしてます。これめっちゃ便利で感動しました。
extern crate libc;

use std::mem;
use std::os::unix::io::RawFd;
use std::sync::mpsc::SendError;
use libc::{c_void, close, size_t};
use libc::write as c_write;

pub struct Sender {
    fd: RawFd,
}

impl Drop for Sender {
    fn drop(&mut self) {
        unsafe {
            close(self.fd);
        }
    }
}

impl Sender {
    fn new(fd: RawFd) -> Self {
        Sender { fd }
    }

    pub fn send(&self, msg: String) -> Result<(), SendError<String>> {
        let p: *const c_void = &msg as *const String as *const u8 as *const c_void;
        match unsafe { c_write(self.fd, p, mem::size_of_val(&msg) as size_t) } {
            ret if ret < 0 => return Err(SendError(msg)),
            _ => {
                mem::forget(msg);
                Ok(())
            }
        }
    }
}
  • Receiver
    Senderから送られてきたものを復元していきます。
    Stringなのでsizeがわかっているので簡単に復元できます。
extern crate libc;

use std::mem;
use std::cell::UnsafeCell;
use std::os::unix::io::RawFd;
use std::sync::mpsc::RecvError;
use libc::{c_void, close};
use libc::read as c_read;

pub struct Receiver {
    fd: RawFd,
}

impl Drop for Receiver {
    fn drop(&mut self) {
        unsafe {
            close(self.fd);
        }
    }
}

impl Receiver {
    fn new(fd: RawFd) -> Self {
        Receiver { fd }
    }

    pub fn recv(&self) -> Result<String, RecvError> {
        unsafe {
            let uc = UnsafeCell::new(mem::uninitialized());
            let p: *mut c_void = uc.get() as *mut c_void;
            match c_read(self.fd, p, mem::size_of::<String>()) {
                ret if ret < 0 => return Err(RecvError),
                _ => Ok(uc.into_inner()),
            }
        }
    }
}
  • channel
    pipe(2)を呼んで生fdを取ってきてSender/Receiverを作ります。
extern crate libc;

use libc::c_int;
use libc::pipe as c_pipe;


fn channel() -> (Sender, Receiver) {
    unsafe {
        let mut fds: [c_int; 2] = mem::uninitialized();
        c_pipe(fds.as_mut_ptr());
        (Sender::new(fds[1]), Receiver::new(fds[0]))
    }
}
  • main 最初のstd::sync::mpsc::channelの例と同じです。
extern crate libc;

use std::thread;

fn main() {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        tx.send("foo".to_string()).unwrap();
        tx.send("bar".to_string()).unwrap();
    });
    println!("{:?}", rx.recv().unwrap()); // => "foo"
    println!("{:?}", rx.recv().unwrap()); // => "bar"
    handle.join().unwrap();
}

子プロセスとの通信

std::sync::mpsc::channel

fork(2)を使って子プロセスとの通信がどうなるか試してみました。

fn main() {
    use std::sync;
    let (tx, rx) = sync::mpsc::channel();
    let pid: c_int = unsafe { libc::fork() };

    if pid == 0 {
        println!("in child ({}): received {:?}", pid, rx.recv().unwrap());
        println!("in child ({}): received {:?}", pid, rx.recv().unwrap());
        println!("in child ({}): done", pid);
    } else {
        let msg = "hoge".to_string();
        println!("in parent ({}): sending {:?}", pid, msg);
        tx.send(msg).unwrap();

        let msg = "fuge".to_string();
        println!("in parent ({}): sending {:?}", pid, msg);
        tx.send(msg).unwrap();
        println!("in parent ({}): done", pid);
    }
}

std::sync::mpsc::channelはスレッド間でのメッセージングをサポートするものなのでもちろん正常に動作しません。
"in child"すら出力されなかった。よくわからない挙動でした。

pipe(2)を用いた自作channel

fork(2)する前の状態をrecv()する場合のみ正常に動作します。
fork(2)が親プロセスをコピーしてるの子プロセスでもきちんとアクセスできますが、fork(2)後にsend()すると壊れます。

解決策

ヒープにある文字列そのものを送信する。
ソースコード

おわり

unsafeばかりになってしまいました...
今回はstd::sync::mpsc::channelの実装自体は見ていません。
生fdを扱っていますがstd::io::Fileを使う方がいろいろ便利だと思います。
@tatsuya6502さんにアドバイス頂きました。ありがとうございます。

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.