ストリームを使った素数生成
その昔、第5世代コンピュータというプロジェクトがあり、そこでFlatGHCとかKL-1という細粒度並列プログラミング言語が作られていた。この言語は、すべての関数呼び出し(みたいなもの)が並列に動作し、未定義状態の変数に対して束縛することで計算が進む、今から考えても非常に特殊な言語だった。
その言語のサンプルとしてよく使われていたのが素数生成だった。アルゴリズムは、いわゆるエラトステネスの篩なのだが、各素数に対する篩を並列に再帰関数で表現されたプロセスとして動作させ、
リストをストリームとして使って、そのプロセス間をつなぐものだ。
ポイントは、篩を行うプロセスを自動的に生成していくところ。
KL-1でのコード
もうシンタックスとかもうろ覚えだし、手元にKL-1の処理系がないので動作が確認できないが、コードとしてはこんな感じ。
prime(N, R):-
gen(2, N, Numbers),
sieve(Numbers, R).
gen(Start, End, R) :- Start <= End |
R = [Start| T],
gen (Start + 1, End, T).
gen(Start, End, []) :- Start > End |
R = [].
sieve([H|T], R) :-
R = [H|R2],
filter(H, T, T2),
sieve(T2, R2).
sieve([], []).
filter(V, [H|T], R) :- H mod V = 0 |
filter(V, T, R).
filter(V, [H|T], R) :- H mod V != 0 |
R = [H|R2],
filter(V, T, R2).
filter(_, [], []).
Rustで書く
これをRustで書いてみた。プロセスはスレッドで、ストリームはチャンネルで実装する。
main関数はこんな感じ。2組チャンネルを作る。この2組のチャンネルは下図の2つの矢印に相当する。
1つ目のチャンネルの入力に2以上の自然数を流し込むと、2つ目のチャンネルの出力から素数が出てくる。出てきた素数は、iter().collect()
で回収。
use std::thread;
use std::sync::mpsc::{
channel,
Sender,
Receiver
};
fn main() {
let (numbers_in, to_sieves) = channel();
let (result_stream, result_out) = channel();
thread::spawn(move|| {
sieve(to_sieves, result_stream);
});
thread::spawn(move || {
for i in 2..1000 {
numbers_in.send(i).unwrap();
}
});
let res: Vec<i32> = result_out.iter().collect();
println!("Received {:?}", res);
}
篩プロセスは
- 1つ目に取り出した値が素数であると想定して、
- 素数を回収しているチャンネルにその値を書き、
- 入力列からその値の倍数をすべて削除した新しいストリームを作り、
- 次の篩プロセスを起動して、そのストリームを渡す。
fn sieve(in_stream: Receiver<i32>, result_stream: Sender<i32>) {
if let Ok(prime) = in_stream.recv() {
result_stream.send(prime).unwrap();
let (tx, rx) = channel();
thread::spawn(move|| {
sieve(rx, result_stream);
});
for i in in_stream {
if i % prime != 0 {
tx.send(i).unwrap();
}
}
}
}
結果を書き込むチャンネルを明示的にクローズしなければいけないかと思ったのだが、ドロップされたタイミングで、勝手にクローズされるので大丈夫。
感想
KL-1のコードにCopilotがガンガン適切なサジェストをしてくるのにおどろいた。一体なにがどうなってるんだ。
次はasync awaitで書いてみよう。