Help us understand the problem. What is going on with this article?

Dive into std.concurrency

More than 5 years have passed since last update.

D言語 Advent Calendar 11日目の記事です.

std.concurrency

D言語にはErlangやGoのように組み込みで並行処理向けの関数/シンタックスはありません.その代わりstd.concurrencyを使うことで,メッセージパッシングスタイルの並行処理が出来るようになります.
多分日本ではTDPL以外であんまり記事がないので,簡単に使い方とかについて書きます.
対象バージョンは最新のdmd 2.064.2です.

ちなみに,単純に処理を並列化して結果を取得したい場合には,std.parallelismというのがあります.

使い方

std.concurrencyでは,並行に動く単位をコンテキストと呼びます.なので,これ以降もコンテキストと書きますが,実体としては関数だったりopCallを持つオブジェクトのことを指します.

コンテキストの起動

新しくコンテキストを作るにはspawnを使います.

import std.stdio, std.concurrency;

void func(string message)
{
    writeln(message);
}

void main()
{
    spawn(&func, "Hello!");
}

spawnの第一引数に新しいコンテキストで起動したいオブジェクト,それ移行の引数は新しいコンテキストで受け取る引数になります.これを走らせると,Hello!と表示されます.

新しいコンテキストとのやりとり

さっきの例だと起動時に値を渡してましたが,実際の利用ではむしろコンテキスト内でデータをやりとりするのがほとんどです.std.concurrencyでデータをやりとりするためにはsendreceive,receiveOnlyを使います.
Goとかとは違い明示的にchannelを渡すことはなく,spawnによって作られたコンテキストには最初からchannel(std.concurrencyではメッセージボックスと呼ばれる)が割り当てられます.

sendとreceiveOnly

起動したコンテキストに値を送るにはsendを使います.spawnすると新しいコンテキストを表すTid型のidが返ってくるので,それに対してsendを呼び出します.
新しいコンテキスト側では,値を受ける時にパターンが一つだけの場合receiveOnlyを使うことが出来ます.

import std.stdio, std.concurrency;

void func()
{
    auto num = receiveOnly!(int);
    writeln("Received: ", num);
}

void main()
{
    auto tid = spawn(&func);
    send(tid, 42); // UFCSがあるので tid.send(42); でも良い
}

同時に複数の値を送ることも出来,その時はstd.typecons.Tupleが返ってきます.

import std.stdio, std.concurrency;

void func()
{
    auto msg = receiveOnly!(int, string);
    writeln("Received number: ", msg[0]);
    writeln("Received message: ", msg[1]);
}

void main()
{
    auto tid = spawn(&func);
    send(tid, 42, "Hello!");
}

sendとreceive

receiveOnlyは使い方は簡単ですが,アプリケーションを書いて行くと,コンテキストとやりとりする型は複数になったりします.そのような場合には,複数の型のパターンを同時に扱うことが出来るreceiveを使います.
receiveは引数に複数のハンドラを登録し,そのハンドラの引数との型でマッチングを行うことで,複数のメッセージを処理します.

import std.stdio, std.concurrency;

void func()
{
    receive((int i) { writeln("Received an int: ", i); },
            (float f) { writeln("Received a float: ", f); });
}

void main()
{
    auto tid = spawn(&func);
    // 下のどっちでもfunc内で処理可能
    send(tid, 42);
    send(tid, 42.0f);
}

receiveはハンドラに存在しない型のメッセージを受け取ったら,何も処理をせずメッセージボックスに残したままにします.
これはこれでメッセージ処理に柔軟性をもたらしていて便利ですが,場合によっては捨てたいこともあるかもしれません.そういう時には末尾にstd.variant.Variant型のハンドラを登録し,すべての処理をそこに流すようにするのが一般的です.

import std.stdio, std.concurrency;

void func()
{
    bool running = true;

    while (running) {
        receive((int i) { writeln("Received an int: ", i); },
                (float f) { writeln("Received a float: ", f); },
                (Variant v) {
                    writeln("Received unknown type. Terminated...");
                    running = false;
                });
    }
}

void main()
{
    auto tid = spawn(&func);
    send(tid, 42);
    send(tid, 42.0f);
    send(tid, "Hello!");
}

receiveTimeout

今までは書きませんでしたが,receivereceiveOnlyはメッセージが届くまで,Goのchannelなどと同様にブロックします.receiveの最初のサンプルである

receive((int i) { writeln("Received an int: ", i); },
        (float f) { writeln("Received a float: ", f); });

は,誰かがこのコンテキストにsendをしない限り,止まるということです.
receiveTimeoutを使うと,ずっとブロックすることはなく,指定した時間で処理を中断します.

