18
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

GenStageを使ってRateLimiterを実装する

Last updated at Posted at 2017-12-17

GenStageとは

GenStageはプロデューサーとコンシューマーの間で、バックプレッシャー下でのイベント交換を行うElixirのbehaviourです。
コンシューマーからのdemand(要求)を受けてプロデューサーがイベントを送信するので、イベントの流量を調節することができるのが特徴です。

joseさんがGenStageを作った動機や細かい説明はこちら
https://elixir-lang.org/blog/2016/07/14/announcing-genstage/
翻訳版はこちらです(ありがとうございます)
https://qiita.com/k1complete/items/1559e9708064a23042df

また、GenStageのラッパーとしてFlowがあります。コレクションを操作するときに簡単に並行性を活用するために開発されています。

Flowの分かりやすい説明はこちらです(ありがとうございます)
https://qiita.com/shufo/items/59d1c3b0baac6751777f

今までFlowを使うことはありましたがその土台となっているGenStageについては詳しく知らなかったので、理解するためにRateLimiterとして動くワーカを実装してみました。
(DynamodbのWCUをlimitとしてそれを超えないように実装しています)

GenStageのCallback関数

use GenStageでデフォルトのCallback関数を定義してくれます。GenStage固有のCallbackもしくは指定するパラメータが固有のものは以下です。

init(args)

stageのtypeを指定します。以下の三つから指定します。

  • :producer
  • :consumer
  • :producer_consumer
# コードはhttps://hexdocs.pm/gen_stage/GenStage.html#c:init/1より引用

def init(args) do
  {:producer, state}
end

返却値はoptionsパラメータも指定できて、stageのtype毎にそれぞれ指定できる内容が違います。

  • :producerの場合は:dispatcherを指定することでdemandを処理するdispatcherのタイプを決めることができて、デフォルトはGenStage.DemandDispatcherです。
    • 他にもGenStage.BroadcastDispatcaherGenStage.PartitionDispatcherがあります。
  • consumerの場合は:subscribe_toで、どのプロデューサーにsubscribeするか指定できます。

他にも指定できるパラメータがあります。詳細は公式ドキュメントを読んでください。

handle_demand/2

demandを受けて:producerステージで呼び出されます。引数にはdemand数が渡されます。
:producerの場合は必ずこのCallback関数を実装します。

# コードはhttps://hexdocs.pm/gen_stage/GenStage.html#c:handle_demand/2より引用

def handle_demand(demand, state) do
  # stateの中にeventを保持していると仮定する
  {dispatch_events, remaining} = Enum.split(state.events, demand) # demandの数だけeventを取り出す

  {:noreply, dispatch_events, %{state | events: remaining}}
end

handle_events/3

プロデューサーから送信されたeventsを処理します。:consumer:producer_consumerステージで呼び出されます。
:consumerもしくは:producer_consumerの場合は必ずこのCallback関数を実装します。

handle_subscribe/4

コンシューマーがプロデューサーをsubscribeした時に呼び出されます。:producerステージと :consumerステージのどちらでも呼び出されます。
:consumerの場合、返却値でsubscribeの形をオートにするかマニュアルにするか指定できます。ここをマニュアルにすることで、例えば時間間隔ごとにイベント処理数を制限したワーカを実装することができます。

# コードはhttps://hexdocs.pm/gen_stage/GenStage.html#c:handle_subscribe/4より引用

def handle_subscribe(:producer, opts, from, state) do
  # なんらかの処理

  {:manual, state} # manualのsubscribeに指定。:consumerステージの場合のみ指定できる
end

:manualを指定した場合は、GenStage.ask(from, demand)でプロデューサーにdemand数を直接送信します。

RateLimitter

それではGenStageを使ってRateLimiterを実装してみます。公式のリポジトリにサンプルがあるのでそれも参考にします。
ここではDynamodbのWCUを超えないように、定量的に書き込みを行う想定で実装しました。
細かいチューニングは全くできていないのですが、仮でコンシューマーが4つ、それぞれ10件ずつ書き込むように実装します。

                                     ________
                   [Consumer] ----> |        |
                 /                  |        |
                / /[Consumer] ----> |   DB   |
  [Producer] --<--                  |        |
                \ \[Consumer] ----> |        |
                 \                  |        |
                   [Consumer] ----> |________|
                                    

書き込む対象はCSVファイルから読み込むこととします。

プロデューサーはdemandを受けるごとに書き込み対象をコンシューマーに送ります。

