はじめに
初投稿です。
Rustの勉強のためにstd::sync::mpsc::channel擬きをpipe(2)を使って実装してみたい!
もちろんstd::sync:mpsc::channel
を使った方が便利です。
生ポインタやファイルディスクリプタを扱っていきます。
子プロセスとの通信についてもやってみる。
std::sync::mpsc::channel
Rustの非同期処理で便利に使えます。
2つのエンドポイントsender
とreciver
の2つをつくりsender -> receiver
に情報を送れます。
Rust By Exampleでもっと詳しくみれます。
Golangにもchannelがありますがあれは双方向で可能です。
Rust Playground
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
thread::spawn(move|| {
sender.send("foo").unwrap();
sender.send("bar").unwrap();
});
println!("{:?}", receiver.recv().unwrap()); // => "foo"
println!("{:?}", receiver.recv().unwrap()); // => "bar"
pipe(2)を用いた自作channel
方針
- 生のfdを扱ってみる
- pipe(2)/read(2)/write(2)を使い送受信
- スタックにあるString構造体(head, capacity, len)を送る
実装
rustc 1.26.0-nightly
全ソースコード
- Sender
std::mem::forgetを使ってstack領域のString構造体のdrop処理を無効にしています。
drop処理で生fdをcloseしてます。これめっちゃ便利で感動しました。
extern crate libc;
use std::mem;
use std::os::unix::io::RawFd;
use std::sync::mpsc::SendError;
use libc::{c_void, close, size_t};
use libc::write as c_write;
pub struct Sender {
fd: RawFd,
}
impl Drop for Sender {
fn drop(&mut self) {
unsafe {
close(self.fd);
}
}
}
impl Sender {
fn new(fd: RawFd) -> Self {
Sender { fd }
}
pub fn send(&self, msg: String) -> Result<(), SendError<String>> {
let p: *const c_void = &msg as *const String as *const u8 as *const c_void;
match unsafe { c_write(self.fd, p, mem::size_of_val(&msg) as size_t) } {
ret if ret < 0 => return Err(SendError(msg)),
_ => {
mem::forget(msg);
Ok(())
}
}
}
}
- Receiver
Senderから送られてきたものを復元していきます。
Stringなのでsizeがわかっているので簡単に復元できます。
extern crate libc;
use std::mem;
use std::cell::UnsafeCell;
use std::os::unix::io::RawFd;
use std::sync::mpsc::RecvError;
use libc::{c_void, close};
use libc::read as c_read;
pub struct Receiver {
fd: RawFd,
}
impl Drop for Receiver {
fn drop(&mut self) {
unsafe {
close(self.fd);
}
}
}
impl Receiver {
fn new(fd: RawFd) -> Self {
Receiver { fd }
}
pub fn recv(&self) -> Result<String, RecvError> {
unsafe {
let uc = UnsafeCell::new(mem::uninitialized());
let p: *mut c_void = uc.get() as *mut c_void;
match c_read(self.fd, p, mem::size_of::<String>()) {
ret if ret < 0 => return Err(RecvError),
_ => Ok(uc.into_inner()),
}
}
}
}
- channel
pipe(2)を呼んで生fdを取ってきてSender/Receiverを作ります。
extern crate libc;
use libc::c_int;
use libc::pipe as c_pipe;
fn channel() -> (Sender, Receiver) {
unsafe {
let mut fds: [c_int; 2] = mem::uninitialized();
c_pipe(fds.as_mut_ptr());
(Sender::new(fds[1]), Receiver::new(fds[0]))
}
}
- main
最初のstd::sync::mpsc::channel
の例と同じです。
extern crate libc;
use std::thread;
fn main() {
let (tx, rx) = channel();
let handle = thread::spawn(move || {
tx.send("foo".to_string()).unwrap();
tx.send("bar".to_string()).unwrap();
});
println!("{:?}", rx.recv().unwrap()); // => "foo"
println!("{:?}", rx.recv().unwrap()); // => "bar"
handle.join().unwrap();
}
子プロセスとの通信
std::sync::mpsc::channel
fork(2)を使って子プロセスとの通信がどうなるか試してみました。
fn main() {
use std::sync;
let (tx, rx) = sync::mpsc::channel();
let pid: c_int = unsafe { libc::fork() };
if pid == 0 {
println!("in child ({}): received {:?}", pid, rx.recv().unwrap());
println!("in child ({}): received {:?}", pid, rx.recv().unwrap());
println!("in child ({}): done", pid);
} else {
let msg = "hoge".to_string();
println!("in parent ({}): sending {:?}", pid, msg);
tx.send(msg).unwrap();
let msg = "fuge".to_string();
println!("in parent ({}): sending {:?}", pid, msg);
tx.send(msg).unwrap();
println!("in parent ({}): done", pid);
}
}
std::sync::mpsc::channelはスレッド間でのメッセージングをサポートするものなのでもちろん正常に動作しません。
"in child"すら出力されなかった。よくわからない挙動でした。
pipe(2)を用いた自作channel
fork(2)する前の状態をrecv()する場合のみ正常に動作します。
fork(2)が親プロセスをコピーしてるの子プロセスでもきちんとアクセスできますが、fork(2)後にsend()すると壊れます。
解決策
ヒープにある文字列そのものを送信する。
ソースコード
#おわり
unsafeばかりになってしまいました...
今回はstd::sync::mpsc::channel
の実装自体は見ていません。
生fdを扱っていますがstd::io::File
を使う方がいろいろ便利だと思います。
@tatsuya6502さんにアドバイス頂きました。ありがとうございます。