Edited at

[翻訳]Announcing GenStage

More than 3 years have passed since last update.


はじめに

http://elixir-lang.org/blog/2016/07/14/announcing-genstage/

の翻訳です。これまでみんな大好き pipeline 演算子は後味の悪いシンタックスシュガーに思えててあまり使ってなかったのですが、このGenStage(正確にはGenStage.Flow)は、後味の悪さを解消してくれそうです。

GenStageは、現在 https://github.com/elixir-lang/gen_stage にあります。

本当にElixir(というかBeam)の良さを引き出すためには必須となるフレームワークだと思い、勢いで翻訳しました。


Announcing GenStage


layout: post

title: Announcing GenStage

author: José Valim

category: Announcements

excerpt: GenStage is a new Elixir behaviour for exchanging events with back-pressure between Elixir processes. In this blog post we will cover the background that led us to GenStage, some example use cases, and what we are exploring for future releases.


今日のGenStageの公式リリースのアナウンスを嬉しく思います。GenStageは、Elixirプロセス間のback-pressureの下でのイベントを交換する新しいElixirのbehaviourです。

端的には、GenStageがサードパーティシステムからのデータを消費するコンポーネント可能な抽象物を提供している同様なGenEventのユースケースを置き換えるものと期待しています。

このブログポストで私達がGenStageへ辿り着いた背景と、いくつかのユースケース例、そして、将来のリリースのために調査していることをカバーします。そうではなく、クイックリファレンスを探しているなら、project source codedocumentation をチェックしてみてください。


Background

もともとの動機の一つは、Elixirでのコレクションの作業のためのよりよい抽象の導入の為の実装と設計(creating and designing Elixir was to introduce better abstractions for working with collections) です。それだけでなく、コレクションの操作に関心を持つ開発者に対し、コードをeagerからlazyへ、さらに並列から分散へと移行するパスを提供したかったのです。

簡単ですが、実際的な例: ワードカウンティング で議論しましょう。ワードカウンティングは、一つのファイルを受取り、ドキュメントに表れるそれぞれのワードの登場回数を数えます。Enumモジュールを使って以下のように実装できるでしょう:

File.read!("path/to/some/file")

|> String.split("\n")
|> Enum.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

上のソリューションは問題なく機能し、小さなファイルに対しては効率的である一方、ファイル全てをメモリにロードするので大きな入力については全く限定的です。

上のソリューションにはもう一つ問題があります。それは効率的にそれらを数え始めるまえに、Enum.flat_map/2の段階で、ファイルの全てのワードのための巨大なリストを構築することです。繰り返すと、大きなドキュメントでは、後で横断されるリストを作るために多くのメモリの使用と処理時間の無駄を意味します。

幸運にも、Elixirはこの問題の解決策を提供しています(そして結構まえから提供していました): streamです。streamの利点の一つはそれらがlazyであることで、コレクションをアイテム毎にトラバースすること、この場合だと、データ全体をメモりにロードする代わりに行毎ですが、を許します。上の例をstreamを使って書き換えてみましょう:

File.stream!("path/to/some/file")

|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

File.stream!Stream.flat_mapを使うことで、私達は、そして列挙するときに巨大なリストをメモリに構築することなく、一行毎に発出し、行をワードに分解し、一つずつ発出する怠惰な計算(lazy computation)を構築しました。Stream moduleの関数は丁度私達が実行する計算を表現します。ファイルのトラバースやflat_mapでのワードの分解のような計算自身、Enumモジュールの関数を呼出したときだけに起ります。もう一つのアーティクルthe foundation for Enum and Streams にカバーしてます。

上の解決策は、巨大なデータセットに対してそれを全てメモリにロードすることなく動かすことを私達に許します。巨大なファイルについては、eagerバージョンより良いパフォーマンスを提供するでしょう。しかしながら、上の解決策は並行性をまだ活用していません。私達が利用可能な非常に多くのマシンがそうですが、マルチコアマシンでは、準最適解なのです。

