D言語 Advent Calendar 11日目の記事です.
std.concurrency
D言語にはErlangやGoのように組み込みで並行処理向けの関数/シンタックスはありません.その代わりstd.concurrencyを使うことで,メッセージパッシングスタイルの並行処理が出来るようになります.
多分日本ではTDPL以外であんまり記事がないので,簡単に使い方とかについて書きます.
対象バージョンは最新のdmd 2.064.2です.
ちなみに,単純に処理を並列化して結果を取得したい場合には,std.parallelismというのがあります.
使い方
std.concurrencyでは,並行に動く単位をコンテキストと呼びます.なので,これ以降もコンテキストと書きますが,実体としては関数だったりopCallを持つオブジェクトのことを指します.
コンテキストの起動
新しくコンテキストを作るにはspawn
を使います.
import std.stdio, std.concurrency;
void func(string message)
{
writeln(message);
}
void main()
{
spawn(&func, "Hello!");
}
spawn
の第一引数に新しいコンテキストで起動したいオブジェクト,それ移行の引数は新しいコンテキストで受け取る引数になります.これを走らせると,Hello!
と表示されます.
新しいコンテキストとのやりとり
さっきの例だと起動時に値を渡してましたが,実際の利用ではむしろコンテキスト内でデータをやりとりするのがほとんどです.std.concurrencyでデータをやりとりするためにはsend
,receive
, receiveOnly
を使います.
Goとかとは違い明示的にchannelを渡すことはなく,spawn
によって作られたコンテキストには最初からchannel(std.concurrencyではメッセージボックスと呼ばれる)が割り当てられます.
sendとreceiveOnly
起動したコンテキストに値を送るにはsend
を使います.spawn
すると新しいコンテキストを表すTid
型のidが返ってくるので,それに対してsend
を呼び出します.
新しいコンテキスト側では,値を受ける時にパターンが一つだけの場合receiveOnly
を使うことが出来ます.
import std.stdio, std.concurrency;
void func()
{
auto num = receiveOnly!(int);
writeln("Received: ", num);
}
void main()
{
auto tid = spawn(&func);
send(tid, 42); // UFCSがあるので tid.send(42); でも良い
}
同時に複数の値を送ることも出来,その時はstd.typecons.Tuple
が返ってきます.
import std.stdio, std.concurrency;
void func()
{
auto msg = receiveOnly!(int, string);
writeln("Received number: ", msg[0]);
writeln("Received message: ", msg[1]);
}
void main()
{
auto tid = spawn(&func);
send(tid, 42, "Hello!");
}
sendとreceive
receiveOnly
は使い方は簡単ですが,アプリケーションを書いて行くと,コンテキストとやりとりする型は複数になったりします.そのような場合には,複数の型のパターンを同時に扱うことが出来るreceive
を使います.
receive
は引数に複数のハンドラを登録し,そのハンドラの引数との型でマッチングを行うことで,複数のメッセージを処理します.
import std.stdio, std.concurrency;
void func()
{
receive((int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); });
}
void main()
{
auto tid = spawn(&func);
// 下のどっちでもfunc内で処理可能
send(tid, 42);
send(tid, 42.0f);
}
receive
はハンドラに存在しない型のメッセージを受け取ったら,何も処理をせずメッセージボックスに残したままにします.
これはこれでメッセージ処理に柔軟性をもたらしていて便利ですが,場合によっては捨てたいこともあるかもしれません.そういう時には末尾にstd.variant.Variant
型のハンドラを登録し,すべての処理をそこに流すようにするのが一般的です.
import std.stdio, std.concurrency;
void func()
{
bool running = true;
while (running) {
receive((int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); },
(Variant v) {
writeln("Received unknown type. Terminated...");
running = false;
});
}
}
void main()
{
auto tid = spawn(&func);
send(tid, 42);
send(tid, 42.0f);
send(tid, "Hello!");
}
receiveTimeout
今までは書きませんでしたが,receive
やreceiveOnly
はメッセージが届くまで,Goのchannelなどと同様にブロックします.receive
の最初のサンプルである
receive((int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); });
は,誰かがこのコンテキストにsend
をしない限り,止まるということです.
receiveTimeout
を使うと,ずっとブロックすることはなく,指定した時間で処理を中断します.
import std.stdio, std.concurrency;
import core.thread;
void func()
{
receiveTimeout(dur!"seconds"(1),
(int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); });
writeln("1 seconds passed");
}
void main()
{
auto tid = spawn(&func);
send(tid, "string");
Thread.sleep(dur!"seconds"(2));
}
1秒経ったら”1 seconds passed”が表示されます.
他のコンテキストとのやりとり
今までは新しいコンテキストとのやりとりの例でしたが,他のコンテキストとももちろん同様のことが出来ます.使うものは変わらずsend
,receive
,receiveOnly
の3つです.
親コンテキストへの送信
コンテキスト内では自分のTidを取得するthisTid
と,spawn
をした親のTid
を取得するownerTid
の二つのプロパティがあります.このownerTid
を使うことで,親コンテキストにメッセージを送信出来ます.
import std.stdio, std.concurrency;
void func()
{
auto msg = receiveOnly!(string);
send(ownerTid, "pong");
}
void main()
{
auto tid = spawn(&func);
send(tid, "ping");
writeln("Received message: ", receiveOnly!(string));
}
新しいコンテキスト同士の送信
これには直接Tid
を渡す方法と,Tidレジストリを経由する二つの方法があります.直接Tid
を渡す方法ですが,単にspawn
の引数で渡すかsend
で送るかという,すでに説明した方法になります.
import std.stdio, std.concurrency;
void func1()
{
auto tid = receiveOnly!(Tid);
send(tid, "pong");
}
void func2(Tid tid1)
{
send(tid1, ownerTid);
}
void main()
{
auto tid1 = spawn(&func1);
auto tid2 = spawn(&func2, tid1);
writeln("Received message: ", receiveOnly!(string));
}
Tidレジストリは,あるTid
に対して名前をつけることが出来るregister
,名前を指定してTidを取得するlocate
の二つの関数を使って,動的にTid
をやりとりする方法です.
import std.stdio, std.concurrency;
void func1()
{
register("func1", thisTid);
auto tid = receiveOnly!(Tid);
send(tid, "pong");
}
void func2()
{
auto func1 = locate("func1");
auto owner = locate("owner");
send(func1, owner);
}
void main()
{
register("owner", thisTid);
auto tid1 = spawn(&func1);
auto tid2 = spawn(&func2);
writeln("Received message: ", receiveOnly!(string));
}
基本的には最初の引数で渡す方法で十分ではないかと思います.
メッセージボックスの設定
各コンテキストが持っているメッセージボックスに対して,サイズや挙動をsetMaxMailboxSize
関数を使って設定することが出来ます.シグネチャは以下のようになっています.
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
messages
がメッセージボックスの大きさで,0を設定すると制限がなくなります.OnCrowding doThis
は,メッセージボックスが満杯になった時の挙動を指定します.
- OnCrowding.block
メッセージボックスに空きが出来るまで,send
をブロック
- OnCrowding.ignore
メッセージを捨てる
- OnCrowding.throwException
MailboxFull
例外を投げる
以上の3つがデフォルトで利用出来るので,アプリケーションの要件に合わせて適宜指定してください.
独自ハンドラの指定
setMaxMailBoxSize
を使って独自ハンドラを指定することも出来ます.
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis);
シグネチャはこの通りなので, bool function(Tid)
な関数を渡してあげることで,メッセージボックス満杯時の挙動をカスタマイズ出来ます.Tid
はメッセージボックスを持ってるコンテキストになります.
エラー処理
今まで言及してこなかったエラーハンドリング周りについて,ここでまとめて説明します.
MessageMismatch
今までの例を見て,std.concurrencyのメッセージボックスにはGoのようにchannelそのものに型がついていないことに気づくと思います.ここで,receiveOnlyに型が違うメッセージを送ったらどうなるのかというと,MessageMismatch
例外が飛びます.
// receiveOnlyの最初の例を以下のように置き換えてみる
void func()
{
try {
auto num = receiveOnly!(string);
writeln("Received: ", num);
} catch (MessageMismatch e) {
writeln(e.msg);
}
}
上の例を動かすとUnexpected message type: expected 'string', got 'int'
というメッセージが表示されるようになります.receiveはすでにメッセージボックスに残ると説明してあるので,この例外が飛ぶのはreceiveOnlyのみです.
OwnerTerminated
新しいコンテキストは別スレッドで動いているので,場合によっては親が先に終了する時があります.この時には,receive
からOwnerTerminated
例外が投げられます.適宜catch
しましょう.
void func()
{
try {
receive((int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); });
} catch (OwnerTerminated e) {
writeln("Owner dead");
}
}
また,この例外は実は通常のメッセージと同じように扱われていて,ハンドラとして処理することも出来ます.
void func()
{
receive((int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); },
(OwnerTerminated e) { writeln("Owner dead"); });
}
spawnLinked
通常,親は子となる新しいコンテキストが死んだかどうかというのには関知しません.そのため,すでに起動したコンテキストが終了しているのにreceive
してもずっとブロックするだけです.
終了した段階で何か通知が来てくれると,各コンテキストの管理がしやすくなります.std.concurrencyにはその用途のために,spawnLinked
というspawn
の派生があります.
import std.stdio, std.concurrency;
void func()
{
auto msg = receiveOnly!(string);
send(ownerTid, "pong");
}
void main()
{
auto tid = spawnLinked(&func);
send(tid, "ping");
writeln("Received message: ", receiveOnly!(string));
writeln("Received message again: ", receiveOnly!(string));
}
この例では,2回目のreceiveOnlyでLinkTerminated
例外が投げられます.試しにspawnLinked
をspawn
にしてみると,処理がブロックするようになります.コンテキストがチェインになったりする場合には,spawnLinked
を使ってリンクを張り,終了をチェインさせて処理していくのがよくあるパターンだと思います.
優先メッセージ
特別な状況に陥った場合など,あるメッセージをすでにsend
したメッセージよりも優先的に送りたい時があります.そのような場合にはprioritySend
が使えます.これを使うと,通常のメッセージボックスにどれだけメッセージが貯まっていても,prioritySend
を使ったメッセージが先に処理されます.
import std.stdio, std.concurrency;
void func()
{
receive((int i) { writeln("Received an int: ", i); },
(float f) { writeln("Received a float: ", f); });
}
void main()
{
auto tid = spawn(&func);
send(tid, 42);
prioritySend(tid, 42.0f);
}
上のコードを試すと”Received a float: 42”が出力されると思います.
一つ気をつけることは,通常のメッセージと違いこのメッセージはハンドラがないとPriorityMessageException
が飛ぶ所です.通常のsend
と違い,メッセージボックスに貯まるということはありません.
利用時に注意する所
D言語はデフォルトTLSなこともそうですが,データの共有に関してはかなり厳格にチェックし,Thread同士のIsolationを保つようにしています.そのため,std.concurrencyが提供するAPIでは,共有しても大丈夫なデータか,共有していると明示されているデータしかやりとり出来ません.
send(tid, 42); // intはコピーされるのでOK
send(tid, "42"); // stringの実体は immutable(char)[] で内部は書き換えられないのでOK
send(tid, [42]); // int[]は共有状態を作りかねないのでNG
これは実はspawn
にも当てはまっており,spawn
では普通のdelegate
やoCall
を持ったオブジェクトは起動することが出来ず,immutable
やshared
がついている必要があります.そうでないと,間違って状態を変更してしまう可能性があるからです.
制限を回避する
std.concurrencyのAPIは一貫しているのですが,実際コードを書いていくとmutableなデータのやりとりは普通に起きます.これに関して良い解は出ていなくて,今のところ悪魔のcast
を使うしかありません!
import std.stdio, std.concurrency;
void func()
{
receive((int i) { writeln("Received an int: ", i); },
(immutable int[] a) { writeln("Received a array: ", cast()a); });
}
void main()
{
auto tid = spawn(&func);
send(tid, cast(immutable)[1, 2]);
}
つらいですが,まぁ新しいコンテキストで生成したデータなどはsend
することで所有権が親にそのまま移ったりすることが多いので,今の所はこれで我慢するしかないです.
実装
コンテキストは単にスレッドです.HaskellやGoのようにsegmented stackやその派生を使っているわけではないので,メモリ効率の点では劣ります(Rustを例に挙げようと思いましたが,Rustは最近segmented stackをやめてしまいました - togetter).
その代わりあれやこれやがなくて実装はシンプルになっているので,仕方なく受け入れましょう.ついでに言うと,D言語がsegmented stackを採用する可能性は限りなく低いです.
まとめ
ということで,std.concurrencyを一通り説明してみました.これ自身はプリミティブなAPI群なので,使い方自身はシンプルな感じになっていると思います.
将来的にはネットワーク越しのパッシングも考えているようですが,標準にシリアライゼーションモジュールが今のところないので,当分先になるのではないかと思います.