LoginSignup
4
4

More than 5 years have passed since last update.

D言語で始めるアクターモデル

Last updated at Posted at 2016-03-24

ごめんなさい、ちょっとしたタイトル詐欺です
実際の内容は「ちょっとD言語のstd.concurrency使ってErlangチックでアクターモデルなコード書いてみよう」程度のものです。
アクターモデルとは何なのか、とかその利点・欠点とか是非とかには全く踏み込みませんので悪しからず。

本当は「D言語で始めるリアクティブプログラミング」にしようかとも思ったのですが、怖かったので……。
後生ですから マサカリの背の部分でぐちゃぐちゃに叩き潰すのだけは勘弁して下さい お願いします、何でもしますから!

std.concurrency

D言語の標準ライブラリであるPhobosにはstd.concurrencyというメッセージパッシングな並行処理ライブラリが入っています。使い方については既に詳しく書かれた記事があります。

Dive into std.concurrency @repeatedly さん
Dive in the deep std.concurrency @mono_shoo さん

このライブラリを使ってちょっとしたアプリを作ってみましょう。

すごいD言語をおもしろおかしく学ぼう

『すごいErlangゆかいに学ぼう』には「子猫を貸し出してくれるサーバ」を作る例が出てきます。
これに倣って、「D言語くんを貸し出してくれるサーバ」を作ってみても良いのですが、D言語くんは1体ずつしか増えないらしいので、数が確保できず採算が取れないと思われます。
ですのでD言語くんサーバは諦めて、コマンドラインベースのBotっぽいものを作ってみましょう。

Bot

起動して、コマンドラインから何か入力すると、それに応じて反応を返してくれるBotを作ってみます(それってBot? という質問はとりあえず無しで)。
以下の様な命令を用意しましょう。

command description
ping pongを表示
echo 文字列 文字列を表示
set 文字列1 文字列2 文字列1を入力すると文字列2を表示
delete 文字列 setで登録した文字列命令を消す
counter start 数値 数値から0になるまで毎秒カウントダウン
counter stop counter命令のカウントダウンを中断
timer 数値 文字列 数値秒後に文字列を表示
exit quit 終了する

……もうちょっと少ない機能から始めればよかった。

設計

アクターモデルな並行処理プログラミングという事で、機能をアクターに分割して設計します。
以下の様な構造にしましょう。

main -+- io -+- reader
      |      +- writer
      |
      +- admin -+- worker
                +- worker
                :

入出力の取り扱いは、

入力 => io -> main -> admin -> worker
woker -> io => 出力

というメッセージ送信で行う事とします。

用語

std.concurrencyにてspawn関数で作成した新しい処理単位は コンテクスト と呼ぶらしいですが、この記事の中では大体がアクターモデルでいうアクターに相当するため、そのまま アクター と呼びます。

あと、一応記事内で「並列」と「並行」とは意識して使い分けております。

コード

では書いていきます。

main

app.d
module ohm.app;

import std.stdio, std.concurrency, std.algorithm.searching, std.algorithm.iteration;

import ohm.actors.io, ohm.actors.admin;

void main()
{
    auto ioHolder = spawn(&ioHolder);
    auto actorsAdmin = spawn(&actorsAdmin, ioHolder);

    for(auto loop = true; loop;)
        receive(
            (ReadMessage message) {
                with(message)
                {
                    if(msg == "exit" || msg == "quit")
                        loop = false;
                    else
                    {
                        ioHolder.send(thisTid, READCONTINUE);
                        actorsAdmin.send(message);
                    }
                }
            }
        );

    [ioHolder, actorsAdmin].each!(h => h.prioritySend(thisTid, TERMINATE));
}

struct Terminate {}
enum TERMINATE = Terminate();

main関数は終了メッセージのexitquitが入力されるまでループを続け、その終了メッセージ以外のメッセージは全てactorsAdminアクターに流していきます。
終了時には子アクターに終了命令である空の構造体Terminateを送信します。

親アクター終了時には、子アクターのreceiveメソッドからownerTerminated例外が投げられるので、これをハンドリングして終了をコントロールするという手もありますが、今回は独自の専用構造体を作って終了のサインとしています。

prioritySend関数はsend関数と同等の機能を持ちますが、メッセージを待ち行列の先頭に割り込ませ、優先的に処理させます。終了命令は真っ先に処理して欲しいので(アプリケーションが終了するなら終了用の為以外に処理を行う必要は無いですからね)この関数を使用しています。