では、上の例でどのように並行性を活用することができるというのでしょうか?

ElixirConf 2015 のキーノートで、I discussed one of the most immediate(私はこの問題の最も即時的な解決策の一つを検討しました)。それはパイプラインを複数のプロセスに分解することをカバーします。

File.stream!("path/to/some/file")

|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Stream.async() # NEW!
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

そのアイディアはStream.asyncで、Enum.reduceを呼出したプロセスへ、別々のプロセスで前の計算を走らせてメッセージを送るというものです。残念なことに、上の解決策は少しも理想的ではありません。

先ず最初に、私達は出来る限りプロセス間でデータを動かすことを避けたいです。その代わりに、並列に同じ計算を実行する複数のプロセスを始めたいのです。それだけでなく、私達が開発者に手でStream.asyncを置くことを要求しているなら、それは効率が悪さやエラーを起しやすい解決に至るかもしません。

上の解決策には多くの欠陥がありますが、私達が右のような疑問をもつことの助けになりました:


  • Stream.asyncが新しいプロセスを導入するなら、私達はどのようにそのプロセスがsuperviseされることを保証できるだろうか?


  • プロセス間でメッセージを交換するとき、多過ぎるメッセージの受信からプロセスをどのようにして防ぐのか? 私達は、受信側プロセスが送信側プロセスからどのくらい取り扱えるのかについてを指定できるback-pressureメカニズムを必要としています。


それらの疑問に答えようとしている異る抽象概念を通して最終的にGenStageに着地しました。


GenStage

GenStageは、Elixirプロセス間のback-pressure付のイベントを交換するための、新しいElixirのビヘイビアです。GenStageを使う開発者は、データがどのように生み出されて、操作されて、消費されるかに取り組むことだけが必要です。データを送って、back-pressureを提供する行為は開発者から離れて完全に抽象化されます。

簡単な例として、数を増やしながらイベントを発生させて、それを2倍し、それから端末に印字するという単純なパイプラインを記述しましょう。私達は3つのステージで実装するでしょう、:producer:producer_consumer、そして:consumerです。私達はそれらをそれぞれABCと呼びます。私達は、このポストの最後にワードカウントの例に戻るつもりです。

私達がAとよぶ供給者からスタートしましょう。Aは供給者で、その主な責任は、消費者が取り扱う積りのあるイベントの数の需要を受信することと、イベントの生成です。それらのイベントはメモリまたは外部データソースにあるでしょう。これから、init/1に受信されたcounterの値を所与として、シンプルなカウンターの実装を開始しましょう。

Note: GenStageプロジェクトの全てのモジュールはExperimental名前空間にプレフィックスされています。それが以下の例とあなたのコードでファイルの最初にalias Experimental.GenStageとしなければならない理由です。

alias Experimental.GenStage

defmodule A do
use GenStage

def init(counter) do
{:producer, counter}
end

def handle_demand(demand, counter) when demand > 0 do
# If the counter is 3 and we ask for 2 items, we will
# emit the items 3 and 4, and set the state to 5.
events = Enum.to_list(counter..counter+demand-1)

# The events to emit is the second element of the tuple,
# the third being the state.
{:noreply, events, counter + demand}
end
end

Bは供給者-消費者です。これは常に需要は常にその供給者へフォワードされるので、需要を明示的に取り扱わないことを意味します。ABから需要を受信した時、Bへイベントを送信し、Bは要望どおりBにより変換され、それからCへ送信されます。私達の場合、Bはイベントを受信し、それを初期化時に与えられ、ステートとして保存された数で掛けます:

alias Experimental.GenStage

defmodule B do
use GenStage

def init(number) do
{:producer_consumer, number}
end

def handle_events(events, _from, number) do
events = Enum.map(events, & &1 * number)
{:noreply, events, number}
end
end

Cはそれらのイベントを最終的に受信する消費者で、毎秒端末へ印字します:

