12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Rustのchannelを自作してみる

Posted at

はじめに

初投稿です。
Rustの勉強のためにstd::sync::mpsc::channel擬きをpipe(2)を使って実装してみたい!
もちろんstd::sync:mpsc::channelを使った方が便利です。
生ポインタやファイルディスクリプタを扱っていきます。
子プロセスとの通信についてもやってみる。

std::sync::mpsc::channel

Rustの非同期処理で便利に使えます。
2つのエンドポイントsenderreciverの2つをつくりsender -> receiverに情報を送れます。
Rust By Exampleでもっと詳しくみれます。
Golangにもchannelがありますがあれは双方向で可能です。
Rust Playground

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

thread::spawn(move|| {
    sender.send("foo").unwrap();
    sender.send("bar").unwrap();
});

println!("{:?}", receiver.recv().unwrap()); // => "foo"
println!("{:?}", receiver.recv().unwrap()); // => "bar"

pipe(2)を用いた自作channel

方針

  • 生のfdを扱ってみる
  • pipe(2)/read(2)/write(2)を使い送受信
  • スタックにあるString構造体(head, capacity, len)を送る
String構造体

実装

rustc 1.26.0-nightly
全ソースコード

  • Sender
    std::mem::forgetを使ってstack領域のString構造体のdrop処理を無効にしています。
    drop処理で生fdをcloseしてます。これめっちゃ便利で感動しました。
extern crate libc;

use std::mem;
use std::os::unix::io::RawFd;
use std::sync::mpsc::SendError;
use libc::{c_void, close, size_t};
use libc::write as c_write;

pub struct Sender {
    fd: RawFd,
}

impl Drop for Sender {
    fn drop(&mut self) {
        unsafe {
            close(self.fd);
        }
    }
}

impl Sender {
    fn new(fd: RawFd) -> Self {
        Sender { fd }
    }

    pub fn send(&self, msg: String) -> Result<(), SendError<String>> {
        let p: *const c_void = &msg as *const String as *const u8 as *const c_void;
        match unsafe { c_write(self.fd, p, mem::size_of_val(&msg) as size_t) } {
            ret if ret < 0 => return Err(SendError(msg)),
            _ => {
                mem::forget(msg);
                Ok(())
            }
        }
    }
}
  • Receiver
    Senderから送られてきたものを復元していきます。
    Stringなのでsizeがわかっているので簡単に復元できます。
extern crate libc;

use std::mem;
use std::cell::UnsafeCell;
use std::os::unix::io::RawFd;
use std::sync::mpsc::RecvError;
use libc::{c_void, close};
use libc::read as c_read;

pub struct Receiver {
    fd: RawFd,
}

impl Drop for Receiver {
    fn drop(&mut self) {
        unsafe {
            close(self.fd);
        }
    }
}

impl Receiver {
    fn new(fd: RawFd) -> Self {
        Receiver { fd }
    }

    pub fn recv(&self) -> Result<String, RecvError> {
        unsafe {
            let uc = UnsafeCell::new(mem::uninitialized());
            let p: *mut c_void = uc.get() as *mut c_void;
            match c_read(self.fd, p, mem::size_of::<String>()) {
                ret if ret < 0 => return Err(RecvError),
                _ => Ok(uc.into_inner()),
            }
        }
    }
}
  • channel
    pipe(2)を呼んで生fdを取ってきてSender/Receiverを作ります。
extern crate libc;

use libc::c_int;
use libc::pipe as c_pipe;


fn channel() -> (Sender, Receiver) {
    unsafe {
        let mut fds: [c_int; 2] = mem::uninitialized();
        c_pipe(fds.as_mut_ptr());
        (Sender::new(fds[1]), Receiver::new(fds[0]))
    }
}
  • main
    最初のstd::sync::mpsc::channelの例と同じです。
extern crate libc;

use std::thread;

fn main() {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        tx.send("foo".to_string()).unwrap();
        tx.send("bar".to_string()).unwrap();
    });
    println!("{:?}", rx.recv().unwrap()); // => "foo"
    println!("{:?}", rx.recv().unwrap()); // => "bar"
    handle.join().unwrap();
}

子プロセスとの通信

std::sync::mpsc::channel

fork(2)を使って子プロセスとの通信がどうなるか試してみました。


fn main() {
    use std::sync;
    let (tx, rx) = sync::mpsc::channel();
    let pid: c_int = unsafe { libc::fork() };

    if pid == 0 {
        println!("in child ({}): received {:?}", pid, rx.recv().unwrap());
        println!("in child ({}): received {:?}", pid, rx.recv().unwrap());
        println!("in child ({}): done", pid);
    } else {
        let msg = "hoge".to_string();
        println!("in parent ({}): sending {:?}", pid, msg);
        tx.send(msg).unwrap();

        let msg = "fuge".to_string();
        println!("in parent ({}): sending {:?}", pid, msg);
        tx.send(msg).unwrap();
        println!("in parent ({}): done", pid);
    }
}

std::sync::mpsc::channelはスレッド間でのメッセージングをサポートするものなのでもちろん正常に動作しません。
"in child"すら出力されなかった。よくわからない挙動でした。

pipe(2)を用いた自作channel

fork(2)する前の状態をrecv()する場合のみ正常に動作します。
fork(2)が親プロセスをコピーしてるの子プロセスでもきちんとアクセスできますが、fork(2)後にsend()すると壊れます。

解決策

ヒープにある文字列そのものを送信する。
ソースコード

#おわり
unsafeばかりになってしまいました...
今回はstd::sync::mpsc::channelの実装自体は見ていません。
生fdを扱っていますがstd::io::Fileを使う方がいろいろ便利だと思います。
@tatsuya6502さんにアドバイス頂きました。ありがとうございます。

12
6
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?