IO

io.d
module ohm.actors.io;

import std.stdio, std.string, std.concurrency, std.algorithm.searching, std.algorithm.iteration, std.variant;

import ohm.app;

void ioHolder()
{
    auto writer = spawn(&ioWriter);
    auto reader = spawn(&ioReader);

    for(auto loop = true; loop;)
        receive(
            (WritingMessage msg) { writer.send(msg); },
            (ReadMessage msg) { ownerTid.send(msg); },
            (Tid tid, ReadContinue rc) { if(tid == ownerTid) reader.send(thisTid, rc); },
            (Tid tid, Terminate terminate) { if(tid == ownerTid) loop = false; }
        );

    [writer, reader].each!(c => c.prioritySend(thisTid, TERMINATE));
}

void ioWriter()
{
    for(auto loop = true; loop;)
        receive(
            (WritingMessage msg) { writeln(msg.msg); },
            (Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; }
        );
}

void ioReader()
{
    for(auto loop = true; loop;)
    {
        ownerTid.send(new ReadMessage(readln.chomp));
        for(auto innerLoop = true; innerLoop;)
            receive(
                (Tid tid, ReadContinue _r) { if(tid == ownerTid) innerLoop = false; },
                (Tid tid, Terminate _t) { if(tid == ownerTid) loop = innerLoop = false; },
                (Variant _any) {}
            );
    }
}

immutable abstract
class IOMessage
{
    string msg;

    this(string msg)
    {
        this.msg = msg;
    }
}

template Constructors()
{
    this(string msg = "")
    {
        super(msg);
    }
}

immutable
class MutableWritingMessage : IOMessage
{
    mixin Constructors;
}
alias WritingMessage = immutable MutableWritingMessage;

immutable
class MutableReadMessage : IOMessage
{
    mixin Constructors;
}
alias ReadMessage = immutable MutableReadMessage;

struct ReadContinue {}
enum READCONTINUE = ReadContinue();

IOを取り扱うアクターは、子アクターとしてioWriterとioReaderを持ちます。入出力メッセージはimmutable classに格納してメッセージとして送られる事になります。

ioWriterは受け取った文字列を表示するだけですが、ioReaderはreadln関数がブロックしてしまう為ちょっと厄介です。終了命令を受け取るにはreceive関数で待ち受ける必要がありますが、readln関数での待ち受けと同時に行う事はできないからです。
ですので、読み込んだ後、読み込み継続命令を受け取ってから次の行の読み込みを行うようにします。終了する場合は、この読み込み継続命令を受け取らず終了命令を受け取る事になります。

見ての通り、アクター同士のメッセージのやり取りにはimmutable classが非常に便利です。今回は単純に文字列をラップしているだけですが、送信する値が増えても、immutable classならポインタ1つ分のサイズでメッセージを送る事ができます(実はsendで送れるメッセージには大きさの制限があるのです……)。
また中の値を変更しない事が保証されていますので、他のスレッドで値が変更されて意図しない挙動になるような再現しにくいバグを防ぐ事にもなります。

ただ、敢えて値を変更したい事もあるでしょうから、そういう場合はshared classを使う事になります。今回はそこまで複雑な事を行わない為、全てのメッセージは単純な値かimmutable classとして送っています。

Admin

admin.d
module ohm.actors.admin;

import
    std.concurrency, std.array, std.regex, std.variant,
    std.typecons, std.algorithm.searching, std.algorithm.iteration;

import ohm.app, ohm.actors.io, ohm.actors.workers;

enum keywords = "exit quit echo ping set delete counter timer".split(" ");

enum setState = ctRegex!r"^set (\w+) (.+)$";
enum deleteState = ctRegex!r"^delete (\w+)$";
enum echoState = ctRegex!r"^echo (.+)$";
enum timerState = ctRegex!r"^timer (.+)$";
enum counterState = ctRegex!r"^counter (.+)$";
enum commandState = ctRegex!r"^\w+$";

alias Capt = Captures!(string, size_t);
alias t = tuple;

