LoginSignup
15

More than 5 years have passed since last update.

ElixirのGenStageに入門する #1

Last updated at Posted at 2018-04-28

この記事は、「Elixir or Phoenix Advent Calendar 2017」の5日目です。
昨日は @tuchiro さんの「ElixirでSI開発入門 #1 Ectoで悲観的ロック」でした。

fukuoka.ex

twinbeeことenぺだーしと申します。
福岡でfukuoka.exという活動をやっております。

fukuokaex_tuchiro_enpedasi.jpg

先日こちらサイトでfukuoka.exのコアメンバーとして取材を受けました。福岡でのElixirのコミュニティー活動に興味がある方は是非ご覧ください。

FlowとGenStage

ElixirにはFlowという並列ストリームを直観的に使えるライブラリがありますが、本格的に使おうとすると、GenStageの知識が必用になってきます。名前の通りStageがどういうものかも直観的に理解ができます。

Flowの簡潔さに比べて、GenStageのコードは一見わかりにくいのですが、定義の一つ一つは簡単です。JavaやC#、Javascriptなどオブジェクト指向から入ってこられた方は、Behaiviorのコールバック実装が継承クラスのメソッド実装に似ているので、こちらのほうが馴染みやすいかもしれません。

公式ドキュメントを元にGenStageに入門してみましょう。

まずはプロジェクトを作成します。

 # mix new gentest
 # cd gentest
mix.exs
  defp deps do
    [
      {:gen_stage, "~> 0.13.1"} # 追加
    ]
 # mix deps.get

実装の手順

サンプルプログラムは、数字をカウントしながら無限にリストを流すという内容です。
最後のほうにアニメーションGIFがありますので、そちらをご覧になるとイメージが掴めると思います。
今回出来上がったプログラムはA,B,Cはそれぞれ並列ですが、各工程はシングルプロセスで動作します。

ステージには3種類のタイプがあります。
ストリームの入り口から消費先まで、3段階のステージが分かれています。

A) Producer ストリーム発生元(生産者・供給者)
B) Producer-Consumer 中間加工(生産者-消費者)
C) Consumer 結果 (消費者)

今回の例では以下の流れで実装していきます。
1.上記ABCそれぞれに、コールバックを設置する。
2.ABCの購読(配管)を行う。

「ABCの各ステージを作って購読(配管)すれば、勝手にストリームが流れ出す!」それがGenStageです。(GenStageの一部です)


各モジュールの実装

以下ABCモジュールは同一ファイルに書いてもOKです。lib\gen.ex等適宜ファイルを作成してコードを書いて(コピペして)いきましょう。

Aモジュール (Producer)

defmodule直後のuse句はマクロ使用の予約語でGenStageはマクロを利用した言語拡張で書かれていることがわかります。

モジュール内の下記関数はコールバックなので、手動で呼び出すことはなく、処理の流れの中でイベントを受け取った際に勝手に実行されます。

  • init ステージのタイプ:producerの宣言、カウンター初期値を設定しています。
  • handle_demand 引数のdemandは要求数、counterは現在値です。現在値~{現在値+要求数)のリストを作って次のタプルを返します。
{ :noreply, 結果のリスト, 次回の開始値 }

ソースを見ると一目瞭然なので、解説読むほうがしんどいかもしれませんね。

生産者なのに、需要であるdemandが引数になってるのがポイントですが、何か変だな?と感じた方は鋭いです! それは次回に解説します。

A
defmodule A do
  use GenStage #「このモジュールはGenStageとして使用する」宣言

  def start_link(number) do
    GenStage.start_link(A, number) # 本モジュールをプロセスに登録
  end

  def init(counter) do
    {:producer, counter}           # "生産者"宣言
  end

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..counter+demand-1) #リストを作成
    {:noreply, events, counter + demand}
  end
end

形式的には、終始この形で下記3種のコールバックを書いていく流れとなります。
これ以上難しいものは、もう出てきません。

  • start_link … モジュールをプロセスとして起動する際に呼び出されるコールバック
  • init … 初期化コールバック
  • handle_events / handle_demand … データの受取、需要発生時のコールバック

Bモジュール (producer-consumer)

ステージAから送られてきたデータに、numberを掛けて次のステージに流す中間加工所です

  • initで生産者-消費者タイプを登録
  • handle_eventsで、リストの各要素にnumberを掛け合わせます。
B
defmodule B do
  use GenStage

  def start_link(number) do
    GenStage.start_link(B, number) # 本モジュールをプロセスとして登録
  end

  def init(number) do
    {:producer_consumer, number} # 中間加工業者 宣言
  end

  def handle_events(events, _from, number) do # データ受取った時にする仕事を宣言
    events = Enum.map(events, & &1 * number)  # 加工処理の本体 
    {:noreply, events, number}                # 次の工程へ
  end
end

Cモジュール(cosumer)

initで消費者タイプを登録
handle_eventsで、1秒ごとに前のステージBから送られてきたリストを表示します

C
defmodule C do
  use GenStage

  def start_link() do
    GenStage.start_link(C, :ok)
  end

  def init(:ok) do
    {:consumer, :the_state_does_not_matter} # "消費者" 宣言
  end

  def handle_events(events, _from, state) do
    Process.sleep(1000)   # 1秒待つ
    IO.inspect(events)    # リストを表示
    {:noreply, [], state} # 消費者なので、次のステージに渡すリスト(events)はなし
  end
end

購読(配管)

ここも、解説することがないぐらいシンプルです。
前半3行でABCそれぞれのプロセスを引数を与えて起動、後半2行で各ステージの購読先を登録。

これだけです。

{:ok, a} = A.start_link(0)  # 0から始めます
{:ok, b} = B.start_link(2)  # 2を掛けます
{:ok, c} = C.start_link()   # 状態は不要

GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)

5行目を書き終えたら、配管が完成するのでストリームが流れ出します。

gen_stage1.gif

1秒ごとに、2倍されたカウンターのリストが表示されると思います。(gifは0.5秒)
永遠に流れ続けるので、Ctrl+Cで適当に終わってくだい。

サンプルを動かすと、どういうものか一目瞭然だと思います。


次回予告

お疲れ様でした!
次回はElixirのGenStageに入門する#2 バックプレッシャーを理解するになります。

明日はzacky1972さんによる
ZEAM開発ログv0.1.0 Flow / GenStage による並列プログラミング入門です。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15