alias Experimental.GenStage

defmodule C do
use GenStage

def init(sleeping_time) do
{:consumer, sleeping_time}
end

def handle_events(events, _from, sleeping_time) do
# Print events to terminal.
IO.inspect(events)

# Sleep the configured time.
Process.sleep(sleeping_time)

# We are a consumer, so we never emit events.
{:noreply, [], sleeping_time}
end
end

stageが定義されたので、それらを開始して継げることができます:

{:ok, a} = GenStage.start_link(A, 0)    # starting from zero

{:ok, b} = GenStage.start_link(B, 2) # multiply by 2
{:ok, c} = GenStage.start_link(C, 1000) # sleep for a second

GenStage.sync_subscribe(c, to: b)
GenStage.sync_subscribe(b, to: a)

# Sleep so we see events printed.
Process.sleep(:infinity)

stageに参加すると直ぐに、私達は端末にアイテムが印字されるのを見ることになるはずです。例え消費者にスリープコマンドを送ったとしても、生産者は消費者をデータで溢れ出させないことに気が付くでしょう。それは、stage間のコミュニケーションがデマンドドリブンだからです。生産者は、消費者が需要を上流へ送信した後にだけ、アイテムを消費者へ送信できるのです。生産者は、消費者が指定したより多くのアイテムを、決して送ってはなりません。

このデザイン決定の一つの結果は、上記の消費者のようなステートレスなstageの並列化は本当に直接的だ、ということです:

{:ok, a} = GenStage.start_link(A, 0)     # starting from zero

{:ok, b} = GenStage.start_link(B, 2) # multiply by 2

{:ok, c1} = GenStage.start_link(C, 1000) # sleep for a second
{:ok, c2} = GenStage.start_link(C, 1000) # sleep for a second
{:ok, c3} = GenStage.start_link(C, 1000) # sleep for a second
{:ok, c4} = GenStage.start_link(C, 1000) # sleep for a second

GenStage.sync_subscribe(c1, to: b)
GenStage.sync_subscribe(c2, to: b)
GenStage.sync_subscribe(c3, to: b)
GenStage.sync_subscribe(c4, to: b)
GenStage.sync_subscribe(b, to: a)

# Sleep so we see events printed.
Process.sleep(:infinity)

単に複数の消費者の開始によって、stage B は今や複数のstageから需要を受信し、それらの並列で走っているstageへ、アイテムを送ることができるプロセスを常にピックアップして、イベントを送ることができます。私達は反対方向から並行性の活用をすることもできます: もしパイプライン中で生産者が遲いstageなら、複数の生産者を開始し、それぞれの消費者がそれに参加させることができます。

どの消費者が特定のイベントを受信すべきかについて知るために、生産者のstageはGenStage.Dispatcherと呼ばれるビヘイビアに依存します。デフォルトのディスパッチャは私達が上で簡単に説明したGenStage.DemandDispatcherです: それは異る消費者から需要を集めて、最も高い需要のものへディスパッチします。これは、もし私達がそのスリープタイムを10秒に増やしてしまうかもしれないとかで、一つの消費者が遲いなら、その消費者はアイテムを受け取らないでしょう。


GenStage for data-ingestion

GenStageのユースケースの一つは、サードパーティからのデータの消費です。back-pressure付のデマンドシステムは、私達が効果的に取り扱うことが出来るより多くのデータをインポートしないだろうことを、保証します。デマンドディスパッチャは、私達に単にもっと消費者を追加することで、データを処理するときの並行性の活用を容易にします。

Elixir London Meetupで、私はキューとしてPostgreSQLデータベースに保存されたデータを並行して処理するために、GenStagをどのように使うかを示した短い例を、liveコーディングしました。

https://www.youtube.com/embed/aZuY5-2lwW4


GenStage for event dispatching

