この記事は、「Elixir or Phoenix Advent Calendar 2017」の5日目です。
昨日は @tuchiro さんの「ElixirでSI開発入門 #1 Ectoで悲観的ロック」でした。
fukuoka.ex
twinbeeことenぺだーしと申します。
福岡でfukuoka.exという活動をやっております。
先日こちらサイトでfukuoka.exのコアメンバーとして取材を受けました。福岡でのElixirのコミュニティー活動に興味がある方は是非ご覧ください。
FlowとGenStage
ElixirにはFlowという並列ストリームを直観的に使えるライブラリがありますが、本格的に使おうとすると、GenStageの知識が必用になってきます。名前の通りStageがどういうものかも直観的に理解ができます。
Flowの簡潔さに比べて、GenStageのコードは一見わかりにくいのですが、定義の一つ一つは簡単です。JavaやC#、Javascriptなどオブジェクト指向から入ってこられた方は、Behaiviorのコールバック実装が継承クラスのメソッド実装に似ているので、こちらのほうが馴染みやすいかもしれません。
公式ドキュメントを元にGenStageに入門してみましょう。
まずはプロジェクトを作成します。
# mix new gentest
# cd gentest
defp deps do
[
{:gen_stage, "~> 0.13.1"} # 追加
]
# mix deps.get
実装の手順
サンプルプログラムは、数字をカウントしながら無限にリストを流すという内容です。
最後のほうにアニメーションGIFがありますので、そちらをご覧になるとイメージが掴めると思います。
今回出来上がったプログラムはA,B,Cはそれぞれ並列ですが、各工程はシングルプロセスで動作します。
ステージには3種類のタイプがあります。
ストリームの入り口から消費先まで、3段階のステージが分かれています。
A) Producer ストリーム発生元(生産者・供給者)
B) Producer-Consumer 中間加工(生産者-消費者)
C) Consumer 結果 (消費者)
今回の例では以下の流れで実装していきます。
1.上記ABCそれぞれに、コールバックを設置する。
2.ABCの購読(配管)を行う。
「ABCの各ステージを作って購読(配管)すれば、勝手にストリームが流れ出す!」それがGenStageです。(GenStageの一部です)
各モジュールの実装
以下ABCモジュールは同一ファイルに書いてもOKです。lib\gen.ex等適宜ファイルを作成してコードを書いて(コピペして)いきましょう。
Aモジュール (Producer)
defmodule直後のuse句はマクロ使用の予約語でGenStageはマクロを利用した言語拡張で書かれていることがわかります。
モジュール内の下記関数はコールバックなので、手動で呼び出すことはなく、処理の流れの中でイベントを受け取った際に勝手に実行されます。
-
init
ステージのタイプ:producer
の宣言、カウンター初期値を設定しています。 -
handle_demand
引数のdemandは要求数、counterは現在値です。現在値~{現在値+要求数)のリストを作って次のタプルを返します。
{ :noreply, 結果のリスト, 次回の開始値 }
ソースを見ると一目瞭然なので、解説読むほうがしんどいかもしれませんね。
生産者なのに、需要であるdemand
が引数になってるのがポイントですが、何か変だな?と感じた方は鋭いです! それは次回に解説します。
defmodule A do
use GenStage #「このモジュールはGenStageとして使用する」宣言
def start_link(number) do
GenStage.start_link(A, number) # 本モジュールをプロセスに登録
end
def init(counter) do
{:producer, counter} # "生産者"宣言
end
def handle_demand(demand, counter) when demand > 0 do
events = Enum.to_list(counter..counter+demand-1) #リストを作成
{:noreply, events, counter + demand}
end
end
形式的には、終始この形で下記3種のコールバックを書いていく流れとなります。
これ以上難しいものは、もう出てきません。
- start_link … モジュールをプロセスとして起動する際に呼び出されるコールバック
- init … 初期化コールバック
- handle_events / handle_demand … データの受取、需要発生時のコールバック
Bモジュール (producer-consumer)
ステージAから送られてきたデータに、numberを掛けて次のステージに流す中間加工所です
-
init
で生産者-消費者タイプを登録 -
handle_events
で、リストの各要素にnumberを掛け合わせます。
defmodule B do
use GenStage
def start_link(number) do
GenStage.start_link(B, number) # 本モジュールをプロセスとして登録
end
def init(number) do
{:producer_consumer, number} # 中間加工業者 宣言
end
def handle_events(events, _from, number) do # データ受取った時にする仕事を宣言
events = Enum.map(events, & &1 * number) # 加工処理の本体
{:noreply, events, number} # 次の工程へ
end
end
Cモジュール(cosumer)
init
で消費者タイプを登録
handle_events
で、1秒ごとに前のステージBから送られてきたリストを表示します
defmodule C do
use GenStage
def start_link() do
GenStage.start_link(C, :ok)
end
def init(:ok) do
{:consumer, :the_state_does_not_matter} # "消費者" 宣言
end
def handle_events(events, _from, state) do
Process.sleep(1000) # 1秒待つ
IO.inspect(events) # リストを表示
{:noreply, [], state} # 消費者なので、次のステージに渡すリスト(events)はなし
end
end
購読(配管)
ここも、解説することがないぐらいシンプルです。
前半3行でABCそれぞれのプロセスを引数を与えて起動、後半2行で各ステージの購読先を登録。
これだけです。
{:ok, a} = A.start_link(0) # 0から始めます
{:ok, b} = B.start_link(2) # 2を掛けます
{:ok, c} = C.start_link() # 状態は不要
GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)
5行目を書き終えたら、配管が完成するのでストリームが流れ出します。
1秒ごとに、2倍されたカウンターのリストが表示されると思います。(gifは0.5秒)
永遠に流れ続けるので、Ctrl+Cで適当に終わってくだい。
サンプルを動かすと、どういうものか一目瞭然だと思います。
次回予告
お疲れ様でした!
次回はElixirのGenStageに入門する#2 バックプレッシャーを理解するになります。
明日はzacky1972さんによる
ZEAM開発ログv0.1.0 Flow / GenStage による並列プログラミング入門です。