はじめに
rubyで一つのタブで複数のプロセスを立ち上げるためのgemでforemanというのがあるのですが、これのコア部分を勉強のためrustで実装してみたという話です。foremanのコア部分というのは以下の事を指します。
- マルチスレッドでのプロセスのIO処理
- 別スレッドでの子プロセスの死活監視とSIGTERM送信処理
- 別スレッドでのctrl-cの検知 (SIGTERM送信処理などの信号処理)
以上のような事をテーマにしたrustでの実装例がなかなか出てこなかったので自分で作って記事にしてみました。
※注意
- rust学び始めて2ヶ月弱なのでコードの書き方はベストではないかもしれません。悪しからず。
- 表現が適切ではないかもしれません。悪しからず。
作ったものの特徴
作ったもの => yukihirop/eg_foremanです。
(この記事が公開される頃にはスッキリしたコードになっていると思います。)
(ライフタイム指示子を使わない実装なのでメモリ効率?は悪いかもしれません。まだフレンドリーじゃないので使いこなせません。とりあえずは、三木道三/ライフタイムリスペクト「一生一緒にいてくれや」状態でいいかなって思ってます(笑)。)
process | concurrency | command |
---|---|---|
exit_0 | 1 | sleep 5 && echo 'success' && exit 0; |
exit_1 | 1 | sleep 5 && echo 'failed' && exit 1; |
loop | 2 | while :; do sleep 1 && echo 'hello world'; done; |
以上のように定義したプロセスを並行で実行する事ができます。
ctrl-c
をしないと5秒後に、exit_0
か exit_1
のプロセスが終了して、残りのプロセスに対して、 SIGTERM
を送ります。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/eg_foreman`
exit_0.1 | start at pid: 42256
exit_1.1 | start at pid: 42257
loop.2 | start at pid: 42258
loop.1 | start at pid: 42259
loop.1 | hello world
loop.2 | hello world
loop.1 | hello world
loop.2 | hello world
loop.2 | hello world
loop.1 | hello world
loop.1 | hello world
loop.2 | hello world
exit_1.1 | failed
exit_0.1 | success
system | sending SIGTERM for exit_1.1 at pid 42257
system | sending SIGTERM for loop.2 at pid 42258
system | sending SIGTERM for loop.1 at pid 42259
system | exit 0
5秒以内に ctrl-c
を検知したら、全ての子プロセスにSIGTERMを送ります。
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.01s
Running `target/debug/eg_foreman`
exit_0.1 | start at pid: 42170
exit_1.1 | start at pid: 42171
loop.1 | start at pid: 42172
loop.2 | start at pid: 42173
loop.2 | hello world
loop.1 | hello world
loop.2 | hello world
loop.1 | hello world
loop.2 | hello world
loop.1 | hello world
^Csystem | ctrl-c detected
system | sending SIGTERM for children
system | sending SIGTERM for exit_0.1 at pid 42170
system | sending SIGTERM for exit_1.1 at pid 42171
system | sending SIGTERM for loop.1 at pid 42172
system | sending SIGTERM for loop.2 at pid 42173
system | exit 0
これを引き出しとして持っておけば、改造して色んなものが作れるはずだと思います。 (たぶん...)
どうやって作っていったか
最初にまとめておくとこんな感じで作っていきました。
自分で何個も例題を用意してそれをクリアしていく形で作りました。
- マルチスレッドでの子プロセスのIO処理
- コマンドの実行と出力
- 子プロセスのspawnと出力
- マルチスレッドでの子プロセスのspawnと出力
- 出力を別モジュール化
- 子プロセスの
nix
のwaitpidを使った死活監視とSIGTERM送信- mpsc::channelを使って子プロセスIDを各スレッドから送って監視 (失敗)
- 無限ループで子プロセスのどれかが死ぬまで監視 (成功)
-
signal_hook
を使ったctrl-c
検知処理
マルチスレッドでの子プロセスのIO処理
コマンドの実行と出力
まずはコマンドの実行の仕方から学んでいきました。
公式ドキュメントに試せるサンプルがあったからそれを実際に試して感覚を掴みました。
(ちょっと改造してます。)
use std::process::Command;
fn main() {
let output = Command::new("echo")
.arg("Hello World")
.output()
.expect("Failed to execute command");
assert_eq!(b"Hello World\n", output.stdout.as_slice());
// https://qiita.com/4hiziri/items/dd9800ad7be42c395082
let bytes = output.stdout.as_slice();
let converted: String = String::from_utf8(bytes.to_vec()).unwrap();
println!("{}", converted); // Hello World
}
子プロセスのspawnと出力
次に子プロセスでの標準出力(標準エラー)を同じように取得するサンプルを考えました。
公式ドキュメントには完全な例はなかったですが、ヒントはあったのでなんとかサンプルを作る事ができました。
use std::process::{Command, Stdio};
fn main() {
let child = Command::new("echo")
.arg("Hello World")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed to execute child");
let output = child
.wait_with_output()
.expect("failed to wait on child");
assert_eq!(b"Hello World\n", output.stdout.as_slice());
// https://qiita.com/4hiziri/items/dd9800ad7be42c395082
let bytes = output.stdout.as_slice();
let converted: String = String::from_utf8(bytes.to_vec()).unwrap();
println!("{}", converted); // Hello World
}
できました。しかし、これは子プロセスが一つしかないシングルスレッドでの例です。
マルチスレッドだとどうなるのか次の考えました。
マルチスレッドでの子プロセスのspawnと出力
マルチスレッドだとどうなるのだろうと思って次のサンプルを作りました。
use std::process::{Command, Stdio};
use std::thread;
fn main() {
let mut threads = vec![];
for n in 0..2 {
let thread = thread::Builder::new()
.name(String::from(format!("child {}", n)))
.spawn(move || {
let child = Command::new("echo")
.arg(&format!("Hello World {}", n))
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed to execute child");
let output = child
.wait_with_output()
.expect("failed to wait on child");
// https://qiita.com/4hiziri/items/dd9800ad7be42c395082
let bytes = output.stdout.as_slice();
let converted: String = String::from_utf8(bytes.to_vec()).unwrap();
println!("{}", converted);
}).unwrap();
threads.push(thread);
}
for thread in threads {
thread.join().expect("failed join")
}
}
// Hello World 0
// Hello World 1
上手くいった上手くいったって感じだったのですが、子プロセスIDを表示してみたいなと思って安易に表示しようとしたらできませんでした。
let output = child
.wait_with_output()
.expect("failed to wait on child");
// ここに追加
println!("child pid: {}", &child.id());
wait_with_output
を実行する時に child
の所有権は奪われたから、借用できないよってエラーです。
11 | let child = Command::new("echo")
| ----- move occurs because `child` has type `Child`, which does not implement the `Copy` trait
...
19 | .wait_with_output()
| ------------------ `child` moved due to this method call
...
22 | println!("child pid: {}", &child.id());
| ^^^^^ value borrowed here after move
これは困りました。 なぜかというと将来的に、ctrl-cを検知して、別スレッドから子プロセスのIDを参照する必要があったからです。
公式ドキュメントを読んでいたら、wait_with_output
以外にも wait
があるという事を知りました。定義を見て借用だから所有権奪われなくて現在の問題が解決できるなって思いました。
pub fn wait_with_output(mut self) -> io::Result<Output>
pub fn wait(&mut self) -> io::Result<ExitStatus>
だが、戻り値がwait_with_output
と違う事に絶望しました。
一体どうすればいいのだと、公式ドキュメントを見てたら、子プロセスの標準出力や標準エラーは
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
として取得しますと書いてありした。「なんだ、ちゃんと他の方法も用意してくれてるじゃん!!」って思って戻り値を見てまた絶望しました。
- ChildStdout
- ChildStderr
「うぅ...構造体か... どうやったらこれから文字列で標準出力(エラー)を表示できるんだろう」って思いました。まぁ公式ドキュメントに書いてあるだろうと軽い気持ちで探してたのですが、これが全然書いてないのです。
出力を別モジュール化
「うわ、詰んだわ...」
って思ったのですが、GitHubで 「child.stdout.take() lang:rust
」で検索をかけたらこのコードが引かかりました。ですがコードを読んでみると問題があるような気がして、
- io::stdout()への書き込み処理がマルチスレッドだと上手くいななさそうー...
な気がしまして... しばらく悩み...想像で output::handle_output
とか handle_output
みたいな関数を定義してくれてる人おらんかなーみたいなノリでGitHubで検索したら無事引っかかって解決しました。
実際、これで検索したら引っかかります。「 fn handle_output Arc<Mutex
」
profileという第一引数をみてforemanっぽいのを作ろうとしている人だなってピンときて、第二引数の型が将来自分が作ろうとしている型と同じだったので、「これや!」となってこのコードを引っ張ってきました。
use crate::log;
use crate::process::Process;
use crate::stream_read::{PipeStreamReader, PipedLine};
use crossbeam_channel::Select;
use std::sync::{Arc, Mutex};
pub fn handle_output(proc: &Arc<Mutex<Process>>) {
let mut channels: Vec<PipeStreamReader> = Vec::new();
channels.push(PipeStreamReader::new(Box::new(
proc.lock().unwrap().child.stdout.take().expect("!stdout"),
)));
channels.push(PipeStreamReader::new(Box::new(
proc.lock().unwrap().child.stderr.take().expect("!stderr"),
)));
let mut select = Select::new();
for channel in channels.iter() {
select.recv(&channel.lines);
}
let mut stream_eof = false;
while !stream_eof {
let operation = select.select();
let index = operation.index();
let received = operation.recv(&channels.get(index).expect("!channel").lines);
match received {
Ok(remote_result) => match remote_result {
Ok(piped_line) => match piped_line {
PipedLine::Line(line) => {
// ここで標準出力(標準エラー)をprntln!している
log::output(&proc.lock().unwrap().name, &line);
}
PipedLine::EOF => {
stream_eof = true;
select.remove(index);
}
},
Err(error) => {
let err = format!("error: {:?}", error);
println!("{}", err);
}
},
Err(_) => {
stream_eof = true;
select.remove(index);
}
}
}
}
後書
簡単にこのコードの説明をすれば、
-
handle_output
は、crossbeam_channelを使って、rubyで言う所のIO.selectのような処理をしてます。IOオブジェクトとして、PipeStreamReader
を渡しているイメージ。 -
PipeStreamReader
は、streamから1バイトずつ読み込んで、改行コード(LF,0x0A
)を読み込むまで、bufferに突っ込んでいき、改行コードを読み込んだら、befferに突っ込んだ情報をString型に変換してsenderで送信。その後、bufferをクリアする処理をしています。
streamの型: Box<dyn io::Read + Send>
buffer: Vec<0u8>
sender: crossbeam_channelのunbounded()のsender
(当時、)内容はわかってなかったですが(後で学べばいいやーくらいの軽い気持ち)、Process構造体を作って使ってみたら上手くいったので良しとしました。
完成品はこんな感じになりました。
pub struct Process {
pub name: String,
pub child: Child,
}
一部抜粋です。
for (key, script) in scripts {
let con = script.concurrency;
let script = Arc::new(script);
for n in 0..con {
let script = script.clone();
let procs = procs.clone();
let handle_output = thread::Builder::new()
.name(String::from("handling output"))
.spawn(move || {
let tmp_proc = process::Process {
name: String::from(format!("{}.{}", key, n + 1)),
child: Command::new(&script.cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap(),
};
let proc = Arc::new(Mutex::new(tmp_proc));
let proc2 = Arc::clone(&proc);
let child_id = proc.lock().unwrap().child.id() as i32;
log::output(
&proc.lock().unwrap().name,
&format!("start at pid: {}", &child_id),
);
procs.lock().unwrap().push(proc);
// ここで使用
output::handle_output(&proc2);
})?;
proc_handles.push(handle_output);
}
}
これで第一関門を突破する事ができました。
ここまでのサンプル
子プロセスのnix
のwaitpidを使った死活監視とSIGTERM送信
次に終了した子プロセスを監視する方法の検討に入りました。「まぁ普通に考えて、子プロセスのIDをスレッドから送信する必要があるよな」と思って、mpsc::channel
を使って実装する事を考えました。
mpsc::channelを使って子プロセスIDを各スレッドから送って監視 (失敗)
まずはじめにスレッドから子プロセスのインスタンスを入れてあるベクターprocsから要素を取り出したりする必要があったから、Arc<Mutex化する必要がありました。
後でコードに出てくるのすが、死んだ子プロセスIDを使って、procsから除外する処理があります。
// ここで死んだ子プロセスのインスタンスを除去
procs.lock().unwrap().retain(|p| p.lock().unwrap().child.id() != pid as u32);
という事で、Arc<Mutex化
// before
let procs Vec<Arc<Mutex<process::Process>>> = vec![];
// after
let procs: Arc<Mutex<Vec<Arc<Mutex<process::Process>>>>> = Arc::new(Mutex::new(vec![]));
だいぶカオスだなって最初は思いましたね。 >
書きすぎて目がかすむほどに...
// 追加
let (tx, rx) = sync::mpsc::channel();
for (key, script) in scripts {
let con = script.concurrency;
let script = Arc::new(script);
for n in 0..con {
// 追加
let tx = tx.clone();
let script = script.clone();
let procs = procs.clone();
let handle_output = thread::Builder::new()
.name(String::from("handling output"))
.spawn(move || {
let tmp_proc = process::Process {
name: String::from(format!("{}.{}", key, n+1)),
child: Command::new(&script.cmd)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn().unwrap(),
};
let proc = Arc::new(Mutex::new(tmp_proc));
let proc2 = Arc::clone(&proc);
let child_id = proc.lock().unwrap().child.id() as i32;
log::output(&proc.lock().unwrap().name, &format!("start at pid: {}", &child_id));
// 追加
tx.send(child_id).unwrap();
procs.lock().unwrap().push(proc);
output::handle_output(&proc2);
})?;
proc_handles.push(handle_output);
}
}
「rust process Child」みたいな感じで散々調べていたので、nix
の存在は知っていました。
また、rubyのforemanのコードを以前読んでいたので、子プロセスの終了は waitpid
で待てばいいのだと思いました。で以下のように実装してみました。子プロセスの数だけスレッドを生成してwaitpidで終了を待つという実装になってます。
// 送られてきた子プロセスをwaitpidで終了監視、
// 終了を検知したら、残りの子プロセスにSIGTERMを送り終了させる。
for (idx, pid) in rx.iter().enumerate() {
let procs = procs.clone();
let check_child_terminated = thread::Builder::new()
.name(String::from(format!("check child terminated: {}", idx)))
.spawn(move || {
match nix::sys::wait::waitpid(Pid::from_raw(pid), None) {
Ok(_) => {
// ここで死んだ子プロセスのインスタンスを除去
procs.lock().unwrap().retain(|p| p.lock().unwrap().child.id() != pid as u32);
for proc in procs.lock().unwrap().iter() {
let proc = proc.lock().unwrap();
let child_id = proc.child.id();
log::output("system", &format!("sending SIGTERM to {} at pid {}", &proc.name, &child_id));
nix_signal::kill(
Pid::from_raw(child_id as i32),
Signal::SIGTERM,
)
.unwrap();
}
log::output("system", "exit 0");
exit(0);
},
Err(e) => log::error("system", &e)
};
})?;
proc_handles.push(check_child_terminated);
}
しかし、これでは上手くいきませんでした。
子プロセスのどれかが終了したら以下のような出力が安定して出るものだと思ってました。
system | sending SIGTERM to loop.2 at pid 28477
system | sending SIGTERM to exit_1.1 at pid 28479
system | sending SIGTERM to loop.1 at pid 28480
system | exit 0
しかし、頻発して以下のような出力が出てしまうのです。
exit_0.1 | start at pid: 51004
loop.1 | start at pid: 51005
loop.2 | start at pid: 51006
exit_1.1 | start at pid: 51007
loop.2 | hello world
loop.1 | hello world
loop.2 | hello world
loop.1 | hello world
loop.2 | hello world
loop.1 | hello world
loop.1 | hello world
loop.2 | hello world
exit_0.1 | success
exit_1.1 | failed
system | sending SIGTERM to exit_0.1 at pid 51004
thread 'check child terminated: 3' panicked at 'called `Result::unwrap()` on an `Err` value: Sys(ESRCH)', src/main.rs:107:30
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'check child terminated: 0' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src/main.rs:92:38
loop.2 | hello world
loop.1 | hello world
thread 'check child terminated: 3' panicked at 'called
Result::unwrap()
on anErr
value: Sys(ESRCH)', src/main.rs:107:30
このエラーは調べてみると、既に死んでる子プロセスにシグナルを送信しようとすると起るエラーだとわかりました。
子プロセスの数だけスレッドを立ち上げて、どっかのスレッドで子プロセスが死んだら、残り全ての子プロセスにSIGTERMを送るようになっているのでタイミングによっては起こりうるなという事でこの実装は駄目だとなりました。
「別にpid使って監視ではなく、どれか一つでも子プロセスが死んだ事を検知できればいいんだけどなぁー」と思いながら、rubyのforemanのコードを読んでたらそれができる事を思い出しまして、
# https://docs.ruby-lang.org/ja/latest/method/Process/m/wait2.html
Process.wait2(-1, Process::WNOHANG)
nix::sys::wait::waitpid(Pid::from_raw(-1), Some(nix::sys::wait::WaitPidFlag::WNOHANG),)
これを使って実装し直したわけです。
ここまでのサンプル
無限ループで子プロセスのどれかが死ぬまで監視 (成功)
子プロセスの監視は一つのスレッドでやるように実装し直しました。
子プロセスのどれかが死んだら、WaitStatus(enum)で定義されているどれかのexit_statusが帰ってくるのでパターンマッチさせて後は同じです。
let check_child_terminated = thread::Builder::new()
.name(String::from(format!("check child terminated")))
.spawn(move || {
loop {
// Waiting for the end of any one child process
match nix::sys::wait::waitpid(
Pid::from_raw(-1),
Some(nix::sys::wait::WaitPidFlag::WNOHANG),
) {
Ok(exit_status) => match exit_status {
WaitStatus::Exited(pid, code) => {
procs.lock().unwrap().retain(|p| {
let child_id = p.lock().unwrap().child.id() as i32;
Pid::from_raw(child_id) != pid
});
for proc in procs.lock().unwrap().iter() {
let proc = proc.lock().unwrap();
let child_id = proc.child.id();
log::output(
"system",
&format!(
"sending SIGTERM for {} at pid {}",
&proc.name, &child_id
),
);
nix_signal::kill(Pid::from_raw(child_id as i32), Signal::SIGTERM)
.unwrap();
}
log::output("system", &format!("exit {}", &code));
// close loop (thread finished)
exit(code);
}
_ => (),
},
Err(e) => log::error("system", &e)
};
}
})?;
proc_handles.push(check_child_terminated);
これで第二関門もクリアです。山場乗り越えた感じがあったのでやったねって感じでしたね。
ここまでのサンプル
signal_hook
を使ったctrl-c
検知処理
いよいよ最後の関門のctrl-cの検知からの全ての子プロセスへのSIGTERMの送信です。
実は第二関門よりも最初にできたのですが、記事の後ろに重めの内容を持ってくるのは読み心地悪いなって感じで入れ替えてます。記事の読み終わりはスッキリしてたいものです。
こちらは、「rust sigterm」みたいに検索したらHandling other types of signalsの記事がすぐ出てきて、そちらを参考にして実装したら出来た感じです。
// ここに追加
let procs_2 = Arc::clone(&procs);
let check_child_terminated = thread::Builder::new()
.name(String::from(format!("check child terminated")))
.spawn(move || {
loop {
let procs = Arc::clone(&procs_2);
let handle_signal = thread::Builder::new()
.name(String::from("handling signal"))
.spawn(move || signal::handle_signal(procs).expect("fail to handle signal"))?;
proc_handles.push(handle_signal);
pub fn handle_signal(
procs: Arc<Mutex<Vec<Arc<Mutex<Process>>>>>,
) -> Result<(), Box<dyn std::error::Error>> {
let signals = Signals::new(&[SIGALRM, SIGHUP, SIGINT, SIGTERM])?;
for sig in signals.forever() {
match sig {
SIGINT => {
log::output("system", "ctrl-c detected");
log::output("system", "sending SIGTERM for children");
for proc in procs.lock().unwrap().iter() {
let proc = proc.lock().unwrap();
let child = &proc.child;
log::output(
"system",
&format!("sending SIGTERM for {} at pid {}", &proc.name, &child.id()),
);
if let Err(e) = signal::kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM)
{
log::error("system", &e);
log::output("system", "exit 1");
exit(1);
}
}
log::output("system", "exit 0");
exit(0)
}
_ => (),
}
}
Ok(())
}
よしこれで完成だと思って、Ctrl-c
を押して、動作確認したらエラーが起きました。
.
.
.
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
system | error: Sys(ECHILD)
.
.
.
怒涛の如く出力が出たのでどこの処理がまずいか一発でわかりました。
このエラーは、waitpidする子プロセスが存在してないというエラーです。
今の実装だと子プロセスの監視でエラーが起きた時はエラーを吐き出すだけになってます。
Err(e) => log::error("system", &e)
};
}
})?;
proc_handles.push(check_child_terminated);
ここが駄目なんです。ECHILDをキャッチしてハンドルしてやる必要がありました。
こんな感じです。
Err(e) => {
if let nix::Error::Sys(nix::errno::Errno::ECHILD) = e {
// close loop (thread finished)
exit(0);
}
}
};
}
})?;
proc_handles.push(check_child_terminated);
はい! これで完成です。 🎉
ここまでのサンプル
まとめ
いやー時間かかりました。調査から完成まで2週間くらいかかりました。でもできてよかったです。
公式ドキュメント読んだ時は、「rustでマルチスレッドとか俺無理だわ...」って思ってましたが、作り終えて少しなりと自信がついたかと思います。書いてみて思ったのですが、Arc::cloneが多くなりがちで、変数名の付け方に困りますね。どうしても数字のsuffixで誤魔化してしまいます。
改善点も認識している範囲で以下の通りあるので、しばらくこのサンプルプログラムで遊学できそうです。
- main.rsに処理書きすぎ問題。関心事毎にモジュールへの切り出し
- 子プロセスからのIO処理を人のコード丸パクリで実装してる所の理解
- 標準出力(標準エラー)のカラーリング
- エラーハンドリングの追加
- テストの追加
- ライフタイム指示子を使った実装へのチャレンジ
以上です。長々と読み頂きありがとうございました。🙇♂️