void actorsAdmin(Tid ioHolder)
{
    auto workers = [
        "ping": spawn(&pingPong, ioHolder),
        "echo": spawn(&echoWorker, ioHolder),
        "counter": spawn(&counterHolder, ioHolder),
        "timer": spawn(&timerHolder, ioHolder)
    ];

    void receiveMessage(string msg)
    {
        void deleteIfExist(string com)
        {
            auto tid = (com in workers);
            if(tid !is null)
            {
                workers.remove(com);
                (*tid).prioritySend(thisTid, TERMINATE);
            }
        }

        [
            t(setState, (Capt c) {
                auto com = c[1];
                if(keywords.any!(e => e == com)) return;

                deleteIfExist(com);
                workers[com] = spawn(&speaker, ioHolder, c[2]);
            }),
            t(deleteState, (Capt c) { deleteIfExist(c[1]); }),
            t(echoState, (Capt c) { workers["echo"].send(new WorkersArgument(c[1])); }),
            t(counterState, (Capt c) { workers["counter"].send(new ReadMessage(c[1])); }),
            t(timerState, (Capt c) { workers["timer"].send(new ReadMessage(c[1])); }),
            t(commandState, (Capt c) {
                auto com = c.hit;
                auto tid = (com in workers);
                if(tid !is null) (*tid).send(thisTid, RUNCOMMAND);
            })
        ].each!((tup) {
            if(auto cap = msg.matchFirst(tup[0]))
            {
                tup[1](cap);
                return;
            }
        });
    }

    for(auto loop = true; loop;)
        receive(
            (ReadMessage message) { receiveMessage(message.msg); },
            (Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; }
        );

    workers.values.each!(c => c.prioritySend(thisTid, TERMINATE));
}

struct RunCommand {}
enum RUNCOMMAND = RunCommand();

immutable
class MutableWorkersArgument
{
    string arg;

    this(string arg) immutable
    {
        this.arg = arg;
    }
}
alias WorkersArgument = immutable MutableWorkersArgument;

一番ややこしい部分ですね。
先頭の部分でset命令の際に無視する予約後と、入力をパーズする為の正規表現を定義しています。
入力がReadMessageとして送られてくると、適切な子アクターにメッセージを送って処理を委譲しています。

概ね見ての通りの処理ですが、set命令だけちょっと見てみましょう。

t(setState, (Capt c) {
    auto com = c[1];
    if(keywords.any!(e => e == com)) return;

    deleteIfExist(com);
    workers[com] = spawn(&speaker, ioHolder, c[2]);
}),

set 文字列1 文字列2を受け取ると、workers連想配列に文字列1をキーとして新しいアクターを登録します。アクターはこのように動的に作る事もできます。

次に、adminの子アクター達を見ていきましょう。

ping

pingpong.d
module ohm.actors.workers.pingpong;

import std.concurrency, std.variant;

import ohm.app, ohm.actors.io, ohm.actors.admin;

