すごいErlangゆかいに学ぼう!
第10章 並行性とは
並行 と 並列 を区別しましょう。
- 並行
- concurrency
- 複数のアクターが 独立 して動いていること
- 並列
- parallelism
- たくさんのアクターを 同時 に動かすこと
スケーラビリティについて
まず、Erlang はGPUが行うような数値処理に関しては向かない。
Erlang が得意とする領域は、数値処理のようなプリミティブな問題ではなく、スイッチングだとかウェブのクローリングなど、論理的に高度な階層での分野。
すべてが線形スケールするわけではない
-
並列化しても、直列操作で一番遅いものよりはは早くならない
- 直列操作がひとつでもあるとスピードでないということ
- 例えば、処理中に何らかの待ち行列処理があると、その待ち時間より短くなることはない
アムダールの法則
-
システムを並列化したときに、どれくらい早くなるか
-
どれくらい並列化したのか
を表す法則
-
どれくらい並列化したのか
-
50% の並列化で2倍ちょい早くなる
-
85% の並列化で最大20倍くらい早くなる
-
なお、直列操作が多い処理をマルチコアで分散するとかえって遅くなるので注意
- 負荷分散のための処理が逆に遅くなる
- Erlang では
smp
オプションというものがあるので調べると良い。
Erlang のプロセス
- Erlang プロセス = 関数
- プロセスは関数を実行する
- 実行終了後消える
- 受信したメッセージを メールボックス に保存する
- 状態を持つ
- メッセージは到着した順に保存する
%% この例は関数の実行結果 (2) が出力されない.
%% なぜなら **プロセス** が何も返さないから
F = fun() -> 1 + 1 end.
spanw(F). % <0.44.0>
-
spawn
でプロセスを生成 - 返り値は プロセスID
%% 以下の例は10個のプロセスを並列実行
%% io:format の 1~10 の順番は保証されない
G = fun(X) -> timer:sleep(10), io:format("~p~n", [X]) end.
[spawn(fun() -> G(X) end) || X <- lists:seq(10)].
シェルもプロセス
%% シェルで以下実行すればわかる
self(). % シェル自身の Pid 取得
exit(self()). %% 終了
self(). % Pid が変わってるはず
メッセージの送信
self() ! hello. % 返り値は 'hello'
self() ! self() ! hello % 複数のプロセスに 'hello' を送信
self() ! (self() ! hello). % 上と等価
flush() % 受け取ったメッセージをメールボックスから取得
メッセージの受信
送られてきたメッセージ受け取りたい!
-
receive を使う
- case ... of は式を書く
- receive はメッセージを変数に束縛する
- ガード式も使える
%% イルカ(プロセス)とコミュニケーションを取る
-module(dolphins).
-compile(export_all).
dolphine() ->
receive
do_a_flip -> io:format("How about no?~m");
fish -> io:format("So long and thanks for all the fish!~n");
_ -> io:format("Heh, we're smarter than you humans.~n")
end.
上記のイルカモジュールをシェルで試す
c(dolphin).
Dolphin = spawn(dolphins, dolphin1, []).
Dolphin ! "hello, dolphin". % Heh, we're smarter than you humans.
プロセスから返り値を受け取る
- メッセージの送信元の Pid を receive 側で受け取り、その Pid へ向かって結果を
! (send)
すればよい。- 当然、送信側はメッセージと一緒に自分の Pid も送信する
%% From には Pid が入ってる
dolphin2() ->
receive
{From, do_a_flip}} ->
From ! "How about no?";
{From, fish} ->
From ! "So long and thanks than you humans.";
_ ->
io:format("Heh, we're sumarter than you humans.~n")
end.
使用例
c(dolphin2).
Dolphin2 = spawn(dolphins, dolphin2, []).
Dolphin2 ! {self(), do_a_flip}.
%% 相手側も send してくるので、自分のメールボックスを確認しないと返り値が取得できない
flush(). % Shell got "How about no?"
ok
- receive の中で再起すれば、複数回メッセージを投げれる
dolphin3() ->
receive
{From, do_a_flip} ->
From ! "How about no?",
dolphin3();
{From, fish} ->
%% 再起の終了
From ! "So long and thanks for all the fish!";
_ ->
io:format("Heh, we're smarter than you humans.~n"),
dolphin3()
end.
第11章 マルチプロセスについて詳しく
- 関数とメッセージだけでは、アクターを使いこなせない
- プロセスの状態を保持できるようにしないといけない
状態を持つプロセスを作ってみる
- 冷蔵庫を例とする
- 有限個の食べ物を保存したり取り出したりする
- 最初の例
- まだ保存も取り出しも未実装
-module(kitchen).
-compile(export_all).
fridge1() ->
receive
%% 保存処理 ok 返すだけ
{From, {store, _Food}} ->
From ! {self(), ok},
fridge1();
%% 取り出し処理 未実装
{From, {take, _Food}} ->
From ! {self(), not_found},
fridfe1();
terminate ->
ok
end.
関数に状態を追加する
- 再起を使って、呼び出し時のパラメータに関数の状態を保存させる
- 関数のパラメータがアキュムレータ
- リストを使ってみる
%% FoodList は食べ物を格納するリスト
fridge2(FoodList) ->
receive
{From, {store, Food}} ->
From ! {self(), ok},
fridge2([Food|FoodList]);
{From, {take, Food}} ->
case lists:member(Food, FoodList) of
true ->
From ! {self(), {ok, Food}},
fridge2(lists:delete(Food, FoodList));
false ->
From ! {self(), not_found},
fridge2(FoodList)
end;
terminate ->
ok
end.
メッセージの送受信関数を隠蔽化したい
- 実装した関数ごとに独自プロトコルを覚えるのはダルい
- メッセージを送受信する関数を作る
- ついでにプロセスを spawn する関数も作る
store(Pid, Food) ->
Pid ! {self(), {store, Food}},
receive
{Pid, Msg} -> Msg
end.
take(Pid, Food) ->
Pid ! {self(), {take, Food}},
receive
{Pid, Msg} -> Msg
end.
start(FoodList) ->
spawn(?MODULE, fridge2, [FoodList]).
タイムアウト
-
プロセスからの応答が帰ってこないことがある。
- デッドロックが発生しうる
-
pid(A,B,C)
というBIFをつかうと、A.B,Cな Pid を生成できるので試してみる。
% 適当なPidを待ち受けるとシェルが固まる...
kitchen:take(pid(0, 250, 0), dog).
after節
- タイムアウトする時間を設定できる
%% 3秒まってメッセージが来なかったら timeout
start2(Pid, Food) ->
Pid ! {self(), {store, Food}},
receive
{Pid, Msg} -> Msg
after 3000 ->
timeout
end.
take2(Pid, Food) ->
Pid ! {self(), {take, Food}},
receive
{Pid, Msg} -> Msg
after 3000 ->
timeout
end.
- after を利用して sleep を実装できる
%% 必ず T秒待つ。
sleep(T) ->
receive
after T -> ok
end.
選択的受信
- 受け取るメッセージに優先順位をつけること
- フラッシュ という概念で、選択受信を実装できる
%% Priority が 10未満のメッセージは normal へ受け流して receive させる
%% important から normal の順なので、`選択的受信` によって 10以上のメッセージが先にくる
important() ->
receive
{Priority, Message} when Priority > 10 ->
[Message | important()]
after 0 ->
normal()
end.
normal() ->
receive
{_, Message} ->
[Message | normal()]
after 0 ->
[]
end.
選択的受信の罠
-
メッセージは、パターンマッチされるまでプロセス内のメールボックスに保存される
- 送信元が死んでも保存される
- receive はメッセージ受け取るたびメールボックス内のメッセージをすべてチェックする
- 古いもの(受信した)順にチェックする
- マッチしなかったら次へ.. を繰り返し、マッチしたものはメールボックスから削除される
- あとで処理するメッセージは無視する ことが選択的受信の本質的な動作
-
よって、プロセス内にメッセージが溜まり過ぎると動作が遅くなる
- 選択的受信によって、 あとで処理する メッセージが多くなる
- receive のたびに、それら全部にパターンマッチが走る
- もし あとで処理するメッセージ がメールボックス内に 1000個あったら・・?
-
そんなときは
- メッセージ形式は正しい?
- そのメッセージ本当に必要?
- パターンマッチの定義間違ってない?
- 受信プロセス1個だけでいいの?
- とか考えるといいかも。
メールボックスの罠
- 予期せぬメッセージを検知し警告を出させることがある
%% 予想されうるすべてのメッセージをパターンマッチ
receive
hoge -> "hoge!";
fuga -> "fuga!";
piyo -> "piyo!"
%% 予期せぬメッセージには警告
Unexpected ->
io:format("unexpected message ~p~n", [Unexpected])
end.
第12章 エラーとプロセス
try ... catch
例外処理の問題点
- すべての階層で普通ではないエラー処理をしないといけない
- (正常動作とは異なるフローということ?)
- またはより上位層にエラー処理を移譲する
-
try ... catch
地獄が待っている・・ - python で何も考えず
try .. except Exception:
とかやっちゃって全部のエラー隠蔽して地獄見るのと似てる
-
- しかし・・
- Erlangはエラー処理を別の並行プロセスへエラー処理を追い出せる
- 通常のコードは、上手く行った場合のみを実装できる
- コードがクリーンに保てる
- 正常系と異常系の実装を綺麗に分けられるということか(?)
- Erlangの並行エラー処理を実現するツールは
- リンク
- モニター
- 名前付きプロセス
1. リンク
-
2つのプロセス間の特殊な関係
- プロセスの生死を共にする運命共同体
- 予期せぬ throw, error, exitしたとき、リンクされた方も死ぬ
-
エラーを止めるために可能な限り早く失敗させる
- これは有益な概念
- エラーの依存先を止める
- プロセスを素早く再起動させる
-
link/1
とunlink/1
を使う- 片方のプロセスが死んだ時、原因や付随する情報が送られる
- 形式は
{'EXIT', B, Reason}
- 形式は
- 正常に終了したときは何も送られない
- 片方のプロセスが死んだ時、原因や付随する情報が送られる
myproc() ->
timer:sleep(1000),
exit(reason).
%% 以下をシェルで操作
spawn(fun hoge:myproc/0). %% なにも起きない
link(spawn(fun hoge:myproc/0)). %% 1秒後に exception error が起きる
%% シェルからプロセスを殺す
Pid = spawn(fun hoge:myproc/0).
exit(Pid, reason).
- プロセス間のリンクは、一緒に死ぬプロセスのグループを作るのに使われる。
- 一緒に死んでもわらないと困る運命共同体を定義する
%% 互いに link するN個のプロセスを生成する
%% chain(0) からの死亡シグナルが呼び出し元まで伝播していく
chain(0) ->
receive
_ -> ok.
after 2000 ->
exit(chain_dies_here)
end;
chain(N) ->
Pid = spawn(fun() -> chain(N-1) end),
link(Pid),
receive
_ -> ok
end.
spawn_link
-
link(spawn(Func))
とlink(spawn(M, F, A))
- 動作に1ステップ以上かかる
- link を生成される前にクラッシュする可能性あり
- 予期せぬ動作が起きうる
-
spawn_link/1-3
というBIFがある- プロセスの生成とリンクを同時にする
- アトミックに処理する
- 失敗か成功かしかない
- 一般的にはこっちの関数のほうが安全
エラーシグナル
-
プロセスを超えたエラーの伝搬
- メッセージパッシングと同様にプロセス間で行われる
-
シグナル という特別なメッセージを使う
- プロセス上で自動的に動作して、そのプロセスを殺す
-
どうやって再起動させるか
- 信頼性のため、素早い終了と再起動は必須
-
link
でプロセスを殺せる- 再起動はどうする?
- まずプロセスが死んだかどうかを知らないと再起動もできない
- システムプロセス をリンク上に追加すればよい
-
システムプロセスとは
- 終了シグナルを普通のメッセージとして受け取れるプロセスのこと
-
process_flag(trap_exit, true).
を使う
% シェルで実行
process_flag(trap_exit, true).
spawn_link(fun() -> hoge:chain(3) end).
receive X -> X end.
{'EXIT', <0.49.0>, "chain dies here"}
例外の種類
-
spawn_link
やexit/2
で大概の例外は補足できる - 補足させず、無理やり Kill したい時もある。
-
kill
シグナルを送る- プロセスを殺す最終手段
- 決して補足されない
- プロセスがこのメッセージを受け取ったら
killed
に変更されなければならない-
killed
に変更されないと、他のリンクされたプロセスにもkill
が伝播する- 死の連鎖
- 他のリンクされたプロセスから受け取った
exit(kill)
がkilled
に見える理由- 伝播しないよう変わる
- ローカルで補足されたときは依然として
kill
のように見える
-
-
2. モニター
- リンクされたプロセスは一緒に死ぬ運命共同体
- 必ずしも死ぬ必要はない
- プロセスをストーキングできれば充分な場合もある
- モニター をつかう
モニターとは
- 一方向の監視
- 2つのプロセス間で複数のモニターを持てる
- スタックができて、それぞれに識別子を持つ
- リンクはスタックしない
- ライブラリでリンク設定・削除をするとき、関係ないリンクを変更してしまう可能性
- 一つのプロセスに複数のリンク設定時、特定のリンクだけ
unlink
する、というのができない(?)
- モニターはスタックできる一方向のリンク、みたいなもの (?)
- 他のプロセスで何が起きているかを知る必要がある時に便利
モニターを使う
- モニターは
MRef
識別子で一意に補足できる- MRef でモニターを解除できる
- モニターをスタックできる
- ひとつ以上のモニターを停止できる
- 参照先が死ぬと、
{'DOWN', MRef, process, Pid, Reason}
なメッセージを受け取る
1> erlang:monitor(process, spawn(fun() -> timer:sleep(500) end)).
# Ref<0.0.0.42>
2> flush().
Shell got {'DOWN',#Ref<0.0.0.42>,process,<0.35.0>,normal}
ok
-
link
と同様にプロセスの生成とモニタリングをアトミックに行うBIFがある- 以下の例では、プロセスが死ぬ前に
demonitor
してる- プロセスの死について追跡はしていない
- 以下の例では、プロセスが死ぬ前に
1> {Pid, Ref} = spawn_monitor(fun() -> receive _ -> exit(boom) end end).
{<0.34.0>,#Ref<0.0.0.26>}
2> erlang:demonitor(Ref).
true
3> Pid ! die.
die
4> flush().
ok
-
demonitor/2
もある- オプションは2つ
- info
- モニター削除時に、モニターが存在したかどうかを true/false で返す
- flush
- メールボックスに
DOWN
メッセージがあった場合にそれを削除する
- メールボックスに
6> {Pid, Ref} = spawn_monitor(fun() -> receive _ -> exit(boom) end end).
{<0.40.0>,#Ref<0.0.0.47>}
7> Pid ! die.
die
8> erlang:demonitor(Ref, [flush, info]).
false
9> flush().
ok
3. 名前付きプロセス
- リンクとモニターでも解決できていない問題
- リンク先の死亡を検出した時にできることは何か?
%% CDの評判を知りたいので批評家(critic) を生成した
start_critic() ->
spawn(?MODULE, critic, []).
%% 批評家に批評を聞く
judge(Pid, Band, Album) ->
Pid ! {self(), {Band, Album}},
receive
{Pid, Criticism} -> Criticism
after 2000 ->
timeout
end.
%% 批評家を表す関数
critic() ->
receive
{From, {"Rage Against the Turing Machine", "Unit Testify"}} ->
From ! {self(), "They are great!"};
{From, {"System of a Downtime", "Memoize"}} ->
From ! {self(), "They're not Johnny Crash but they're good."};
{From, {"Johnny Crash", "The Token RIng of Fire"}} ->
From ! {self(), "Simply incredible."};
{From, {_Band, _Album}} ->
From ! {self(), "They are terrible!"}
end,
critic().
- 上記のコードは、何らかの理由で
critic
が死ぬと- ずっと timeout が起きる
- プロセスを再起動させるスーパーバイザを実装する!
start_critic2() ->
spawn(?MODULE, restarter, []).
%% 批評家プロセスとリンクし、死活監視をする
%% 批評家から終了シグナルを受け取り、再起動したりしなかったりする
restarter() ->
process_flag(trap_exit, true),
Pid = spawn_link(?MODULE, critic, []),
receive
{'EXIT', Pid, normal} -> % クラッシュではない
ok;
{'EXIT', Pid, shutdown} -> % 手動終了でクラッシュではない
ok;
{'EXIT', Pid, _} ->
restarter() %% 恐らくクラッシュしたので批評家を再生成する
end.
- 上記のソースの問題点
-
start_critic2
してもPid
を取得できない!- Pidを知らないので評価に意見を聞けない (Send できない)
-
名前付きプロセスを使う
restarter() ->
process_flag(trap_exit, true),
Pid = spawn_link(?MODULE, critic, []),
%% 名前付きプロセスにする
register(critic, Pid),
receive
{'EXIT', Pid, normal} -> % クラッシュではない
ok;
{'EXIT', Pid, shutdown} -> % 手動終了
ok;
{'EXIT', Pid, _} ->
restarter()
end.
judge2(Band, Album) ->
%% 名前付きプロセスからPidを検索
critic ! {self(), {Band, Album}},
Pid = whereis(critic),
receive
{Pid, Criticism} -> Criticism
end.
プロセスの競合状態
- 以下のコードには問題が発生しうる可能性がある
- たった一行だが、この間に軽量プロセスのメッセージパッシングが走っていることに注意
- 異なるノード/プロセスで実行された結果を待ちけている,という状況
critic ! {self(), {Band, Album}},
Pid = whereis(critic),
-
critic プロセスがクラッシュしたとき
- whereis が失敗してクラッシュ
- critic が再起動して Pid が変化したとき
- receive が絶対に失敗する
-
critic プロセスは、複数の他のプロセスから参照されている
-
Share State と呼ばれる状況
- critic プロセスが他のプロセスからアクセス・修正可能な状況
- 情報の一貫性が失われ、エラーにつながりかねない
-
競業状態 とも
- 特に危険
- 発生確率がそれなりにある
- あらゆる並列言語、並行言語で起こる
- プロセッサの負荷やプロセスの方向とか、予期せぬ要因に依存してるため
-
Share State と呼ばれる状況
Erlang は競合状態と縁がないのか?
- 多くの場合ではそう
- メールボックスを介したメッセージパッシングのおかげ
- イベントにある種の順序が矯正されるため
- Shared State が言語に寄って厳格に制限される
- Erlangコードが競合状態と無縁だと仮定するべきではない
- メールボックスを介したメッセージパッシングのおかげ
名前付きプロセスによる競合状態を回避する
- 名前付きプロセスが同じままなら
-
make_ref
を使う - 変化しうる Pid ではなく ErlangVM が生成する一意参照を識別子にする
-
judge2(Band, Album) ->
%% 一意参照を取得
Ref = make_ref(),
critic ! {self(), Ref, {Band, Album}},
receive
%% Pid ではなく一意参照が帰ってきてるかで正しい応答先かを判別する
{Ref, Criticism} -> Criticism
after 2000 ->
timeout
end.
critic2() ->
receive
{From, Ref, {"Rage Against the Turing Machine", "Unit Testify"}} ->
From ! {Ref, "They are great!"};
{From, Ref, {"System of a Downtime", "Memoize"}} ->
From ! {Ref, "They're not Johnny Crash but they're good."};
{From, Ref, {"Johnny Crash", "The Token RIng of Fire"}} ->
From ! {Ref, "Simply incredible."};
{From, Ref, {_Band, _Album}} ->
From ! {Ref, "They are terrible!"}
end,
critic2().
第13章 並行アプリケーションを設計する
- 並行Erlang で小さいアプリケーションを書く
- 題材はイベントリマインダーアプリケーション
問題を理解する
- つくりたいものは?
- リマインダーアプリケーション
- ソフトウェアとのプロトコルは?
- 機能をどのようにプロセスという形に落としこむ?
- 送るメッセージをどうやって知る?
-
以上を踏まえ仕様を以下のように決める
- イベントを追加できる
- イベントには以下の要素がある
- 締め切り(警告時間)
- 名前
- 詳細
- イベントの時間が来たら警告を表示
- 名前を指定してキャンセル可能
- CUI経由でリマインダーと通信する
構成要素
- クライアント
- イベントサーバー
- イベントプロセス
アプリケーションの基礎構造
- ディレクトリ構造
フォルダ名 | 役割 |
---|---|
ebin/ | コンパイルされたファイル置き場 |
include/ | 他のアプリケーションからinclude される .hrl 置き場 |
priv/ | Erlangとやりとりする必要がある実行ファイル, ドライバなど |
src/ | すべての .erl 置き場, プライベートな .hrl もここ |
イベントモジュールの作成
- 現時点でのプロトコルは不完全
- 大体
{Pid, Ref, Message}
という形式にはなるだろうと- Pid(送信元), Ref(一意参照) の組み合わせて送受信の関係を特定できる
- 大体
イベンプロセスのメインループ
- おおまかな構造は以下
%% 変数Stateにループの状態を保存
loop(State) ->
receive
%% イベントサーバーからのキャンセルメッセージを受け取れるようにする
{Server, Ref, cancel} ->
event_cancel;
%% 指定時間経過したら、期限が来た処理をする
after Delay ->
event_timelimit
end.
- これを踏まえ、コードは大体以下
-module(event).
-compile(export_all). %% 開発のため
-record(state, {server, name="", to_go=0}).
%% Server には Pid が入っている
loop(S = #state{server=Server}) ->
receive
{Server, Ref, cancel} ->
Server ! {Ref, ok}
after S#state.to_go * 1000 ->
Server ! {done, S#state.name}
end.
- ところで
- after に設定できるミリ秒は、 50日 (3600 * 24 * 50) まで
- 仕様上の制限なのでしかたない
- しかしこれだと、締め切りを1年後とかに設定できない
- 仕様上の制限なのでしかたない
- after に設定できるミリ秒は、 50日 (3600 * 24 * 50) まで
%% Erlang ではタイムアウトが 49日まで
%% それ以上のタイムアウトを設定したいとき
%% [4233600, 4233600,...] なリストを生成して返す
normalize(N) ->
Limit = 49 * 24 * 60 * 60,
[N rem Limit | lists:duplicate(N div Limit, Limit)].
loop(S = #state{server=Server, to_go=[T|Next]}) ->
receive
{Server, Ref, cancel} ->
Server ! {Ref, ok}
after T*1000 ->
if Next =:= [] ->
Server ! {done, S#state.name};
Next =/= [] ->
loop(S#state{to_go=Next})
end
end.
ちょっとした改善
- 締め切りを秒で指定するのは面倒
- Erlang の datetime フォーマットで指定できるようにする
time_to_go(TiemOut={{_,_,_}, {_,_,_}}) ->
Now = calendar:local_time(),
ToGo = calendar:datetime_to_greforian_seconds(TimeOut) - calendar:datetime_to_gregorian_seconds(Now),
Secs = if ToGo > 0 -> ToGo;
ToGo =< 0 -> 0;
end,
normalize(Secs).
インターフェースの追加
- プログラムの利便性を上げるため、呼び出しI/F関数を追加する
- 上記loop関数をうまく動作させるのに必要なデータを初期化する関数を書く
%% 第3引数は init に渡される
start(EventName, Delay) ->
spawn(?MODULE, init, [self(), EventName, Delay]).
start_link(EventName, Delay) ->
spawn_link(?MODULE, init, [self(), EventName, Delay]).
%% イベントの内部処理
init(Server, EventName, DateTime) ->
loop(#state{server=Server, name=EventName, to_go=tiem_to_go(DateTime)}).
%% cancel メッセージ送信用関数(I/F) を書く
cancel(Pid) ->
%% プロセスが既に死んでるか確認しておくためのモニター
Ref = erlang:monitor(process, Pid),
Pid ! {self(), Ref, cancel},
receive
{Ref, ok} ->
erlang:demontor(Ref, [flush]),
ok;
{'DOWN', Ref, process, Pid, _Reason} ->
ok
end.
イベントサーバー
- クライアントと各イベントプロセスの仲介者
- 大体の概要動作は以下のコードで
-module(evserv).
-compile(export_all).
loop(State) ->
%% 返信が必要な処理は {Pid, Ref, Msg} の形式
receive
{Pid, MsgRef, {subscribe, Client}} ->
%% イベントに登録する処理
{Pid, MsgRef, {add, Name, Description, TimeOut}} ->
%% イベント追加
{Pid, MsgRef, {cancel, Name}} ->
%% キャンセル処理
{done, Name} ->
%% イベント完了処理
shutdown ->
%% サーバー終了処理
{'DOWN', Ref, process, _Pid, _Reason} ->
%% モニター先が落ちてた
code_change ->
%% ホットアップグレード
Unknown ->
io:format("Unknown message: ~p~n", [Unknown]),
loop(State)
end.
状態変数 State に保存するべき状態はなにか
- イベントサーバーが持つべき状態
- サブスクライブしているクライアントのリスト
- spawn したすべてのイベントプロセスのリスト
-record(state, {events, %% eventレコードのリスト
clients). %% Pid のリスト
-record(event, {name="", description="", pid, timeout={{1970, 1, 1}, {0,0,0}}).
-
#state
のevent
とclient
はorddict
で保持するものとする- 同時に何百ものクライアントを持つことがなさそうなので…
%% orddict を使う init関数を定義
init() ->
%% 制定ファイルからのイベント読み込みはこの関数で完了する.
%% そのため、イベントをどこから読み込むを伝えるための引数を init に渡す必要がある。
%% 渡された引数を通じてイベントをここで読み込む.
%% もう一つの方法として、この関数からサーバーへ直接イベントを渡すだけでも良い
loop(#state{events=orddict:new(),
clients=orddict:new()}).
メッセージの処理
- イベントのサブスクライブ処理
- イベント終了時に通知
- すべてのサブスクライバーを保存する必要あり
- 監視も必要
- クラッシュしたクライアントにメッセージ送信しても無駄なので…
- イベント終了時に通知
{Pid, MsgRef, {subscribe, Client}} ->
%% 監視のためMRef 取得
Ref = erlang:monitor(process, Client),
%% クライアントのリストを更新
NewClients = orddict:store(Ref, Client, S#state.clients),
Pid ! {MsgRef, ok},
loop(S#state{clients=NewClients});
- イベントの追加処理
- エラーステータスを返せる
- タイムスタンプが正しいかのバリデーションが必要
- しかしErlangの calender モジュールにはけTimeを検証する関数がない
- 自前実装
- Erlang の BIF は偏りがある
- しかしErlangの calender モジュールにはけTimeを検証する関数がない
valid_datetime({Date, Time}) ->
try
calender:valid_date(Date) andalso valid_time(Time)
catch
error:function_clause -> %% 正しい {{Y,M,D},{H,M,S}} ではない
false
end;
valid_time({H, M, S}) -> valid_time(H,M,S).
valid_time(H, M, S) -> when H >= 0, H < 24,
M >= 0, M < 60,
S >= 0, S < 60 -> true;
valid_time(_, _, _) ->false.
%% メッセージ処理部分
{Pid, MsgRef, {add, Name, Description, TimeOut}} ->
%% TimeOutの仕様が正しいか
case valid_datetime(TimeOut) of
true ->
%% Eventプロセスを生成し link する
EventPid = event:start_link(Name, TimeOut),
NewEvents = orddict:store(Name,
#event{name=Name,
description=Description,
pid=EventPid,
timeout=TimeOut},
S#state.events),
Pid ! {MsgRef, ok},
loop(S#state{events=NewEvents}); % 状態更新
false ->
Pid ! {MsgRef, {error, bad_timeout}},
loop(S)
end;
- キャンセルメッセージの処理
{Pid, MsgRef, {cancel, Name}} ->
%% Name なイベントがあれば削除し、なければ現在の状態がそのままEventsへ返る
Events = case orddict:find(Name, S#state.events) of
{ok, E} ->
event:cancel(E#event.pid),
orddict:erase(Name, S#state.events);
error ->
S#state.events
end,
Pid ! {MsgRef, ok},
loop(S#state{events=Events});
- イベントプロセスが、イベントサーバーへ返すメッセージ
- done メッセージ
send_to_clients(Msg, ClientDict) ->
orddict:map(fun(_Ref, Pid) -> Pid ! Msg end, ClientDict).
%% ----------
{doen, Name} ->
case orddict:find(Name, S#state.events) of
{ok, E} ->
send_to_clients({done, E#event.name, E#event.description}, S#state.clients),
NewEvents = orddict:erase(Name, S#state.events),
loop(S#state{events=NewEvents});
error ->
%% イベントのキャンセルと同時に、イベントがメッセージを発することがある(ありうる)
loop(S)
end;
- その他諸々のユーティリティメッセージ
- シャットダウン
- コードのアップグレード
- 各種ステータスメッセージ
shutdown ->
exit(shutdown);
{'DOWN', Ref, process, _Pid, _Reason} ->
loop(S#state{clients=orddict:erase(Ref, S#state.clients)});
code_change ->
?MODULE:loop(S);
Unknown ->
io:format("Unknown message: ~p~n", [Unknwon]),
loop(S);
ホットコードローティング
-
コードの無停止アップグレード
-
Erlangの コードサーバー なる機能を使う
- ETSのテーブルを関しているVMプロセス
- メモリ上で1つのモジュールの2つのバージョンを保持
- 両方を実行できる
-
c(Mod)
,l(Mod)
,code:
などでロードすると、自動で新しいバージョンがロードされる
- メモリ上で1つのモジュールの2つのバージョンを保持
- ETSのテーブルを関しているVMプロセス
-
Erlang の関数呼び出し方法
- ローカル呼び出し
-
Func(Args)
な呼び出し- 現在実行中のバージョンから実行される
-
- 外部呼び出し (完全修飾呼び出し)
-
Mod:Func(Args)
な呼び出し- export されたもの
-
常に コードサーバーから最新のバージョンが実行される
- 完全修飾呼び出し内のローカル呼び出しされた関数も最新版で実行される
-
- 再起している時
- 完全修飾呼び出しすると最新版をロードできる
-
?MODULE:loop(S)
みたいに呼び出せばよい
-
- 完全修飾呼び出しすると最新版をロードできる
- ローカル呼び出し
%% 大体の仕組みは以下
-module(hotload).
-export([server/1, upgrade/1]).
server(State) ->
receive
update ->
NewState = ?MODULE:upgrade(State),
%% 新しいバージョンのモジュールでループする
?MODULE:server(NewState);
SomeMessage ->
%% なにか処理
server(State) %% 同じバージョンのままループ
end.
upgrade(OldState) ->
%% 新バージョンへ対応したメッセージに状態を変換したり
メッセージは隠す
- 他の開発者に作ったコードやプロセスで何か処理を書いてもらう場合
- I/F関数内にメッセージを隠匿すべき
%% サーバーは同時に一つだけ起動すると想定し register で登録
start() ->
register(?MODULE, Pid=spawn(?MODULE, init, [])),
Pid.
start_link() ->
register(?MODULE, Pid=spawn_link(?MODULE, init, [])),
Pid.
terminate() ->
?MODULE ! shutdown.
- もしサーバーを複数起動するつもりなら
- register の代わりに global モジュールを使おう
- gproc モジュールを使うと尚良いかも
- register の代わりに global モジュールを使おう
その他メッセージの抽象化
- サブスクライブメッセージの抽象化
subscribe(Pid) ->
Ref = erlang:monitor(process, whereis(?MODULE)),
?MODULE ! {self(), Ref, {subscribe, Pid}},
reveive
{Ref, ok} ->
{ok, Ref};
{'DOWN', Ref, process, _Pid, Reason} ->
{error, Reason}
after 5000 ->
{error, timeout}
end.
- イベントの追加
add_event(Name, Description, TimeOut) ->
Ref = make_ref(),
?MODULE ! {self(), Ref, {add, Name, Description, TimeOut}},
receive
{Ref, Msg} -> Msg
%% クライアントが受診するはずの {error, bad_timeout} メッセージもそのまま転送することに注意
%% クライアントをクラッシュさせるなら以下のように例外出すパターンマッチを先に追加する
%% {Ref, {error, Reason}} -> erlang:error(Reason);
after 5000 ->
{error, timeout}
end.
- イベントのキャンセル
cancel(Name) ->
Ref = make_ref(),
?MODULE ! {self(), Ref, {cancel, Name}},
receive
{Ref, ok} -> ok
after 5000 ->
{error, timeout}
end.
- クライアントのためユーティリティ関数
%% 一定期間すべてのメッセージを蓄える
listen(Delay) ->
receive
M = {done, _Name, _Description} ->
[M | listen(0)]
adter Delay * 1000 ->
[]
end.
プログラムをビルドしてみる
- EMakefile
- Erlang用のMakefile
- Erlang項でMakeの動作を指定できる
%% debug_infoファイル はまず必須だと思って構わない
{'src/*', [debug_info,
{i, "src"}, % src ディレクトリからソース検索
{i, "include"}, % include ディレクトリからヘッダ検索
{outdir, "ebin"}]}. % 出力先
あとは erl -make
とコマンドを打てばMakeが走る
成功したら `erl -pa ebin/' と検索パスを追加するオプションと共にシェルを起動すれば、モジュールをロードできる。
だいたい以下のように動く
evserv:start(). % 起動
evserv:subscribe(self())
evserv:add_event("hey", "test", FutureDateTime).
evserv:listen(5).
evserv:cancel("hey").
evserv:add_event("hey2", "test", NextFutureDateTiem).
event:listen(2000). % [{done, "hey2", "test}] が返る
監視を追加する
- 監視し、落ちたら自動で再起動する
restarter
を実装するべき
-module(sup).
-export([start/2], start_link/2, init/1, loop/1]).
start(Mod, Args) ->
spawn(?MODULE, init, [{Mod, Args}]).
spawn_link(Mod, Args) ->
spawn_link(?MODULE, init, [{Mod, Args}]).
init({Mod, Args}) ->
process_flag(trap_exit, true),
loop({Mod, start_link, Args}).
loop({M, F, A}) ->
Pid = apply(M, F, A),
receive
{'EXIT', _From, shutdown} ->
%% スーパーバイザが死ぬと監視対象も死ぬ
exit(shutdown); % 子プロセスも同様に殺す
{'EXIT', Pid, Reason} ->
io:format("Process ~p exited for reason ~p~n", [Pid, Reason]),
loop({M, F, A})
end.
だいたい以下のように動く
SubPid = sup:start(evserv, []). % スーパーバイザ経由でリマインダーを動かす
exit(whereis(evserv), die). % -> Process <0.44.0> exited for reason die
whereis(evserv) % -> 再起動してるので Pid が返る
exit(SupPid, shutdown). % スーパーバイザを止める
whereis(evserv). % -> undefined が返る
名前空間
-
Erlang はフラットなモジュール構造
- 階層がない
- アプリケーションの衝突が起きる
-
user
みたいなありがちな名前とか
- 階層がない
-
code:clash/0
でテストできる -
このような理由から、Erlangではプロジェクト名を接頭辞につける慣習がある
- reminder_evserv
- reminder_sup
- reminder_event など
-
とにかく名前空間には注意しましょう