はじめに
前回の記事では並列で実行したジョブの終了を「プロセス存在チェックによる方法」と「シグナルを使った方法」の2つの方法で実装しました。前者は 1秒(任意の秒数)ごとにプロセスの存在チェックをするためわずかに待ち時間があり、後者はシェルによってシグナルの細かい挙動が異なり動作に不安があるという問題がありました。これら2つは共にプロセスのみ(つまりファイルを使わずに)で並列処理が行えるというメリットがあるものの小さな問題が残りましたが、名前付きパイプを用いることでこの問題を解決することが出来ます。
名前付きパイプ
名前付きパイプ(または FIFO ファイル)は mkfifo
コマンドを使って作成する特殊なファイルです。もちろん mkfifo は POSIX で規定されているコマンドです。複数のプロセスから同一の FIFO ファイルを読み書きすることでプロセス間通信を行うことができます。ファイルを作成しますが実際にはメモリ上でデータのやり取りが行われるためディスクを読み書きしません。
簡単な実験をしてみましょう。次の2つのファイルを作成してください。
#!/bin/sh
while IFS= read -r line; do
echo "$line"
done < ./fifo
#!/bin/sh
while :; do
echo "$1"
sleep 1
done > ./fifo
そしてターミナルを3つ開きそれぞれ次のコマンドを実行します。
mkfifo ./fifo # 最初の一回のみ
./read.sh
./write test1
./write test2
そうすると main のターミナルで次のように出力されたと思います。
test1
test2
test1
test2
...
パイプへの書き込みは最小 512 バイト以下であればアトミックに行われます。 (参考 pipe - パイプと FIFO の概要)
POSIX.1-2001 では、 PIPE_BUF バイト以下の write(2) は atomic に行われること、つまりパイプへの出力データの書き込みは 連続したシーケンスとして行われることを必須としている (MUST)。PIPE_BUF バイトより多くのデータを書き込み場合は atomic とはならない、 つまりパイプへの他のプロセスによるデータの書き込みが間に入る 可能性がある。 POSIX.1-2001 の仕様では、 PIPE_BUFは最小でも 512 バイトであることが要求されている (Linux では PIPE_BUF は 4096 バイトである)。 正確な動作は、ファイルディスクリプターが nonblocking (O_NONBLOCK) かどうか、パイプへの書き込みが複数から行われるかどうか、および 書き込みを行うバイト数 n により決定される。
つまり sleep 1
をなくして出力を速くしたとしても2つの出力が混ざることはありません。反対に複数のプロセスで読み取る場合は一行単位で読み取るわけではなく最小1バイト単位で読み取るので注意が必要です。read
が改行コードを読み取るまで処理を返さないことから考えると「1バイト単位で意味を持つデータ+改行」としてデータを出力し、読み取り側は(一行に複数のプロセスから出力した文字が混ざるので)1バイト単位で処理をするというような工夫をする必要があるんじゃないかと思います。(複数のプロセスで読み取るということが必要になったことがないので検証してません。)
書き込み側、読み取り側のどちらを先に起動(正確には FIFO ファイルのオープン)しても反対側が起動するまで書き込みまたは読み取りはブロックされます。両側が起動し書き込み・読み取り両方が開始した後は、書き込み側が全ていない状態(正確には FIFO ファイルのクローズ)で読み取り側で全てのデータを読み取ってしまうと処理完了となり、逆に読み取り側が全ていなくなると、書き込み側は「write error: broken pipe」のようなエラーになります。
名前付きパイプはパイプの一種であるため、パイプと同様に容量に制限があるので注意してください。読み取り側が遅い場合、制限を超えて書き込みをしようとするとブロックされます。(エラーにはなりません。)パイプの容量はシステムによって異なりますがデフォルトで 64 KB ぐらいのようです。
実装
さて、名前付きパイプの挙動がわかった所で、これを並列処理に応用してみましょう。
#!/bin/sh
set -eu
MAX_PROC=4 # 最大並列数
FIFO_FILE="./fifo"
# INT(CTRL-C)、TERM による終了処理
terminate() {
trap '' TERM
kill -TERM 0
exit "$1"
}
trap "terminate 130" INT
trap "terminate 143" TERM
# fifo ファイルの作成と後片付け
finished() {
ex=$?
rm "$FIFO_FILE" 2>/dev/null ||: # エラーを無視する
exit "$ex"
}
trap finished EXIT
mkfifo "$FIFO_FILE" 2>/dev/null ||: # エラーを無視する
# 並列で実行される処理
func() {
echo "sleep $1"
sleep "$1"
echo "done $1" >&3
}
# 最初に最大並列数までプロセスを起動する
proc=0
while [ "$proc" -lt "$MAX_PROC" ] && IFS= read -r line; do
func "$line" 3>"$FIFO_FILE" &
proc=$((proc + 1))
done
# プロセスが一つ終了するたびに次のプロセスを起動する
while IFS= read -r fifo <&3; do
echo "$fifo" # 簡単な終了データを取得することが出来る
IFS= read -r line || continue
func "$line" 3>"$FIFO_FILE" &
done 3<"$FIFO_FILE"
wait # 最後のプロセスが完全に終了するまで念の為に待つ(ほとんど意味はない)
実行方法
$ seq 10 | sh test.sh
解説
コードはさほど解説する所もないと思います(上のコメントで十分かなと)。やってることはたいしたことじゃないのですが、ファイルディスクリプタを駆使しなければいけないので意外と苦労しました。ファイルのオープンとクローズのタイミングをよく考えるとこうしなきゃいけないとわかるんですが難しいですね・・・。
この方法のもう一つの利点は FIFO ファイルにより並列で実行された処理から一行単位の簡単な終了データを取得することが出来るということです。処理が正常終了したかどうかなどを知ることが出来ます。
さいごに
なぜこのコードを最初に出さなかったかと言うと fifo ファイルの後片付けが面倒なのと昔 mkfifo
が入ってない環境があったと記憶していたのでなるべく使わないようにしていたので思いつきませんでした(言い訳ですが・・・)。別件で名前付きパイプを使わなきゃいけないかなと思う処理を考えていて fifo ファイルでプロセスの終了を検出する(正確にはプロセスが最後に出力するデータを待つ)方法をふと思いつきました。例によって検証は甘々なんですが fifo ファイルが必要な所を除けば、おそらく前の記事の実装よりもよい方法だと思います。こうやってみると POSIX シェルは最小限の機能で実現できるようにうまく規定されてますよね。ただしその使い方がちゃんとまとめられてないように思えます。