import std.stdio, std.concurrency;
import core.thread;

void func()
{
    receiveTimeout(dur!"seconds"(1),
                   (int i) { writeln("Received an int: ", i); },
                   (float f) { writeln("Received a float: ", f); });
    writeln("1 seconds passed");
}

void main()
{
    auto tid = spawn(&func);
    send(tid, "string");
    Thread.sleep(dur!"seconds"(2));
}

1秒経ったら”1 seconds passed”が表示されます.

他のコンテキストとのやりとり

今までは新しいコンテキストとのやりとりの例でしたが,他のコンテキストとももちろん同様のことが出来ます.使うものは変わらずsendreceivereceiveOnlyの3つです.

親コンテキストへの送信

コンテキスト内では自分のTidを取得するthisTidと,spawnをした親のTidを取得するownerTidの二つのプロパティがあります.このownerTidを使うことで,親コンテキストにメッセージを送信出来ます.

import std.stdio, std.concurrency;

void func()
{
    auto msg = receiveOnly!(string);
    send(ownerTid, "pong");
}

void main()
{
    auto tid = spawn(&func);
    send(tid, "ping");
    writeln("Received message: ", receiveOnly!(string));
}

新しいコンテキスト同士の送信

これには直接Tidを渡す方法と,Tidレジストリを経由する二つの方法があります.直接Tidを渡す方法ですが,単にspawnの引数で渡すかsendで送るかという,すでに説明した方法になります.

import std.stdio, std.concurrency;

void func1()
{
    auto tid = receiveOnly!(Tid);
    send(tid, "pong");
}

void func2(Tid tid1)
{
    send(tid1, ownerTid);
}

void main()
{
    auto tid1 = spawn(&func1);
    auto tid2 = spawn(&func2, tid1);
    writeln("Received message: ", receiveOnly!(string));
}

Tidレジストリは,あるTidに対して名前をつけることが出来るregister,名前を指定してTidを取得するlocateの二つの関数を使って,動的にTidをやりとりする方法です.

import std.stdio, std.concurrency;

void func1()
{
    register("func1", thisTid);
    auto tid = receiveOnly!(Tid);
    send(tid, "pong");
}

void func2()
{
    auto func1 = locate("func1");
    auto owner = locate("owner");
    send(func1, owner);
}

void main()
{
    register("owner", thisTid);
    auto tid1 = spawn(&func1);
    auto tid2 = spawn(&func2);
    writeln("Received message: ", receiveOnly!(string));
}

基本的には最初の引数で渡す方法で十分ではないかと思います.

メッセージボックスの設定

各コンテキストが持っているメッセージボックスに対して,サイズや挙動をsetMaxMailboxSize関数を使って設定することが出来ます.シグネチャは以下のようになっています.

void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);

messagesがメッセージボックスの大きさで,0を設定すると制限がなくなります.OnCrowding doThisは,メッセージボックスが満杯になった時の挙動を指定します.

  • OnCrowding.block

メッセージボックスに空きが出来るまで,sendをブロック

  • OnCrowding.ignore

メッセージを捨てる

  • OnCrowding.throwException

MailboxFull例外を投げる

以上の3つがデフォルトで利用出来るので,アプリケーションの要件に合わせて適宜指定してください.

独自ハンドラの指定

setMaxMailBoxSizeを使って独自ハンドラを指定することも出来ます.

void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis);

シグネチャはこの通りなので,bool function(Tid)な関数を渡してあげることで,メッセージボックス満杯時の挙動をカスタマイズ出来ます.Tidはメッセージボックスを持ってるコンテキストになります.

エラー処理

今まで言及してこなかったエラーハンドリング周りについて,ここでまとめて説明します.

MessageMismatch

今までの例を見て,std.concurrencyのメッセージボックスにはGoのようにchannelそのものに型がついていないことに気づくと思います.ここで,receiveOnlyに型が違うメッセージを送ったらどうなるのかというと,MessageMismatch例外が飛びます.

// receiveOnlyの最初の例を以下のように置き換えてみる
void func()
{
    try {
        auto num = receiveOnly!(string);
        writeln("Received: ", num);
    } catch (MessageMismatch e) {
        writeln(e.msg);
    }
}

上の例を動かすとUnexpected message type: expected 'string', got 'int'というメッセージが表示されるようになります.receiveはすでにメッセージボックスに残ると説明してあるので,この例外が飛ぶのはreceiveOnlyのみです.

OwnerTerminated

新しいコンテキストは別スレッドで動いているので,場合によっては親が先に終了する時があります.この時には,receiveからOwnerTerminated例外が投げられます.適宜catchしましょう.

