GenStageとは
GenStageはプロデューサーとコンシューマーの間で、バックプレッシャー下でのイベント交換を行うElixirのbehaviourです。
コンシューマーからのdemand(要求)を受けてプロデューサーがイベントを送信するので、イベントの流量を調節することができるのが特徴です。
joseさんがGenStageを作った動機や細かい説明はこちら
https://elixir-lang.org/blog/2016/07/14/announcing-genstage/
翻訳版はこちらです(ありがとうございます)
https://qiita.com/k1complete/items/1559e9708064a23042df
また、GenStageのラッパーとしてFlowがあります。コレクションを操作するときに簡単に並行性を活用するために開発されています。
Flowの分かりやすい説明はこちらです(ありがとうございます)
https://qiita.com/shufo/items/59d1c3b0baac6751777f
今までFlowを使うことはありましたがその土台となっているGenStageについては詳しく知らなかったので、理解するためにRateLimiterとして動くワーカを実装してみました。
(DynamodbのWCUをlimitとしてそれを超えないように実装しています)
GenStageのCallback関数
use GenStage
でデフォルトのCallback関数を定義してくれます。GenStage
固有のCallbackもしくは指定するパラメータが固有のものは以下です。
init(args)
stageのtypeを指定します。以下の三つから指定します。
:producer
:consumer
:producer_consumer
# コードはhttps://hexdocs.pm/gen_stage/GenStage.html#c:init/1より引用
def init(args) do
{:producer, state}
end
返却値はoptionsパラメータも指定できて、stageのtype毎にそれぞれ指定できる内容が違います。
-
:producer
の場合は:dispatcher
を指定することでdemandを処理するdispatcherのタイプを決めることができて、デフォルトはGenStage.DemandDispatcher
です。- 他にも
GenStage.BroadcastDispatcaher
やGenStage.PartitionDispatcher
があります。
- 他にも
-
consumer
の場合は:subscribe_to
で、どのプロデューサーにsubscribeするか指定できます。
他にも指定できるパラメータがあります。詳細は公式ドキュメントを読んでください。
handle_demand/2
demandを受けて:producer
ステージで呼び出されます。引数にはdemand数が渡されます。
:producer
の場合は必ずこのCallback関数を実装します。
# コードはhttps://hexdocs.pm/gen_stage/GenStage.html#c:handle_demand/2より引用
def handle_demand(demand, state) do
# stateの中にeventを保持していると仮定する
{dispatch_events, remaining} = Enum.split(state.events, demand) # demandの数だけeventを取り出す
{:noreply, dispatch_events, %{state | events: remaining}}
end
handle_events/3
プロデューサーから送信されたeventsを処理します。:consumer
と:producer_consumer
ステージで呼び出されます。
:consumer
もしくは:producer_consumer
の場合は必ずこのCallback関数を実装します。
handle_subscribe/4
コンシューマーがプロデューサーをsubscribeした時に呼び出されます。:producer
ステージと :consumer
ステージのどちらでも呼び出されます。
:consumer
の場合、返却値でsubscribeの形をオートにするかマニュアルにするか指定できます。ここをマニュアルにすることで、例えば時間間隔ごとにイベント処理数を制限したワーカを実装することができます。
# コードはhttps://hexdocs.pm/gen_stage/GenStage.html#c:handle_subscribe/4より引用
def handle_subscribe(:producer, opts, from, state) do
# なんらかの処理
{:manual, state} # manualのsubscribeに指定。:consumerステージの場合のみ指定できる
end
:manual
を指定した場合は、GenStage.ask(from, demand)
でプロデューサーにdemand数を直接送信します。
RateLimitter
それではGenStageを使ってRateLimiterを実装してみます。公式のリポジトリにサンプルがあるのでそれも参考にします。
ここではDynamodbのWCUを超えないように、定量的に書き込みを行う想定で実装しました。
細かいチューニングは全くできていないのですが、仮でコンシューマーが4つ、それぞれ10件ずつ書き込むように実装します。
________
[Consumer] ----> | |
/ | |
/ /[Consumer] ----> | DB |
[Producer] --<-- | |
\ \[Consumer] ----> | |
\ | |
[Consumer] ----> |________|
書き込む対象はCSVファイルから読み込むこととします。
プロデューサーはdemandを受けるごとに書き込み対象をコンシューマーに送ります。
defmodule ExCdm.Contents.Producer do
use GenStage
def start_link(_) do
GenStage.start_link(__MODULE__, [], name: __MODULE__)
end
@impl GenStage
def init(_) do
# member_idが記録されているCSVファイルを読み込みます
member_ids =
File.stream!("member_ids.csv")
|> Stream.map(&(String.trim/1))
|> Enum.to_list
# 読み込んだidをstateとして保持します。
{:producer, member_ids}
end
@impl GenStage
def handle_demand(demand, member_ids) do
# stateからdemand数の分だけidを取り出します
{ids, remaining} = Enum.split(member_ids, demand)
# idsをコンシューマーに送ります
{:noreply, ids, remaining}
end
end
続いてコンシューマーです。
defmodule ExCdm.Contents.DynamoWriter do
use GenStage
def start_link(name) do
GenStage.start_link(__MODULE__, [], name: name)
end
@impl GenStage
def init(_) do
# Producerをsubscribeするように指定します
{:consumer, %{}, subscribe_to: [ExCdm.Contents.Producer]}
end
#この関数はsubscribe設定時に一度だけ呼び出される
@impl GenStage
def handle_subscribe(:producer, opts, from, producers) do
demand = opts[:max_demand] || 10
interval = opts[:interval] || 500
producers = Map.put(producers, from, {demand, interval})
producers = ask_and_schedule(producers, from)
# subscribeを:manual形式にします
{:manual, producers}
end
@impl GenStage
def handle_cancel(_, from, producers) do
{:noreply, [], Map.delete(producers, from)}
end
@impl GenStage
def handle_events(events, from, producers) do
# 処理を一度行うとask_and_scheduleでdemandを0にしているのでここで再度demandを追加する
producers = Map.update!(producers, from, fn({demand, interval}) ->
{demand + length(events), interval}
end)
for member_id <- events do
ExCdm.Dynamo.insert_content(String.to_integer(member_id))
end
{:noreply, [], producers}
end
@impl GenStage
def handle_info({:ask, from}, producers) do
{:noreply, [], ask_and_schedule(producers, from)}
end
defp ask_and_schedule(producers, from) do
case producers do
%{^from => {demand, interval}} ->
# ここでプロデューサーにdemand数を送信している
GenStage.ask(from, demand)
# intervalミリ秒後にhandle_info関数を呼び出すことで決められた時間ごとに処理を行うようにする
Process.send_after(self(), {:ask, from}, interval)
# 処理が終わったらstateで保持しているdemand数を0にセットする
Map.put(producers, from, {0, interval})
%{} ->
producers
end
end
end
処理の流れは次のようになっています。
-
handle_subscribe/4
が呼び出されます。プライベート関数を呼び出してその中でdemand数をプロデューサーに送信しています(GenStage.ask/2
)。subscribe形式は:manual
を指定します。 - プロデューサーからのeventsを受け取るために
handle_events/3
が呼び出されます。ここでDynamodbのテーブルにevents引数に渡された件数分だけレコードを書き込みます。 - 前に起動した
Process.send_after/3
からhandle_info/2
が呼び出されます。この中で再度プライベート関数ask_and_schedule/2
を呼び出します。 -
ask_and_schedule/2
の中でdemand数をプロデューサーに送信し、Process.send_after/3
からhandle_info/2
を呼び出します。 - 以降は2と3と4の繰り返しです
プロデューサーとコンシューマーをアプリケーション起動時に実行できるようにします。
def start(_type, _args) do
children = [
Supervisor.child_spec({ExCdm.Contents.Producer, []}, id: :producer),
Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_1}, id: :my_worker_1),
Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_2}, id: :my_worker_2),
Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_3}, id: :my_worker_3),
Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_4}, id: :my_worker_4)
]
opts = [strategy: :one_for_one, name: ExCdm.Supervisor]
Supervisor.start_link(children, opts)
end
まとめ
実行結果ですが、WCUを超えない制限下で定量的に書き込むことができました。
まだ単純な実装しかできていないですが、今後はエラー時のリトライ処理、送信するeventがない時はコンシューマからのdemandをバッファリング、コンシューマがまだ処理する準備ができていない場合はeventをバッファリングする処理を作り込んでいけば、もう少し実用的なプログラムになりそうです。
もっとGenStageを使い込んでみたいです。
Resources
- Announcing GenStage https://elixir-lang.org/blog/2016/07/14/announcing-genstage/
- [翻訳]Announcing GenStage https://qiita.com/k1complete/items/1559e9708064a23042df
- Elixir Flowでlazyな並列分散処理 https://qiita.com/shufo/items/59d1c3b0baac6751777f
- 公式リポジトリ https://github.com/elixir-lang/gen_stage/tree/master/examples
- 公式ドキュメント https://hexdocs.pm/gen_stage/GenStage.html
- 今回試したコード https://github.com/kanmo/ex_cdm/tree/master/lib/ex_cdm/contents