ElixirのGenStageに入門する #1

More than 1 year has passed since last update.

この記事は、「Elixir or Phoenix Advent Calendar 2017」の5日目です。

昨日は @tuchiro さんの「ElixirでSI開発入門 #1 Ectoで悲観的ロック」でした。


fukuoka.ex

twinbeeことenぺだーしと申します。

福岡でfukuoka.exという活動をやっております。

fukuokaex_tuchiro_enpedasi.jpg

先日こちらサイトでfukuoka.exのコアメンバーとして取材を受けました。福岡でのElixirのコミュニティー活動に興味がある方は是非ご覧ください。


FlowとGenStage

ElixirにはFlowという並列ストリームを直観的に使えるライブラリがありますが、本格的に使おうとすると、GenStageの知識が必用になってきます。名前の通りStageがどういうものかも直観的に理解ができます。

Flowの簡潔さに比べて、GenStageのコードは一見わかりにくいのですが、定義の一つ一つは簡単です。JavaやC#、Javascriptなどオブジェクト指向から入ってこられた方は、Behaiviorのコールバック実装が継承クラスのメソッド実装に似ているので、こちらのほうが馴染みやすいかもしれません。

公式ドキュメントを元にGenStageに入門してみましょう。

まずはプロジェクトを作成します。

 # mix new gentest

# cd gentest


mix.exs

  defp deps do

[
{:gen_stage, "~> 0.13.1"} # 追加
]

 # mix deps.get



実装の手順

サンプルプログラムは、数字をカウントしながら無限にリストを流すという内容です。

最後のほうにアニメーションGIFがありますので、そちらをご覧になるとイメージが掴めると思います。

今回出来上がったプログラムはA,B,Cはそれぞれ並列ですが、各工程はシングルプロセスで動作します。

ステージには3種類のタイプがあります。

ストリームの入り口から消費先まで、3段階のステージが分かれています。

A) Producer ストリーム発生元(生産者・供給者)

B) Producer-Consumer 中間加工(生産者-消費者)

C) Consumer 結果 (消費者)

今回の例では以下の流れで実装していきます。

1.上記ABCそれぞれに、コールバックを設置する。

2.ABCの購読(配管)を行う。

「ABCの各ステージを作って購読(配管)すれば、勝手にストリームが流れ出す!」それがGenStageです。(GenStageの一部です)



各モジュールの実装

以下ABCモジュールは同一ファイルに書いてもOKです。lib\gen.ex等適宜ファイルを作成してコードを書いて(コピペして)いきましょう。


Aモジュール (Producer)

defmodule直後のuse句はマクロ使用の予約語でGenStageはマクロを利用した言語拡張で書かれていることがわかります。

モジュール内の下記関数はコールバックなので、手動で呼び出すことはなく、処理の流れの中でイベントを受け取った際に勝手に実行されます。



  • init ステージのタイプ:producerの宣言、カウンター初期値を設定しています。


  • handle_demand
    引数のdemandは要求数、counterは現在値です。現在値~{現在値+要求数)のリストを作って次のタプルを返します。

{ :noreply, 結果のリスト, 次回の開始値 }

ソースを見ると一目瞭然なので、解説読むほうがしんどいかもしれませんね。

生産者なのに、需要であるdemandが引数になってるのがポイントですが、何か変だな?と感じた方は鋭いです! それは次回に解説します。


A

defmodule A do

use GenStage #「このモジュールはGenStageとして使用する」宣言

def start_link(number) do
GenStage.start_link(A, number) # 本モジュールをプロセスに登録
end

def init(counter) do
{:producer, counter} # "生産者"宣言
end

def handle_demand(demand, counter) when demand > 0 do
events = Enum.to_list(counter..counter+demand-1) #リストを作成
{:noreply, events, counter + demand}
end
end


形式的には、終始この形で下記3種のコールバックを書いていく流れとなります。

これ以上難しいものは、もう出てきません。


  • start_link … モジュールをプロセスとして起動する際に呼び出されるコールバック

  • init … 初期化コールバック

  • handle_events / handle_demand … データの受取、需要発生時のコールバック



Bモジュール (producer-consumer)

ステージAから送られてきたデータに、numberを掛けて次のステージに流す中間加工所です



  • initで生産者-消費者タイプを登録


  • handle_eventsで、リストの各要素にnumberを掛け合わせます。


B

defmodule B do

use GenStage

def start_link(number) do
GenStage.start_link(B, number) # 本モジュールをプロセスとして登録
end

def init(number) do
{:producer_consumer, number} # 中間加工業者 宣言
end

def handle_events(events, _from, number) do # データ受取った時にする仕事を宣言
events = Enum.map(events, & &1 * number) # 加工処理の本体
{:noreply, events, number} # 次の工程へ
end
end




Cモジュール(cosumer)

initで消費者タイプを登録

handle_eventsで、1秒ごとに前のステージBから送られてきたリストを表示します


C

defmodule C do

use GenStage

def start_link() do
GenStage.start_link(C, :ok)
end

def init(:ok) do
{:consumer, :the_state_does_not_matter} # "消費者" 宣言
end

def handle_events(events, _from, state) do
Process.sleep(1000) # 1秒待つ
IO.inspect(events) # リストを表示
{:noreply, [], state} # 消費者なので、次のステージに渡すリスト(events)はなし
end
end




購読(配管)

ここも、解説することがないぐらいシンプルです。

前半3行でABCそれぞれのプロセスを引数を与えて起動、後半2行で各ステージの購読先を登録。

これだけです。

{:ok, a} = A.start_link(0)  # 0から始めます

{:ok, b} = B.start_link(2) # 2を掛けます
{:ok, c} = C.start_link() # 状態は不要

GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)

5行目を書き終えたら、配管が完成するのでストリームが流れ出します。

gen_stage1.gif

1秒ごとに、2倍されたカウンターのリストが表示されると思います。(gifは0.5秒)

永遠に流れ続けるので、Ctrl+Cで適当に終わってくだい。

サンプルを動かすと、どういうものか一目瞭然だと思います。



次回予告

お疲れ様でした!

次回はElixirのGenStageに入門する#2 バックプレッシャーを理解するになります。

明日はzacky1972さんによる

ZEAM開発ログv0.1.0 Flow / GenStage による並列プログラミング入門です。