GenStageが有用なもう一つのシナリオは、開発者が過去にGenEvent を使用していたであろう例を置き換えることです。GenEventに馴染みがない人のために、それは、イベントは"event manager"へ送られ、それから各イベントの"event handler"を呼出すというビヘイビアです。GenEventはしかしながら一つの大きな欠陥を持っています: event managerと全てのevent handlerは同じプロセスで動作します。これはGenEventハンドラは、開発者にそれら自身へメカニズムを実装することを強制しないかぎり、並行性を活用できないことを意味します。また、GentEnventハンドラは非常に厄介なエラーセマンティクスを持っています。なぜならevent handlerはプロセスを分離していないため、私達は単にそれらを再起動するsupervisorに頼ることは出来ません。

GenStageはevent managerとして生産者を持つことによりこの問題を解決します。生産者はそれ自身がディスパッチャとしてGenStage.BroadcastDispatcherを使うように設定する必要があります。そのbroadcast ディスパッチャは、イベントを消費者の需要を超えないように、全ての消費者へディスパッチすることを保証します。これは私達に並行性の活用を許し、生産者として"event manager"を持つことは、バッファリングと失敗時の対応の面で、私達に遥かに柔軟性を与えます。

生産者としてevent managerを構成する例を見てみましょう:

alias Experimental.GenStage

defmodule EventManager do
use GenStage

@doc """
Starts the manager.
"""

def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end

@doc """
Sends an event and returns only after the event is dispatched.
"""

def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end

## Callbacks

def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end

def handle_call({:notify, event}, from, {queue, demand}) do
dispatch_events(:queue.in({from, event}, queue), demand, [])
end

def handle_demand(incoming_demand, {queue, demand}) do
dispatch_events(queue, incoming_demand + demand, [])
end

defp dispatch_events(queue, demand, events) do
with d when d > 0 <- demand,
{item, queue} = :queue.out(queue),
{:value, {from, event}} <- item do
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
else
_ -> {:noreply, Enum.reverse(events), {queue, demand}}
end
end
end

EventManagerはバッファとして動作します。イベントの送信でなく、需要があれば、私達はその需要を保存します。もし需要でなくイベントがあれば、そのイベントをキューに保存します。もしクライアントがeventをbroadcastしようとすると、イベントが効果的にbroadcastされるまで、sync_notifyコールはブロックされます。ロジックの大部分は、需要がある間、キューからイベントを取り出すdispatch_event/3関数にあります。

生産者としてのevent managerの実装により、私達は単なるGenEventでは不可能な振る舞いの全ての種類を設定することができます。それらは、どのくらい多くの(あるいはどのくらい長い間)データをキューに入れたいか、そして、消費者が存在しないとき、イベントをバッファリングするか否かといったものです(handle_subscribe/4handle_cancel/3 コールバックを通して行います).

event handlerの実装は他の消費者を畫くのと同じくらい簡単です。私達は事実、以前に実装したC消費者を使うことができます。しかしながら、与えられたevent managerは多くの場合、起動するときにmanagerにサブスクライブする推奨のハンドラが、事前に定義されています。

alias Experimental.GenStage

defmodule EventHandler do
use GenStage

def start_link() do
GenStage.start_link(__MODULE__, :ok)
end

# Callbacks

def init(:ok) do
# Starts a permanent subscription to the broadcaster
# which will automatically start requesting items.
{:consumer, :ok, subscribe_to: [EventManager]}
end

def handle_events(events, _from, state) do
IO.inspect events
{:noreply, [], state}
end
end

それは以下を保証します。スーパバイズされたEventHandlerがクラッシュしたら、スーパバイザは新しいevent handlerをスタートし、おなじマネージャにすぐにサブスクライブするでしょう。これがGenEventで私達がみたぎこちないエラーハンドリングを解くことです。


The path forward

GenStage v0.3.0のリリースで、イベントマネージャと、しばしばback-pressureのある外部データソースプロセス間のイベントの交換に使えるという、GenStageとしての重要なマイルストーンに到達しました。

