ごめんなさい、ちょっとしたタイトル詐欺です
実際の内容は「ちょっとD言語のstd.concurrency
使ってErlangチックでアクターモデルなコード書いてみよう」程度のものです。
アクターモデルとは何なのか、とかその利点・欠点とか是非とかには全く踏み込みませんので悪しからず。
本当は「D言語で始めるリアクティブプログラミング」にしようかとも思ったのですが、怖かったので……。
後生ですから マサカリの背の部分でぐちゃぐちゃに叩き潰すのだけは勘弁して下さい お願いします、何でもしますから!
std.concurrency
D言語の標準ライブラリであるPhobosにはstd.concurrency
というメッセージパッシングな並行処理ライブラリが入っています。使い方については既に詳しく書かれた記事があります。
Dive into std.concurrency @repeatedly さん
Dive in the deep std.concurrency @mono_shoo さん
このライブラリを使ってちょっとしたアプリを作ってみましょう。
すごいD言語をおもしろおかしく学ぼう
『すごいErlangゆかいに学ぼう』には「子猫を貸し出してくれるサーバ」を作る例が出てきます。
これに倣って、「D言語くんを貸し出してくれるサーバ」を作ってみても良いのですが、D言語くんは1体ずつしか増えないらしいので、数が確保できず採算が取れないと思われます。
ですのでD言語くんサーバは諦めて、コマンドラインベースのBotっぽいものを作ってみましょう。
Bot
起動して、コマンドラインから何か入力すると、それに応じて反応を返してくれるBotを作ってみます(それってBot? という質問はとりあえず無しで)。
以下の様な命令を用意しましょう。
command | description |
---|---|
ping |
pong を表示 |
echo 文字列 |
文字列 を表示 |
set 文字列1 文字列2 |
文字列1 を入力すると文字列2 を表示 |
delete 文字列 |
set で登録した文字列 命令を消す |
counter start 数値 |
数値 から0になるまで毎秒カウントダウン |
counter stop |
counter 命令のカウントダウンを中断 |
timer 数値 文字列 |
数値 秒後に文字列 を表示 |
exit quit
|
終了する |
……もうちょっと少ない機能から始めればよかった。
設計
アクターモデルな並行処理プログラミングという事で、機能をアクターに分割して設計します。
以下の様な構造にしましょう。
main -+- io -+- reader
| +- writer
|
+- admin -+- worker
+- worker
:
入出力の取り扱いは、
入力 => io -> main -> admin -> worker
woker -> io => 出力
というメッセージ送信で行う事とします。
用語
std.concurrency
にてspawn
関数で作成した新しい処理単位は コンテクスト と呼ぶらしいですが、この記事の中では大体がアクターモデルでいうアクターに相当するため、そのまま アクター と呼びます。
あと、一応記事内で「並列」と「並行」とは意識して使い分けております。
コード
では書いていきます。
main
module ohm.app;
import std.stdio, std.concurrency, std.algorithm.searching, std.algorithm.iteration;
import ohm.actors.io, ohm.actors.admin;
void main()
{
auto ioHolder = spawn(&ioHolder);
auto actorsAdmin = spawn(&actorsAdmin, ioHolder);
for(auto loop = true; loop;)
receive(
(ReadMessage message) {
with(message)
{
if(msg == "exit" || msg == "quit")
loop = false;
else
{
ioHolder.send(thisTid, READCONTINUE);
actorsAdmin.send(message);
}
}
}
);
[ioHolder, actorsAdmin].each!(h => h.prioritySend(thisTid, TERMINATE));
}
struct Terminate {}
enum TERMINATE = Terminate();
main関数は終了メッセージのexit
かquit
が入力されるまでループを続け、その終了メッセージ以外のメッセージは全てactorsAdminアクターに流していきます。
終了時には子アクターに終了命令である空の構造体Terminate
を送信します。
親アクター終了時には、子アクターのreceive
メソッドからownerTerminated
例外が投げられるので、これをハンドリングして終了をコントロールするという手もありますが、今回は独自の専用構造体を作って終了のサインとしています。
prioritySend
関数はsend
関数と同等の機能を持ちますが、メッセージを待ち行列の先頭に割り込ませ、優先的に処理させます。終了命令は真っ先に処理して欲しいので(アプリケーションが終了するなら終了用の為以外に処理を行う必要は無いですからね)この関数を使用しています。
IO
module ohm.actors.io;
import std.stdio, std.string, std.concurrency, std.algorithm.searching, std.algorithm.iteration, std.variant;
import ohm.app;
void ioHolder()
{
auto writer = spawn(&ioWriter);
auto reader = spawn(&ioReader);
for(auto loop = true; loop;)
receive(
(WritingMessage msg) { writer.send(msg); },
(ReadMessage msg) { ownerTid.send(msg); },
(Tid tid, ReadContinue rc) { if(tid == ownerTid) reader.send(thisTid, rc); },
(Tid tid, Terminate terminate) { if(tid == ownerTid) loop = false; }
);
[writer, reader].each!(c => c.prioritySend(thisTid, TERMINATE));
}
void ioWriter()
{
for(auto loop = true; loop;)
receive(
(WritingMessage msg) { writeln(msg.msg); },
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; }
);
}
void ioReader()
{
for(auto loop = true; loop;)
{
ownerTid.send(new ReadMessage(readln.chomp));
for(auto innerLoop = true; innerLoop;)
receive(
(Tid tid, ReadContinue _r) { if(tid == ownerTid) innerLoop = false; },
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = innerLoop = false; },
(Variant _any) {}
);
}
}
immutable abstract
class IOMessage
{
string msg;
this(string msg)
{
this.msg = msg;
}
}
template Constructors()
{
this(string msg = "")
{
super(msg);
}
}
immutable
class MutableWritingMessage : IOMessage
{
mixin Constructors;
}
alias WritingMessage = immutable MutableWritingMessage;
immutable
class MutableReadMessage : IOMessage
{
mixin Constructors;
}
alias ReadMessage = immutable MutableReadMessage;
struct ReadContinue {}
enum READCONTINUE = ReadContinue();
IOを取り扱うアクターは、子アクターとしてioWriterとioReaderを持ちます。入出力メッセージはimmutable classに格納してメッセージとして送られる事になります。
ioWriterは受け取った文字列を表示するだけですが、ioReaderはreadln
関数がブロックしてしまう為ちょっと厄介です。終了命令を受け取るにはreceive
関数で待ち受ける必要がありますが、readln
関数での待ち受けと同時に行う事はできないからです。
ですので、読み込んだ後、読み込み継続命令を受け取ってから次の行の読み込みを行うようにします。終了する場合は、この読み込み継続命令を受け取らず終了命令を受け取る事になります。
見ての通り、アクター同士のメッセージのやり取りにはimmutable class
が非常に便利です。今回は単純に文字列をラップしているだけですが、送信する値が増えても、immutable class
ならポインタ1つ分のサイズでメッセージを送る事ができます(実はsend
で送れるメッセージには大きさの制限があるのです……)。
また中の値を変更しない事が保証されていますので、他のスレッドで値が変更されて意図しない挙動になるような再現しにくいバグを防ぐ事にもなります。
ただ、敢えて値を変更したい事もあるでしょうから、そういう場合はshared class
を使う事になります。今回はそこまで複雑な事を行わない為、全てのメッセージは単純な値かimmutable class
として送っています。
Admin
module ohm.actors.admin;
import
std.concurrency, std.array, std.regex, std.variant,
std.typecons, std.algorithm.searching, std.algorithm.iteration;
import ohm.app, ohm.actors.io, ohm.actors.workers;
enum keywords = "exit quit echo ping set delete counter timer".split(" ");
enum setState = ctRegex!r"^set (\w+) (.+)$";
enum deleteState = ctRegex!r"^delete (\w+)$";
enum echoState = ctRegex!r"^echo (.+)$";
enum timerState = ctRegex!r"^timer (.+)$";
enum counterState = ctRegex!r"^counter (.+)$";
enum commandState = ctRegex!r"^\w+$";
alias Capt = Captures!(string, size_t);
alias t = tuple;
void actorsAdmin(Tid ioHolder)
{
auto workers = [
"ping": spawn(&pingPong, ioHolder),
"echo": spawn(&echoWorker, ioHolder),
"counter": spawn(&counterHolder, ioHolder),
"timer": spawn(&timerHolder, ioHolder)
];
void receiveMessage(string msg)
{
void deleteIfExist(string com)
{
auto tid = (com in workers);
if(tid !is null)
{
workers.remove(com);
(*tid).prioritySend(thisTid, TERMINATE);
}
}
[
t(setState, (Capt c) {
auto com = c[1];
if(keywords.any!(e => e == com)) return;
deleteIfExist(com);
workers[com] = spawn(&speaker, ioHolder, c[2]);
}),
t(deleteState, (Capt c) { deleteIfExist(c[1]); }),
t(echoState, (Capt c) { workers["echo"].send(new WorkersArgument(c[1])); }),
t(counterState, (Capt c) { workers["counter"].send(new ReadMessage(c[1])); }),
t(timerState, (Capt c) { workers["timer"].send(new ReadMessage(c[1])); }),
t(commandState, (Capt c) {
auto com = c.hit;
auto tid = (com in workers);
if(tid !is null) (*tid).send(thisTid, RUNCOMMAND);
})
].each!((tup) {
if(auto cap = msg.matchFirst(tup[0]))
{
tup[1](cap);
return;
}
});
}
for(auto loop = true; loop;)
receive(
(ReadMessage message) { receiveMessage(message.msg); },
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; }
);
workers.values.each!(c => c.prioritySend(thisTid, TERMINATE));
}
struct RunCommand {}
enum RUNCOMMAND = RunCommand();
immutable
class MutableWorkersArgument
{
string arg;
this(string arg) immutable
{
this.arg = arg;
}
}
alias WorkersArgument = immutable MutableWorkersArgument;
一番ややこしい部分ですね。
先頭の部分でset
命令の際に無視する予約後と、入力をパーズする為の正規表現を定義しています。
入力がReadMessageとして送られてくると、適切な子アクターにメッセージを送って処理を委譲しています。
概ね見ての通りの処理ですが、set
命令だけちょっと見てみましょう。
t(setState, (Capt c) {
auto com = c[1];
if(keywords.any!(e => e == com)) return;
deleteIfExist(com);
workers[com] = spawn(&speaker, ioHolder, c[2]);
}),
set 文字列1 文字列2
を受け取ると、workers
連想配列に文字列1
をキーとして新しいアクターを登録します。アクターはこのように動的に作る事もできます。
次に、adminの子アクター達を見ていきましょう。
ping
module ohm.actors.workers.pingpong;
import std.concurrency, std.variant;
import ohm.app, ohm.actors.io, ohm.actors.admin;
void pingPong(Tid ioHolder)
{
for(auto loop = true; loop;)
receive(
(Tid tid, RunCommand _rc) { if(tid == ownerTid) ioHolder.send(new WritingMessage("pong!")); },
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
(Variant _any) {}
);
}
ping
を受け取ったらpong
を返すだけです。
receive
関数の最後の(Variant _any) {}
で、どれにもマッチしなかったメッセージを切り捨てています。メッセージを受け取る箱(メッセージボックスとかメールボックスとか呼ばれます)には容量の上限があるのです。勿論、このメッセージボックスが満杯になった時の挙動もコントロールする事ができます。
echo
module ohm.actors.workers.echo;
import std.concurrency, std.variant;
import ohm.app, ohm.actors.io, ohm.actors.admin;
void echoWorker(Tid ioHolder)
{
for(auto loop = true; loop;)
receive(
(WorkersArgument arg) { ioHolder.send(new WritingMessage(arg.arg)); },
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
(Variant _any) {}
);
}
こちらも単純で、受け取った文字列をIOに投げているだけです。
speaker
module ohm.actors.workers.speaker;
import std.concurrency, std.variant;
import ohm.app, ohm.actors.admin, ohm.actors.io;
void speaker(Tid ioHolder, string msg) {
for(auto loop = true; loop;)
receive(
(Tid tid, RunCommand _rc) { if(tid == ownerTid) ioHolder.send(new WritingMessage(msg)); },
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
(Variant _any) {}
);
}
これはset 文字列1 文字列2
命令を登録する為の関数です。書き出す文字列2
は引数として受け取り、保持しておきます。
RunCommand
という命令が親から来たら、引数として保存してある文字列をIOに投げるだけの処理を持ちます。
counter
module ohm.actors.workers.counter;
import std.concurrency, std.variant, std.conv, std.regex;
import core.time : dur;
import ohm.app, ohm.actors.io, ohm.actors.admin;
enum startReg = ctRegex!r"^start (\d+)$";
enum stopReg = ctRegex!r"^stop$";
void counterHolder(Tid ioHolder)
{
bool hasCounter;
Tid counter;
void stopIfExist()
{
if(hasCounter) counter.prioritySend(thisTid, TERMINATE);
hasCounter = false;
}
for(auto loop = true; loop;)
receive(
(ReadMessage rm) {
with(rm)
{
if(auto cap = msg.matchFirst(startReg))
{
stopIfExist();
counter = spawnLinked(&countWorker, ioHolder, cap[1].to!int);
hasCounter = true;
}
else if(msg.matchFirst(stopReg))
stopIfExist();
}
},
(Tid tid, Terminate _t) {
if(tid == ownerTid)
{
stopIfExist();
loop = false;
}
},
(LinkTerminated _lt) { hasCounter = false; },
(Variant _any) {}
);
}
void countWorker(Tid ioHolder, int cnt)
{
while(cnt-- > 0)
{
ioHolder.send(new WritingMessage(cnt.to!string));
receiveTimeout(
1.dur!"seconds",
(Tid tid, Terminate _t) { if(tid == ownerTid) cnt = 0; }
);
}
}
カウントダウンを行うアクターです。2つの関数から成ります。1つはカウンターを保持するアクターで、もう1つが実際にカウントダウンを行うアクターです。
countWorker
を作成する時、spawnLinked
なる関数を使っていますが、この関数でアクターを新規作成すると、終了した際に親アクターのreceive
関数にLinkTerminated
を投げるようになります。
counterHolder
は今カウンターが動いているかいないかの情報を保持していますから、カウントダウンが終了した場合、それを通知してもらわないといけません。今回は、LinkTerminated
をハンドルする事で状態の管理を行っています。
また、countWorker
内でreceiveTimeout
を使っています。これはreceive
関数と同等の機能を持ちますが、1つ目の引数に与えた時間だけ待ち受けし、時間が来たら待つのをやめて次の処理に進みます。
この機能を使って1秒単位でカウントダウンしつつ、途中で終了命令が来た場合はカウントダウンを放棄して終了する仕組みを作っています。
timer
module ohm.actors.workers.timer;
import
std.concurrency, std.variant, std.regex,
std.conv, std.algorithm.iteration;
import core.time;
import ohm.app, ohm.actors.io, ohm.actors.admin;
enum counterReg = ctRegex!r"^(\d+) (.+)$";
void timerHolder(Tid ioHolder)
{
Tid[Timer] table;
for(auto loop = true; loop;)
receive(
(ReadMessage rm) {
with(rm)
{
if(auto cap = msg.matchFirst(counterReg))
{
auto timer = new Timer(cap[1].to!int, cap[2]);
table[timer] = spawn(timer, ioHolder);
}
}
},
(Timer timer) {
if((timer in table) !is null)
{
table[timer].send(thisTid, TERMINATE);
table.remove(timer);
}
},
(Tid tid, Terminate _t) { if(tid == ownerTid) loop = false; },
(Variant _any) {}
);
table.values.each!(c => c.prioritySend(thisTid, TERMINATE));
}
immutable
class MutableTimer
{
long len;
string msg;
this(long len, string msg) immutable
{
this.len = len;
this.msg = msg;
}
void opCall(Tid ioHolder) immutable
{
receiveTimeout(
len.dur!"seconds",
(Tid tid, Terminate _t) { if(tid == ownerTid) return; }
);
ioHolder.send(new WritingMessage(msg));
ownerTid.send(this);
}
}
alias Timer = immutable MutableTimer;
指定された時間だけ待ってからメッセージを表示するアクターです。
この機能も、アクターを保持するアクターと、実際にタイマー機能を提供するアクターから成ります。
今回、タイマー機能を持つアクターは関数ではなくオブジェクトを用いて作成しました。spawn
に与えられるのは関数ポインタだけではなく、opCall
メソッドを持つ構造体やクラスのインスタンス等でも渡す事ができます。
timerHolder
はTid[Timer]
型の連想配列でタイマー一覧を管理していますので、Timer
が終了した時は、その事実をtimerHolder
に伝えなければなりません。ここでは、自分自身を送る事で終了を表現しています。Timer
クラス自体もimmutableなのでメッセージとして送る事ができます。
指定時間分だけ待つ機能には、カウンターと同様にreceiveTimeout
を利用しています。
動作確認
動かしてみましょう!
ping
pong!
echo Hello, D
Hello, D
set hey ho
hey
ho
set hey yo
hey
yo
set Dman cool!!!!
Dman
cool!!!!
delete Dman
Dman
いい感じですね。
(どれが入力でどれが出力か判り難いですが……)。
カウンターやタイマーはどうでしょうか。
counter start 5
4
3
2
1
0
timer 180 have it!
(3分後)
have it!
timer 5 go!
counter start 10
9
8
7
6
5
go!
4
3
2
1
0
ちゃんと動いているようです!
注目していただきたいのはカウンターとタイマーが同時に機能している点です。ちゃんと並行処理ができています!
展望
まぁこれだけだと手元で動かして面白いで終わってしまうのですが、もう少し今後の事を考えてみましょう。
今回のアプリでは、IO部分をその他ときっちり分離し、「入力はIOからappへReadMessageとして飛んで来る」「出力はIOへWritingMessageに包んで送る」「終了する時はTerminate
構造体を送る」という形に抽象化しました。現在の実装では標準入出力をIOとして利用していますが、上の形式に則ったものなら、何をIOとしても良いわけです。
これは何を意味するでしょうか。
例えばWebSocketと繋いでWebアプリとして稼働させるとか、またはチャットに繋いでチャットBotにするなどといった派生が、このアーキテクチャのまま作れるという事です。しかもやる事は、IOの部分を入れ替えるだけです!
アクターモデルで並行処理を行う利点は、このように疎結合で再利用性の高いプログラミングが自ずからできてしまうという点です。メッセージパッシングという手段でアクターとアクターとのデータをやり取りする以上、必然的に共有するデータは最小となり、またやり取りするデータも可能な限りコンパクトなものとなって、お互いへの依存が少なくなります。
(まぁ現実世界ではニッチでピーキーな処理もたくさん必要なので、そう上手くはいかないでしょうけど……)。
感想
D言語の軽量スレッドは言語組み込みの機能というわけではないですが、標準ライブラリに綺麗な感じで入っているので、上のように簡単に使う事ができます。所謂Erlang風に使えるので戸惑う点も少なく良いです(!
演算子は使えませんが……)。
ただErlang風とはいっても、D言語のspawn
は標準では単に新しいスレッドを立てて処理を任せるだけですので、Erlangくらい気軽に新しいコンテクストを作るのは避けた方が良いかもしれません。軽量スレッドらしくすぽんすぽんとspawn
したいなら自作のScheduler
を登録してよしなに並行処理をコントロールしてやる必要があるでしょう。
単に並列処理がしたいだけでしたらstd.parallelismがありますし、コルーチンを使って並行処理がしたければFiber
もあります。既存の処理の一部をちょっと並行処理・並列処理したいな、という程度であれば、これらのライブラリを利用した方が楽に書けると思います。
std.concurrencyを使って並行・並列処理を行う場合、明確にアクターモデルを意識して、利点を活かすような書き方を心掛けると良いのではないかと思います。上で述べたような疎結合性は当然として、流行りの言い方をすると「スケーラブルでノンブロッキングでイベントドリブンでフォルトトレラントな設計」にしましょう。
(今回私が書いたコードはフォルトトレランスがかなりアレですね)。
あと、immutable classが超便利です。Scalaにおけるcase class的な使い方ができるので、メッセージとして最適です。しかもcase classよりも強力な不変の保証があるので、取り扱いも安全に行えるというおまけ付き。
D言語がこうだったらいいなぁ
D言語大好きですが「これがあったらもっと使いやすいのに」な部分も出てきますね。
こんなこといいな、できたらいいな。
- D言語にもパターンマッチが欲しい、としみじみ思います……
- あとタプルリテラルも欲しい(
alias t = tuple;
でかなりそれっぽくはなりましたが) -
immutable
ってちょっと長すぎる - だんだんセミコロン置くのが面倒になってきた……