void pingPong(Tid ioHolder)
{
    for(auto loop = true; loop;)
        receive(
            (Tid tid, RunCommand _rc) { if(tid == ownerTid) ioHolder.send(new WritingMessage("pong!")); },
            (Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
            (Variant _any) {}
        );
}

pingを受け取ったらpongを返すだけです。
receive関数の最後の(Variant _any) {}で、どれにもマッチしなかったメッセージを切り捨てています。メッセージを受け取る箱(メッセージボックスとかメールボックスとか呼ばれます)には容量の上限があるのです。勿論、このメッセージボックスが満杯になった時の挙動もコントロールする事ができます。

echo

echo.d
module ohm.actors.workers.echo;

import std.concurrency, std.variant;

import ohm.app, ohm.actors.io, ohm.actors.admin;

void echoWorker(Tid ioHolder)
{
    for(auto loop = true; loop;)
        receive(
            (WorkersArgument arg) { ioHolder.send(new WritingMessage(arg.arg)); },
            (Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
            (Variant _any) {}
        );
}

こちらも単純で、受け取った文字列をIOに投げているだけです。

speaker

speaker.d
module ohm.actors.workers.speaker;

import std.concurrency, std.variant;

import ohm.app, ohm.actors.admin, ohm.actors.io;

void speaker(Tid ioHolder, string msg) {
    for(auto loop = true; loop;)
        receive(
            (Tid tid, RunCommand _rc) { if(tid == ownerTid) ioHolder.send(new WritingMessage(msg)); },
            (Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
            (Variant _any) {}
        );
}

これはset 文字列1 文字列2命令を登録する為の関数です。書き出す文字列2は引数として受け取り、保持しておきます。
RunCommandという命令が親から来たら、引数として保存してある文字列をIOに投げるだけの処理を持ちます。

counter

counter.d
module ohm.actors.workers.counter;

import std.concurrency, std.variant, std.conv, std.regex;
import core.time : dur;

import ohm.app, ohm.actors.io, ohm.actors.admin;

enum startReg = ctRegex!r"^start (\d+)$";
enum stopReg = ctRegex!r"^stop$";

void counterHolder(Tid ioHolder)
{
    bool hasCounter;
    Tid counter;

    void stopIfExist()
    {
        if(hasCounter) counter.prioritySend(thisTid, TERMINATE);
        hasCounter = false;
    }

    for(auto loop = true; loop;)
        receive(
            (ReadMessage rm) {
                with(rm)
                {
                    if(auto cap = msg.matchFirst(startReg))
                    {
                        stopIfExist();
                        counter = spawnLinked(&countWorker, ioHolder, cap[1].to!int);
                        hasCounter = true;
                    }
                    else if(msg.matchFirst(stopReg))
                        stopIfExist();
                }
            },
            (Tid tid, Terminate _t) {
                if(tid == ownerTid)
                {
                    stopIfExist();
                    loop = false;
                }
            },
            (LinkTerminated _lt) { hasCounter = false; },
            (Variant _any) {}
        );
}

void countWorker(Tid ioHolder, int cnt)
{
    while(cnt-- > 0)
    {
        ioHolder.send(new WritingMessage(cnt.to!string));
        receiveTimeout(
            1.dur!"seconds",
            (Tid tid, Terminate _t) { if(tid == ownerTid) cnt = 0; }
        );
    }
}

カウントダウンを行うアクターです。2つの関数から成ります。1つはカウンターを保持するアクターで、もう1つが実際にカウントダウンを行うアクターです。

countWorkerを作成する時、spawnLinkedなる関数を使っていますが、この関数でアクターを新規作成すると、終了した際に親アクターのreceive関数にLinkTerminatedを投げるようになります。
counterHolderは今カウンターが動いているかいないかの情報を保持していますから、カウントダウンが終了した場合、それを通知してもらわないといけません。今回は、LinkTerminatedをハンドルする事で状態の管理を行っています。

また、countWorker内でreceiveTimeoutを使っています。これはreceive関数と同等の機能を持ちますが、1つ目の引数に与えた時間だけ待ち受けし、時間が来たら待つのをやめて次の処理に進みます。
この機能を使って1秒単位でカウントダウンしつつ、途中で終了命令が来た場合はカウントダウンを放棄して終了する仕組みを作っています。

timer

timer.d
module ohm.actors.workers.timer;

import
    std.concurrency, std.variant, std.regex,
    std.conv, std.algorithm.iteration;
import core.time;

import ohm.app, ohm.actors.io, ohm.actors.admin;

enum counterReg = ctRegex!r"^(\d+) (.+)$";

void timerHolder(Tid ioHolder)
{
    Tid[Timer] table;

    for(auto loop = true; loop;)
        receive(
            (ReadMessage rm) {
                with(rm)
                {
                    if(auto cap = msg.matchFirst(counterReg))
                    {
                        auto timer = new Timer(cap[1].to!int, cap[2]);
                        table[timer] = spawn(timer, ioHolder);
                    }
                }
            },
            (Timer timer) {
                if((timer in table) !is null)
                {
                    table[timer].send(thisTid, TERMINATE);
                    table.remove(timer);
                }
            },
            (Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
            (Variant _any) {}
        );

    table.values.each!(c => c.prioritySend(thisTid, TERMINATE));
}

immutable
class MutableTimer
{
    long len;
    string msg;

    this(long len, string msg) immutable
    {
        this.len = len;
        this.msg = msg;
    }

    void opCall(Tid ioHolder) immutable
    {
        receiveTimeout(
            len.dur!"seconds",
            (Tid tid, Terminate _t) { if(tid == ownerTid) return; }
        );
        ioHolder.send(new WritingMessage(msg));
        ownerTid.send(this);
    }
}
alias Timer = immutable MutableTimer;

指定された時間だけ待ってからメッセージを表示するアクターです。
この機能も、アクターを保持するアクターと、実際にタイマー機能を提供するアクターから成ります。
今回、タイマー機能を持つアクターは関数ではなくオブジェクトを用いて作成しました。spawnに与えられるのは関数ポインタだけではなく、opCallメソッドを持つ構造体やクラスのインスタンス等でも渡す事ができます。

timerHolderTid[Timer]型の連想配列でタイマー一覧を管理していますので、Timerが終了した時は、その事実をtimerHolderに伝えなければなりません。ここでは、自分自身を送る事で終了を表現しています。Timerクラス自体もimmutableなのでメッセージとして送る事ができます。

指定時間分だけ待つ機能には、カウンターと同様にreceiveTimeoutを利用しています。

動作確認

動かしてみましょう!

ping
pong!

echo Hello, D
Hello, D

set hey ho
hey
ho

set hey yo
hey
yo

set Dman cool!!!!
Dman
cool!!!!

delete Dman
Dman

いい感じですね。
(どれが入力でどれが出力か判り難いですが……)。

カウンターやタイマーはどうでしょうか。

counter start 5
4
3
2
1
0

timer 180 have it!
(3分後)
have it!

timer 5 go!     
counter start 10
9
8
7
6
5
go!
4
3
2
1
0

ちゃんと動いているようです!

注目していただきたいのはカウンターとタイマーが同時に機能している点です。ちゃんと並行処理ができています!

展望

まぁこれだけだと手元で動かして面白いで終わってしまうのですが、もう少し今後の事を考えてみましょう。
今回のアプリでは、IO部分をその他ときっちり分離し、「入力はIOからappへReadMessageとして飛んで来る」「出力はIOへWritingMessageに包んで送る」「終了する時はTerminate構造体を送る」という形に抽象化しました。現在の実装では標準入出力をIOとして利用していますが、上の形式に則ったものなら、何をIOとしても良いわけです。

これは何を意味するでしょうか。

例えばWebSocketと繋いでWebアプリとして稼働させるとか、またはチャットに繋いでチャットBotにするなどといった派生が、このアーキテクチャのまま作れるという事です。しかもやる事は、IOの部分を入れ替えるだけです!

アクターモデルで並行処理を行う利点は、このように疎結合で再利用性の高いプログラミングが自ずからできてしまうという点です。メッセージパッシングという手段でアクターとアクターとのデータをやり取りする以上、必然的に共有するデータは最小となり、またやり取りするデータも可能な限りコンパクトなものとなって、お互いへの依存が少なくなります。
(まぁ現実世界ではニッチでピーキーな処理もたくさん必要なので、そう上手くはいかないでしょうけど……)。

感想

D言語の軽量スレッドは言語組み込みの機能というわけではないですが、標準ライブラリに綺麗な感じで入っているので、上のように簡単に使う事ができます。所謂Erlang風に使えるので戸惑う点も少なく良いです(!演算子は使えませんが……)。

ただErlang風とはいっても、D言語のspawnは標準では単に新しいスレッドを立てて処理を任せるだけですので、Erlangくらい気軽に新しいコンテクストを作るのは避けた方が良いかもしれません。軽量スレッドらしくすぽんすぽんとspawnしたいなら自作のSchedulerを登録してよしなに並行処理をコントロールしてやる必要があるでしょう。

単に並列処理がしたいだけでしたらstd.parallelismがありますし、コルーチンを使って並行処理がしたければFiberもあります。既存の処理の一部をちょっと並行処理・並列処理したいな、という程度であれば、これらのライブラリを利用した方が楽に書けると思います。

std.concurrencyを使って並行・並列処理を行う場合、明確にアクターモデルを意識して、利点を活かすような書き方を心掛けると良いのではないかと思います。上で述べたような疎結合性は当然として、流行りの言い方をすると「スケーラブルでノンブロッキングでイベントドリブンでフォルトトレラントな設計」にしましょう。
(今回私が書いたコードはフォルトトレランスがかなりアレですね)。

あと、immutable classが超便利です。Scalaにおけるcase class的な使い方ができるので、メッセージとして最適です。しかもcase classよりも強力な不変の保証があるので、取り扱いも安全に行えるというおまけ付き。

D言語がこうだったらいいなぁ

D言語大好きですが「これがあったらもっと使いやすいのに」な部分も出てきますね。
こんなこといいな、できたらいいな。

  • D言語にもパターンマッチが欲しい、としみじみ思います……
  • あとタプルリテラルも欲しい(alias t = tuple;でかなりそれっぽくはなりましたが)
  • immutableってちょっと長すぎる
  • だんだんセミコロン置くのが面倒になってきた……
4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4