24
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ElixirのGenStageに入門する#2 バックプレッシャーを理解する

Last updated at Posted at 2018-05-03

(この記事は、「Elixir or Phoenix Advent Calendar 2017」の10日目です)

前日は@tuchiroさんのElixirでSI開発入門 #2 Ectoで楽観的ロック でした。

本掲載はElixirのGenStageに入門する#1の続きです。

前回は:producer, :producer_consumer, :consumer の三段階のステージで整数カウンターのストリーム処理を行いました。

バックプレッシャー (1背圧)

さて、前回:producerのところで、handle_demandというコールバックが登場したのを覚えてらっしゃいますでしょうか? 生産者なのに需要がトリガーとなっています。

つまり、GenStageは需要トリガーを元にストリームイベントを処理するライブラリなのです。需要トリガーで供給を引っ張ることを、ストリーム界隈(?)ではバックプレッシャー(1背圧)といいます。

「ストリームって、Twitterのタイムラインみたいに勝手に流れてくるもんじゃないの?消費者の都合で処理してたら取りこぼすよ?」と思われた方は鋭どいです。こういった場合は、:producerでのキューイングを自前で実装する必要があります。

GenStage公式ではキューイング実装のサンプルも載っているので、今回はこれを読み解きながらバックプレッシャー主体のストリームについて理解を深めていきます。


ストリームを支える仕組み

ここまでは快調だったGenStage入門ですが、実際足を踏み入れるとサンプルが極端に少ない手探りの世界です。ストリームを支える仕組みを知るべく、まずは一番煩雑に見えるinitコールバックを図表現にしてみました。

公式のinit/1を、多少ビジュアルにしただけですが、少しは先に進むモチベーションが上がるのではないでしょうか?


ディスパッチャー

:producer:producer_consumerのみで指定する、ディスパッチャーの種類を解説します。生産系ステージと消費系ステージ間でイベントのディスパッチ(配布)を行うのに、3種類の方法が用意されています。

DemandDispatcher

デフォルトのDemandDispatcherです。demandはポンプの記号で表現しているつもりです。バックプレッシャー(背圧)によって、イベントが引っ張られる様子を図解しています。

GenStage#1 Demand.png


PartitionDispatcher

イベントデータの分類によって処理を変えたい場合は、PartitionDispacherの出番です。

PartitionDispatcherを選択した場合は、Hash関数を自作してイベント内容によって使用する消費者割り当てをカスタマイズできます。Hash関数を省略すると、デフォルトの:erlang.phash2が使われます。

(実はElixir Flowで登場するpartition関数はこのDispatcherのことだったりします。)

GenStage#PartitionDISP.png

これは一つの例ですが、ディスパッチ途中でConsumer#2が落ちると、何らかの方法で#2配布分のイベント消費が行われない限り、本ステージ全体の消費が止まってしまいます。実装する場合は、こういったケースも考慮する必用があります。


BroadcastDispatcher

Broadcastでは、購読している消費者に対してeventの同時配信を行います。

GenStage#BoadDISP.png


動かしてみる

BroadcastDispatcherのサンプル

BroadcastDispatcherは公式にサンプルがあるので、動かしてみます。

このサンプルは単なるブロードキャストだけでなく、先に説明したproducer側キューイングを含んでいます。


  @doc "Sends an event and returns only after the event is dispatched."
  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, pending_demand}) do
    queue = :queue.in({from, event}, queue)
    dispatch_events(queue, pending_demand, [])
  end

このコードの重要ポイントは、handle_callコールバックです。handle_demandとは別の入り口を作り、生産者からのイベントを自前でキューイングするMyイベントを定義しています。第一引数{:notify, events}でパターンマッチングを掛けて、Myイベントである:notifyとペアにしたeventsを取り出してキューイングを行ってます。そのまま外部からGenStage.callで呼び出すのは、使いづらいのでsync_notify関数でラップをしています。

出典コードは[公式のQueueBroadcasterPrinterの2つのモジュール]
(https://hexdocs.pm/gen_stage/GenStage.html#module-buffering-demand)以下です。
最後のQueueBroadcaster.sync_notify(:hello_world)でキューイングを行います。

次のアニメーションGIFでは、イベントとして文字列とリストを順次投下してます。

sync_notifyで投入したイベントが、購読しているステージにブロードキャストされたのがわかります。

次は、キューイングを試してみます。
購読する消費者を登録せずに、イベントを投入してみます。

sync_notifyは同期関数なので、イベントが消費されるまで待ちます(そういう仕様です)が、消費者不在ですのでタイムアウトで抜けてきます。(タイムアウトをキャッチする場合はtry catch で行いますが、今回は扱いません。)

その後で、Printer.start_link()で消費者を登録すると、先ほど投入したイベントが配信されてるのが判ります。
 


PartitionDispatcherのサンプル

PartitionDispatcherはGenStageのプロトタイプ版のサンプルしか見つからなかったので、現行世代のGenStageで動かしてみました。

1..10の整数を、3つに振り分ける簡単なHash関数を定義して3つの消費者に1秒毎に振り分けています。

冒頭のchcp 65001は、WindowsのUTF-8ワイド文字の文字化け対策です
Consumerがtypoってますが、見逃して下さい (*´ڡ`●)

コードのポイントは、initコールバック中のpartitionsとhash関数定義部分です。partiotionの設定を見てわかるとおり、:producer側で自由にAtomのリストを設定しています。こうするとで、Hashによるパーティション分類を直観的なものにしています。この部分は単純な整数にすることも可能で、柔軟な設定が可能です。

  def init(:ok) do
    hash_fun = fn event ->
      {event, case rem(event, 3) do
                 0 -> :第一
                 1 -> :第二
                 2 -> :第三
               end
      } end
    {:producer_consumer, %{},
      dispatcher: {GenStage.PartitionDispatcher,
                    partitions: [:第一, :第二, :第三],
                    hash: hash_fun}
    }
  end

Gistに全体をUPしました。


並列処理

前回の例では、生産者から消費者まで1プロセスずつで、各生産タイプは並列に動作するものの、シングルプロセスでした。 今回のBoadcastDispatcherやPartitionDispatcherでは、上流に対して複数の購読を行っており、その場合は生産者が並列動作します。もちろん、デフォルトであるDemandDispatcherでも並列で動作します。

アニメーションGIFでは、すべて違うPID(プロセスID)が割り当てられてるのが見て取れます。

さらにGenStageを深めたい方

@kanmo さんのGenStageを使ってRateLimiterを実装するをご一読下さい。公式のRateLimiterをAWSのDynamoDb用に実装した実用的事例が投稿されています。本記事を書く上でも、かなり参考にさせて頂きました。ありがとうございます!

終わりに

ストリーミング中に消費者の一部が落ちた場合など、いろんなケースが考えられると思いますが、ドキュメントに言及してあるものの、実際はソースコードを読みながら解決していく必要があります。徐々に解読を進めたいと思います。

:stars: :stars: :stars: :stars: :stars:

コミュニティfukuoka.exではElixirを発展・進化させるべく定期的にイベントを行っています。興味のある皆様は気軽にご参加下さい!

:stars: :stars: :stars: :stars: :stars:

GenStage入門は今回で一旦終了して、次回は「Elixir並列処理「Flow」の2段ステージ構造を理解する」になります。

明日は @zacky1972 さんの「ZEAM開発ログv0.1.1 AI/MLを爆速にしたい! Flow / GenStage でGPUを駆動できないの?」です。

  1. 日本語の背圧は本来の用語はストリームのバックプレッシャーとは意味が異なるのですが、近年この意味も加わりつつあるので併記としています。 2

24
9
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?