void func()
{
    try {
        receive((int i) { writeln("Received an int: ", i); },
                (float f) { writeln("Received a float: ", f); });
    } catch (OwnerTerminated e) {
        writeln("Owner dead");
    }
}

また,この例外は実は通常のメッセージと同じように扱われていて,ハンドラとして処理することも出来ます.

void func()
{
    receive((int i) { writeln("Received an int: ", i); },
            (float f) { writeln("Received a float: ", f); },
            (OwnerTerminated e) { writeln("Owner dead"); });
}

spawnLinked

通常,親は子となる新しいコンテキストが死んだかどうかというのには関知しません.そのため,すでに起動したコンテキストが終了しているのにreceiveしてもずっとブロックするだけです.
終了した段階で何か通知が来てくれると,各コンテキストの管理がしやすくなります.std.concurrencyにはその用途のために,spawnLinkedというspawnの派生があります.

import std.stdio, std.concurrency;

void func()
{
    auto msg = receiveOnly!(string);
    send(ownerTid, "pong");
}

void main()
{
    auto tid = spawnLinked(&func);
    send(tid, "ping");
    writeln("Received message: ", receiveOnly!(string));
    writeln("Received message again: ", receiveOnly!(string));
}

この例では,2回目のreceiveOnlyでLinkTerminated例外が投げられます.試しにspawnLinkedspawnにしてみると,処理がブロックするようになります.コンテキストがチェインになったりする場合には,spawnLinkedを使ってリンクを張り,終了をチェインさせて処理していくのがよくあるパターンだと思います.

優先メッセージ

特別な状況に陥った場合など,あるメッセージをすでにsendしたメッセージよりも優先的に送りたい時があります.そのような場合にはprioritySendが使えます.これを使うと,通常のメッセージボックスにどれだけメッセージが貯まっていても,prioritySendを使ったメッセージが先に処理されます.

import std.stdio, std.concurrency;

void func()
{
    receive((int i) { writeln("Received an int: ", i); },
            (float f) { writeln("Received a float: ", f); });
}

void main()
{
    auto tid = spawn(&func);
    send(tid, 42);
    prioritySend(tid, 42.0f);
}

上のコードを試すと”Received a float: 42”が出力されると思います.
一つ気をつけることは,通常のメッセージと違いこのメッセージはハンドラがないとPriorityMessageExceptionが飛ぶ所です.通常のsendと違い,メッセージボックスに貯まるということはありません.

利用時に注意する所

D言語はデフォルトTLSなこともそうですが,データの共有に関してはかなり厳格にチェックし,Thread同士のIsolationを保つようにしています.そのため,std.concurrencyが提供するAPIでは,共有しても大丈夫なデータか,共有していると明示されているデータしかやりとり出来ません.

send(tid, 42);   // intはコピーされるのでOK
send(tid, "42"); // stringの実体は immutable(char)[] で内部は書き換えられないのでOK
send(tid, [42]); // int[]は共有状態を作りかねないのでNG

これは実はspawnにも当てはまっており,spawnでは普通のdelegateoCallを持ったオブジェクトは起動することが出来ず,immutablesharedがついている必要があります.そうでないと,間違って状態を変更してしまう可能性があるからです.

制限を回避する

std.concurrencyのAPIは一貫しているのですが,実際コードを書いていくとmutableなデータのやりとりは普通に起きます.これに関して良い解は出ていなくて,今のところ悪魔のcastを使うしかありません!

import std.stdio, std.concurrency;

void func()
{
    receive((int i) { writeln("Received an int: ", i); },
            (immutable int[] a) { writeln("Received a array: ", cast()a); });
}

void main()
{
    auto tid = spawn(&func);
    send(tid, cast(immutable)[1, 2]);
}

つらいですが,まぁ新しいコンテキストで生成したデータなどはsendすることで所有権が親にそのまま移ったりすることが多いので,今の所はこれで我慢するしかないです.

実装

コンテキストは単にスレッドです.HaskellやGoのようにsegmented stackやその派生を使っているわけではないので,メモリ効率の点では劣ります(Rustを例に挙げようと思いましたが,Rustは最近segmented stackをやめてしまいました - togetter).

その代わりあれやこれやがなくて実装はシンプルになっているので,仕方なく受け入れましょう.ついでに言うと,D言語がsegmented stackを採用する可能性は限りなく低いです.

まとめ

ということで,std.concurrencyを一通り説明してみました.これ自身はプリミティブなAPI群なので,使い方自身はシンプルな感じになっていると思います.
将来的にはネットワーク越しのパッシングも考えているようですが,標準にシリアライゼーションモジュールが今のところないので,当分先になるのではないかと思います.

treasuredata
Customer Data Platformの開発・提供をしています。
https://www.treasuredata.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away