(この記事は、「Elixir or Phoenix Advent Calendar 2017」の10日目です)
前日は@tuchiroさんのElixirでSI開発入門 #2 Ectoで楽観的ロック でした。
本掲載はElixirのGenStageに入門する#1の続きです。
前回は:producer
, :producer_consumer
, :consumer
の三段階のステージで整数カウンターのストリーム処理を行いました。
バックプレッシャー (1背圧)
さて、前回:producer
のところで、handle_demandというコールバックが登場したのを覚えてらっしゃいますでしょうか? 生産者なのに需要がトリガーとなっています。
つまり、GenStageは需要トリガーを元にストリームイベントを処理するライブラリなのです。需要トリガーで供給を引っ張ることを、ストリーム界隈(?)ではバックプレッシャー(1背圧)といいます。
「ストリームって、Twitterのタイムラインみたいに勝手に流れてくるもんじゃないの?消費者の都合で処理してたら取りこぼすよ?」と思われた方は鋭どいです。こういった場合は、:producer
でのキューイングを自前で実装する必要があります。
GenStage公式ではキューイング実装のサンプルも載っているので、今回はこれを読み解きながらバックプレッシャー主体のストリームについて理解を深めていきます。
ストリームを支える仕組み
ここまでは快調だったGenStage入門ですが、実際足を踏み入れるとサンプルが極端に少ない手探りの世界です。ストリームを支える仕組みを知るべく、まずは一番煩雑に見えるinitコールバックを図表現にしてみました。
公式のinit/1を、多少ビジュアルにしただけですが、少しは先に進むモチベーションが上がるのではないでしょうか?
ディスパッチャー
:producer
と:producer_consumer
のみで指定する、ディスパッチャーの種類を解説します。生産系ステージと消費系ステージ間でイベントのディスパッチ(配布)を行うのに、3種類の方法が用意されています。
DemandDispatcher
デフォルトのDemandDispatcher
です。demandはポンプの記号で表現しているつもりです。バックプレッシャー(背圧)によって、イベントが引っ張られる様子を図解しています。
PartitionDispatcher
イベントデータの分類によって処理を変えたい場合は、PartitionDispacherの出番です。
PartitionDispatcherを選択した場合は、Hash関数を自作してイベント内容によって使用する消費者割り当てをカスタマイズできます。Hash関数を省略すると、デフォルトの:erlang.phash2が使われます。
(実はElixir Flowで登場するpartition関数はこのDispatcherのことだったりします。)
これは一つの例ですが、ディスパッチ途中でConsumer#2が落ちると、何らかの方法で#2配布分のイベント消費が行われない限り、本ステージ全体の消費が止まってしまいます。実装する場合は、こういったケースも考慮する必用があります。
BroadcastDispatcher
Broadcastでは、購読している消費者に対してeventの同時配信を行います。
動かしてみる
BroadcastDispatcherのサンプル
BroadcastDispatcherは公式にサンプルがあるので、動かしてみます。
このサンプルは単なるブロードキャストだけでなく、先に説明したproducer側キューイングを含んでいます。
@doc "Sends an event and returns only after the event is dispatched."
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
## Callbacks
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end
このコードの重要ポイントは、handle_callコールバックです。handle_demand
とは別の入り口を作り、生産者からのイベントを自前でキューイングするMyイベントを定義しています。第一引数{:notify, events}でパターンマッチングを掛けて、Myイベントである:notify
とペアにしたeventsを取り出してキューイングを行ってます。そのまま外部からGenStage.callで呼び出すのは、使いづらいのでsync_notify関数でラップをしています。
出典コードは[公式のQueueBroadcaster
と Printer
の2つのモジュール]
(https://hexdocs.pm/gen_stage/GenStage.html#module-buffering-demand)以下です。
最後のQueueBroadcaster.sync_notify(:hello_world)
でキューイングを行います。
次のアニメーションGIFでは、イベントとして文字列とリストを順次投下してます。
sync_notifyで投入したイベントが、購読しているステージにブロードキャストされたのがわかります。
次は、キューイングを試してみます。
購読する消費者を登録せずに、イベントを投入してみます。
sync_notify
は同期関数なので、イベントが消費されるまで待ちます(そういう仕様です)が、消費者不在ですのでタイムアウトで抜けてきます。(タイムアウトをキャッチする場合はtry catch で行いますが、今回は扱いません。)
その後で、Printer.start_link()で消費者を登録すると、先ほど投入したイベントが配信されてるのが判ります。
PartitionDispatcherのサンプル
PartitionDispatcherはGenStageのプロトタイプ版のサンプルしか見つからなかったので、現行世代のGenStageで動かしてみました。
1..10の整数を、3つに振り分ける簡単なHash関数を定義して3つの消費者に1秒毎に振り分けています。
冒頭のchcp 65001
は、WindowsのUTF-8ワイド文字の文字化け対策です
Consumerがtypoってますが、見逃して下さい (*´ڡ`●)
コードのポイントは、initコールバック中のpartitionsとhash関数定義部分です。partiotionの設定を見てわかるとおり、:producer
側で自由にAtomのリストを設定しています。こうするとで、Hashによるパーティション分類を直観的なものにしています。この部分は単純な整数にすることも可能で、柔軟な設定が可能です。
def init(:ok) do
hash_fun = fn event ->
{event, case rem(event, 3) do
0 -> :第一
1 -> :第二
2 -> :第三
end
} end
{:producer_consumer, %{},
dispatcher: {GenStage.PartitionDispatcher,
partitions: [:第一, :第二, :第三],
hash: hash_fun}
}
end
Gistに全体をUPしました。
並列処理
前回の例では、生産者から消費者まで1プロセスずつで、各生産タイプは並列に動作するものの、シングルプロセスでした。 今回のBoadcastDispatcherやPartitionDispatcherでは、上流に対して複数の購読を行っており、その場合は生産者が並列動作します。もちろん、デフォルトであるDemandDispatcherでも並列で動作します。
アニメーションGIFでは、すべて違うPID(プロセスID)が割り当てられてるのが見て取れます。
さらにGenStageを深めたい方
@kanmo さんのGenStageを使ってRateLimiterを実装するをご一読下さい。公式のRateLimiterをAWSのDynamoDb用に実装した実用的事例が投稿されています。本記事を書く上でも、かなり参考にさせて頂きました。ありがとうございます!
終わりに
ストリーミング中に消費者の一部が落ちた場合など、いろんなケースが考えられると思いますが、ドキュメントに言及してあるものの、実際はソースコードを読みながら解決していく必要があります。徐々に解読を進めたいと思います。
コミュニティfukuoka.exではElixirを発展・進化させるべく定期的にイベントを行っています。興味のある皆様は気軽にご参加下さい!
GenStage入門は今回で一旦終了して、次回は「Elixir並列処理「Flow」の2段ステージ構造を理解する」になります。
明日は @zacky1972 さんの「ZEAM開発ログv0.1.1 AI/MLを爆速にしたい! Flow / GenStage でGPUを駆動できないの?」です。