producer.ex
defmodule ExCdm.Contents.Producer do
  use GenStage

  def start_link(_) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl GenStage
  def init(_) do
    # member_idが記録されているCSVファイルを読み込みます
    member_ids =
      File.stream!("member_ids.csv")
      |> Stream.map(&(String.trim/1))
      |> Enum.to_list

    # 読み込んだidをstateとして保持します。
    {:producer, member_ids}
  end

  @impl GenStage
  def handle_demand(demand, member_ids) do
    # stateからdemand数の分だけidを取り出します
    {ids, remaining} = Enum.split(member_ids, demand)

    # idsをコンシューマーに送ります
    {:noreply, ids, remaining}
  end
end

続いてコンシューマーです。

dynamo_writer.ex
defmodule ExCdm.Contents.DynamoWriter do
  use GenStage

  def start_link(name) do
    GenStage.start_link(__MODULE__, [], name: name)
  end

  @impl GenStage
  def init(_) do
    # Producerをsubscribeするように指定します
    {:consumer, %{}, subscribe_to: [ExCdm.Contents.Producer]}
  end

  #この関数はsubscribe設定時に一度だけ呼び出される
  @impl GenStage
  def handle_subscribe(:producer, opts, from, producers) do    
    demand = opts[:max_demand] || 10
    interval = opts[:interval] || 500

    producers = Map.put(producers, from, {demand, interval})
    producers = ask_and_schedule(producers, from)

    # subscribeを:manual形式にします
    {:manual, producers}
  end

  @impl GenStage
  def handle_cancel(_, from, producers) do
    {:noreply, [], Map.delete(producers, from)}
  end

  @impl GenStage
  def handle_events(events, from, producers) do
    # 処理を一度行うとask_and_scheduleでdemandを0にしているのでここで再度demandを追加する
    producers = Map.update!(producers, from, fn({demand, interval}) ->
      {demand + length(events), interval}
    end)

    for member_id <- events do
      ExCdm.Dynamo.insert_content(String.to_integer(member_id))
    end

    {:noreply, [], producers}
  end

  @impl GenStage
  def handle_info({:ask, from}, producers) do
    {:noreply, [], ask_and_schedule(producers, from)}
  end

  defp ask_and_schedule(producers, from) do
    case producers do
      %{^from => {demand, interval}} ->
        # ここでプロデューサーにdemand数を送信している
        GenStage.ask(from, demand)
        
        # intervalミリ秒後にhandle_info関数を呼び出すことで決められた時間ごとに処理を行うようにする
        Process.send_after(self(), {:ask, from}, interval)

        # 処理が終わったらstateで保持しているdemand数を0にセットする
        Map.put(producers, from, {0, interval})

      %{} ->
        producers
    end
  end
end

処理の流れは次のようになっています。

  1. handle_subscribe/4が呼び出されます。プライベート関数を呼び出してその中でdemand数をプロデューサーに送信しています(GenStage.ask/2)。subscribe形式は:manualを指定します。
  2. プロデューサーからのeventsを受け取るためにhandle_events/3が呼び出されます。ここでDynamodbのテーブルにevents引数に渡された件数分だけレコードを書き込みます。
  3. 前に起動したProcess.send_after/3からhandle_info/2が呼び出されます。この中で再度プライベート関数ask_and_schedule/2を呼び出します。
  4. ask_and_schedule/2の中でdemand数をプロデューサーに送信し、Process.send_after/3からhandle_info/2を呼び出します。
  5. 以降は2と3と4の繰り返しです

プロデューサーとコンシューマーをアプリケーション起動時に実行できるようにします。

application.ex
def start(_type, _args) do
  children = [
    Supervisor.child_spec({ExCdm.Contents.Producer, []}, id: :producer),
    Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_1}, id: :my_worker_1),
    Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_2}, id: :my_worker_2),
    Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_3}, id: :my_worker_3),
    Supervisor.child_spec({ExCdm.Contents.DynamoWriter, :my_worker_4}, id: :my_worker_4) 
  ]

  opts = [strategy: :one_for_one, name: ExCdm.Supervisor]
  Supervisor.start_link(children, opts)
end

まとめ

実行結果ですが、WCUを超えない制限下で定量的に書き込むことができました。

Screenshot 2017-12-17 22.09.57.png

まだ単純な実装しかできていないですが、今後はエラー時のリトライ処理、送信するeventがない時はコンシューマからのdemandをバッファリング、コンシューマがまだ処理する準備ができていない場合はeventをバッファリングする処理を作り込んでいけば、もう少し実用的なプログラムになりそうです。

もっとGenStageを使い込んでみたいです。

Resources

18
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?