v0.3.0リリースは、ストリームとしてGenStageからデータを消費することを可能にするGenStage.stream関数、生産者としてenumerableやFile.stream!のようなストリームを使うことを可能にする、GenStage.from_enumerableも含んでいます。Stageとストリームの間のギャップを閉じることです。

しかしながら、私達は決して終わっていません!

最初は、今はコミュニティがGenStageを試してみるための時期です。もしあなたが過去にGenEventを使っていたら、GenStageに置き換えることができますか? 同様に、もしあなたがイベントハンドリングシステムを実装しようと計画していたら、GenEventをトライしてください。

RabbitMQ, Redis や Apache Kafkaといった、外部データソースとの統合ライブラリを管理する開発者は、これらのソースからのデータを消費するための抽象化としてGenStageを探索することができます。その利用者が消費者stageを設定するために、ライブラリの開発者は生産者を実装しなければなりません。

私達は十分なフィードバックを得たら、GenStageは標準ライブラリの一部としていくつかの形状に含まれるでしょう。目標は、長期的には、GenStageを導入して、GenEventは段階的に排除することです。

Elixirチームの私達は、以下のことも丁度はじめています。GenStageの次のマイルストーンは、最初の問題に再訪して、並行性(と分散性)のため、コレクションを処理するコードにeagerからlazyへの開発者に明確なパスを提供することです。

以前に見られるように、私達はストリームを導入することで、開発者に、eagerなコードをlazyに変換することを可能にしました。

File.stream!("path/to/some/file")

|> Stream.flat_map(fn line ->
String.split(line, " ")
end)
|> Enum.reduce(%{}, fn word, acc ->
Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

多数あるいは無限のコレクションで動くとき、上記は役に立ちますが、まだ、並行性を活用していません。それに対処するために、私達は現在、GenStage.Flowと名付けられた解決策を調査しています。それは、一つのプロセスの代りに、複数のstageを横切って走ること以外はstreamと良く似た計算を表現することを許すものです:

alias Experimental.GenStage.Flow

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(fn line ->
for word <- String.split(" "), do: {word, 1}
end)
|> Flow.reduce_by_key(& &1 + &2)
|> Enum.to_list()

そして高度に最適化されたバージョン:

alias Experimental.GenStage.Flow

# Let's compile common patterns for performance
empty_space = :binary.compile_pattern(" ") # NEW!

File.stream!("path/to/some/file", read_ahead: 100_000) # NEW!
|> Flow.from_enumerable()
|> Flow.flat_map(fn line ->
for word <- String.split(empty_space), do: {word, 1}
end)
|> Flow.partition_with(storage: :ets) # NEW!
|> Flow.reduce_by_key(& &1 + &2)
|> Enum.to_list()

Flowは、プロセス間のデータ転送量を最小にしながら、私達のコードを実行するための一連のstageを開始し、実行する計算になります。もしGenStage.Flowに、そして、どのように上記の計算が複数のstageに分散しているか興味を持っているなら、私達はこれまでに構築したプロトタイプに基いていくつかのドキュメントを書いてます. コード自体は将来のGenStageのリリースで出来ています。また私達はGenStage.FLow APIがeagerから並行性の明確性へのパスを作るためにEnumStreamの関数をどのようにミラーするかを考えなければなりません。

一定のデータのワードカウント問題では、早期の実験では、20%の固定のオーバヘッドでパフォーマンスの線形増加を示します。言い替えると、シングルコアで60sかかるデータセットで、2コアの機械では36sで、4コアでは18sです。これらの全ては、単にストリームからFlowへあなたの処理を移動させるだけで得られます。もうすぐ私達は40以上のコアでの機械でベンチマークをする計画です。

私達はGenStageが開発者にもたらす可能性と、私達が調査研究する新しいパスの全てに、非常に興奮しています。だから、それを試して、私達にしらせてほしい! GenStage, Flows, and more will also be the topic of my keynote at ElixirConf 2016 も見てほしいです。

Happy coding!