はじめに
以前、Akka Streams でたこ焼き屋さんという投稿で、ScalaのAkka Streamを使い、たこ焼き屋さんを題材としたバックプレッシャーを効かせたストリーム処理を実装しましたが、今回はそれをElixirのGenStageを使って実装してみました。
なお、今回のコードは下記で公開しています。
https://github.com/imahiro-t/takoyaki-genstage
お題
- たこ焼き屋さんの店員は注文担当、焼き担当、レジ・パッキング担当の3人いる
- たこ焼きを焼く鉄板には1度に8個焼けるレーンが4レーンある
- 注文はパック単位で1パック8個入りで、最大6パックまでランダムに注文が入る
- たこ焼きには1/100の確率で虫が混入し、その場合は、パックのたこ焼きを全て破棄し焼き直す必要がある
- お客さんを効率よく捌くため最初に鉄板一面(4パック)分を作り置く
実装
GenStageを使用しないと
GenStageを使用しない単純な実装としては、注文をインプットに、注文数分たこ焼きを焼き、検品し、パック詰して提供する感じだと思います。
ただ、この場合だと、最初に作り置く要件を満たせないし、例えば3パックの注文が入った際に、焼かないレーンが1レーン残り、仮に次のお客さんが1パックの注文だった場合などに、効率が良くないです。
ですので、最初に作り置く実装を考えてみますが、お客さんには注文分ができたらそこで提供する必要があるため、注文分が揃っているのに作り置きが完了するまでお客さんを待たせるような処理は組めず、焼き担当は別スレッドでの管理となると思います。
また、注文を受けた際にも、作り置きが完了しているかどうかを確認して待つ必要があるため、焼き担当の別ワーカーに加え、レジ・パッキング担当も別スレッドでの管理となると思います。
などと、諸々考えてみても、GenStageを使用しない実装はかなり大変だということがわかります。
Elixirの場合は、GenServerである程度簡単になりますが、バックプレッシャー等の仕組みを入れるとなると、GenStageにした方が複雑にならずスッキリとした実装になると思います。
GenStageを使用すると
ダイアグラム
GenStageを使用するとどうなるのかというところですが、まず、全体のダイアグラムについて考えてみます。
- 注文を受けた注文担当が焼き担当とレジ・パッキング担当に注文を伝える
- 焼き担当は注文数分の空パックを用意し、空パック分のたこ焼きを焼いていき、焼き終わったらたこ焼きの入ったパックをレジ・パッキング担当に渡していく
- 虫が入っていた場合は、レーン毎に焼き直す
- 最初に4パックたこ焼きを焼いていたり、焼き直しが発生するので注文時の空パックと提供するパックは異なる
- レジ・パッキング担当は、注文書にしたがってお会計を行った後、焼き担当から渡されたたこ焼き入りのパックを注文数分パッキングしたらお客さんに商品を提供する
プロジェクト作成&初期設定
mix new takoyaki --sup
mix new
でプロジェクトを作成します。
defmodule Takoyaki.MixProject do
use Mix.Project
def project do
[
app: :takoyaki,
version: "0.1.0",
elixir: "~> 1.12",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: {Takoyaki.Application, []}
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:gen_stage, "~> 1.1.0"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
end
gen_stage
を依存に追加します。
データモデル
defmodule Order do
defstruct order_number: 1, order_count: 0
end
defmodule Takoball do
defstruct value: "●"
end
defmodule Pack do
defstruct takoballs: []
end
defimpl String.Chars, for: Pack do
def to_string(pack) do
"[#{pack.takoballs |> Enum.map(& &1.value) |> Enum.join(",")}]"
end
end
各ステージ間で渡される情報については別途データモデルとして切り出しておきます。
注文を受ける(注文担当)
defmodule OrderTaker do
use GenStage
def start_link(initial_order_number) do
GenStage.start_link(__MODULE__, initial_order_number, name: __MODULE__)
end
def init(initial_order_number) do
{:producer, initial_order_number, dispatcher: GenStage.BroadcastDispatcher}
end
def order(order_count) do
GenStage.call(__MODULE__, {:order, order_count}, 5000)
end
def handle_call({:order, order_count}, _from, order_number) do
IO.puts(
"#{order_count} pack(s) ordered [#{order_number |> to_string |> String.pad_leading(10, "0")}]"
)
{:reply, :ok, [%Order{order_number: order_number, order_count: order_count}],
order_number + 1}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
end
OrderTaker
ステージはProducer(Source)になりますが、外部からorder/1
関数が呼ばれることで注文が入るようにするため、handle_demand/2
関数ではなくhandle_call/3
関数でイベントを蓄積していくことになります。
注文が入ると、焼き担当とレジ・パッキング担当に同じ注文情報を流すことになるため、デフォルトのdispatcherであるGenStage.DemandDispatcher
ではなくGenStage.BroadcastDispatcher
を使用してブロードキャストするようにしています。
イベントは注文番号と注文数が入っている%Order{}
の配列になり、現在の注文番号をステートで管理しています。
最初の4つの空パックを用意する(焼き担当)
defmodule InitEmptyPackPreparation do
use GenStage
def start_link(initial_cook_count) do
GenStage.start_link(__MODULE__, initial_cook_count, name: __MODULE__)
end
def init(initial_cook_count) do
{:producer, initial_cook_count}
end
def handle_demand(demand, rest_pack_count) when demand > 0 and rest_pack_count > 0 do
pack_count = if rest_pack_count > demand, do: demand, else: rest_pack_count
empty_packs = pack_count |> prepare_empty_packs()
{:noreply, empty_packs, rest_pack_count - pack_count}
end
def handle_demand(_demand, rest_pack_count) do
{:noreply, [], rest_pack_count}
end
defp prepare_empty_packs(pack_count) do
1..pack_count |> Enum.map(fn _ -> %Pack{} end)
end
end
InitEmptyPackPreparation
ステージは、最初に作り置くため、注文とは別で鉄板に4つの空のパックを流すProducer(Source)になります。
start_link/1
で渡された数量を上限として、handle_demand/2
で要求数に従って空の%Pack{}
の配列をイベントとして蓄積し、残りの作成可能数量をステートで管理しています。
空パックを用意する(焼き担当)
defmodule EmptyPackPreparation do
use GenStage
def start_link(_opts) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:producer_consumer, :non_use, subscribe_to: [{OrderTaker, max_demand: 1, min_demand: 0}]}
end
def handle_events([%Order{order_count: order_count}], _from, state) do
empty_packs = order_count |> prepare_empty_packs()
{:noreply, empty_packs, state}
end
defp prepare_empty_packs(pack_count) do
1..pack_count |> Enum.map(fn _ -> %Pack{} end)
end
end
EmptyPackPreparation
ステージはOrderTaker
ステージをsubscribeしています。
handle_events/3
で注文担当から注文が流れてくると、注文数に従って空の%Pack{}
の配列をイベントとして蓄積します。
たこ焼きを焼く(焼き担当)
この部分はPreCook
、Cook
、PostCook
3つのステージに分けて実現しています。
defmodule PreCook do
use GenStage
def start_link(initial_cook_count) do
GenStage.start_link(__MODULE__, initial_cook_count, name: __MODULE__)
end
def init(initial_cook_count) do
{:producer_consumer, initial_cook_count,
subscribe_to: [EmptyPackPreparation, InitEmptyPackPreparation]}
end
def handle_subscribe(:producer, _opts, {pid, _reference} = from, initial_cook_count = state) do
if pid == Process.whereis(InitEmptyPackPreparation) do
GenStage.ask(from, initial_cook_count)
{:manual, state}
else
{:automatic, state}
end
end
def handle_subscribe(:consumer, _opts, _from, state) do
{:automatic, state}
end
def handle_events(empty_packs, _from, state) do
{:noreply, empty_packs, state}
end
end
PreCook
ステージはInitEmptyPackPreparation
ステージとEmptyPackPreparation
ステージをsubscribeしています。
ただ、InitEmptyPackPreparation
ステージについては、最初に要求を投げるだけでいいので、InitEmptyPackPreparation
ステージからのhandle_subscribe/4
が呼ばれた際に{:manual, state}
を返して手動モードにするとともにGenStage.ask/2
を呼んで最初に焼く分の4つの空の%Pack{}
を要求するようにしています。
defmodule Cook do
use GenStage
@takoball_count 8
def start_link(name) do
GenStage.start_link(__MODULE__, :ok, name: name)
end
def init(:ok) do
{:producer_consumer, :non_use, subscribe_to: [{PreCook, max_demand: 1, min_demand: 0}]}
end
def handle_events([%Pack{} = empty_pack], _from, state) do
pack = empty_pack |> cook()
{:noreply, [pack], state}
end
defp cook(%Pack{} = empty_pack) do
Process.sleep(3000)
try do
takoballs =
1..@takoball_count
|> Enum.map(fn _ -> cook_takoball() end)
%Pack{empty_pack | takoballs: takoballs}
rescue
_ ->
IO.puts("sorry... bug inside... retrying to cook...")
empty_pack |> cook()
end
end
defp cook_takoball() do
if 1..100 |> Enum.random() == 1 do
raise "bug"
else
%Takoball{}
end
end
end
Cook
ステージは一つのレーンを表現していて、レーンの数である複数のCook
ステージがPreCook
ステージをsubscribeすることになります。
空の%Pack{}
を受けて、たこ焼きを詰めた%Pack{}
をイベントとして登録します。たこ焼き1パック8個を3秒で焼き、虫が入っていると焼き直しています。
defmodule PostCook do
use GenStage
def start_link(lane_names) do
GenStage.start_link(__MODULE__, lane_names, name: __MODULE__)
end
def init(lane_names) do
{:producer_consumer, :non_use, subscribe_to: lane_names}
end
def handle_events([%Pack{} = pack], _from, state) do
{:noreply, [pack], state}
end
end
PostCook
ステージはstart_link/1
で受け取ったレーンの数である複数のCook
ステージをsubscribeしています。
PreCook
ステージでfan-outされた処理をPostCook
ステージでfan-inしていることになります。
全体としては、焼き担当は、注文内容を意識せずに用意された空パック分のたこ焼きをひたすら焼いていき、パック詰できたものからパック単位でレジ・パッキング担当に渡しているだけ、というところがポイントになるかと思います。
お会計する(レジ・パッキング担当)
defmodule Cashier do
use GenStage
def start_link(_opts) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:producer_consumer, :non_use, subscribe_to: [{OrderTaker, max_demand: 1, min_demand: 0}]}
end
def handle_events([%Order{} = order], _from, state) do
{:noreply, [order], state}
end
end
Cashier
ステージはOrderTaker
ステージをsubscribeしています。
今回、お会計の部分は特に何もせずに、受け取った注文情報のイベントをそのままパッキング担当に流しています。
商品を提供する(レジ・パッキング担当)
defmodule Packer do
use GenStage
defmodule State do
defstruct order: nil,
packs: [],
producers: %{Cashier => nil, PostCook => nil}
end
def start_link(_opts) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:consumer, %State{}, subscribe_to: [Cashier, PostCook]}
end
def handle_subscribe(
:producer,
_opts,
{pid, _reference} = from,
%State{producers: producers} = state
) do
producers =
if pid == Process.whereis(Cashier) do
GenStage.ask(from, 1)
%{producers | Cashier => from}
else
%{producers | PostCook => from}
end
{:manual, %State{state | producers: producers}}
end
def handle_events(
[%Order{} = order],
{pid, _reference} = _from,
%State{order: nil, packs: [], producers: %{PostCook => post_cook}} = state
) do
state =
if pid == Process.whereis(Cashier) do
GenStage.ask(post_cook, 1)
%State{state | order: order}
else
state
end
{:noreply, [], state}
end
def handle_events(
[%Pack{} = pack],
{pid, _reference} = from,
%State{
order: %Order{
order_number: order_number,
order_count: order_count
},
packs: packs,
producers: %{Cashier => cashier}
} = state
) do
state =
if pid == Process.whereis(PostCook) do
packs = [pack | packs]
1..length(packs) |> Enum.map(fn _ -> "." end) |> Enum.join() |> IO.puts()
if order_count == packs |> length() do
serve(order_number, packs)
GenStage.ask(cashier, 1)
%State{state | packs: [], order: nil}
else
GenStage.ask(from, 1)
%State{state | packs: packs}
end
else
state
end
{:noreply, [], state}
end
defp serve(order_number, packs) do
IO.puts(
"[#{order_number |> to_string |> String.pad_leading(10, "0")}]" <>
(packs |> Enum.map(&(&1 |> to_string)) |> Enum.join())
)
end
end
Packer
ステージはCashier
ステージとPostCook
ステージをsubscribeしています。
Cashier
ステージからは%Order{}
が、PostCook
ステージからは%Pack{}
が、といったように、それぞれ異なるイベントが流れてきます。
Packer
ステージはCashier
ステージから受け取った%Order{}
に記載されている注文数分の%Pack{}
をPostCook
ステージから受け取ってお客様に提供するような仕組みにする必要があります。
これを実現するため、まずはhandle_subscribe/4
で{:manual, state}
を返して上流への要求を手動で管理するようにします。
handle_events/3
ではCashier
ステージから受け取った%Order{}
とPostCook
ステージから受け取った%Pack{}
をステートとして保持し、必要に応じてGenStage.ask/2
でそれぞれのステージに要求を出しています。
呼び出し
defmodule Takoyaki do
use GenServer
@max_order_count 6
def start_link(_opts) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
send_next_order()
{:ok, :non_use}
end
def handle_info(:order, state) do
order()
send_next_order()
{:noreply, state}
end
defp send_next_order() do
Process.send_after(self(), :order, (10..50 |> Enum.random()) * 100)
end
defp order() do
OrderTaker.order(1..@max_order_count |> Enum.random())
end
end
ここではGenServerを使って1秒から5秒までのランダムな間隔で1パックから6パックまでのランダムな数量で注文を行なっています。
defmodule Takoyaki.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
@initial_order_number 1
@initial_cook_count 4
@impl true
def start(_type, _args) do
children = [
{OrderTaker, @initial_order_number},
{Cashier, []},
{InitEmptyPackPreparation, @initial_cook_count},
{EmptyPackPreparation, []},
{PreCook, @initial_cook_count},
Supervisor.child_spec({Cook, :lane1}, id: :lane1),
Supervisor.child_spec({Cook, :lane2}, id: :lane2),
Supervisor.child_spec({Cook, :lane3}, id: :lane3),
Supervisor.child_spec({Cook, :lane4}, id: :lane4),
{PostCook, [:lane1, :lane2, :lane3, :lane4]},
{Packer, []},
{Takoyaki, []}
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :rest_for_one, name: Takoyaki.Supervisor]
Supervisor.start_link(children, opts)
end
end
Takoyaki.Application
モジュールに各モジュールをワーカーとして登録して起動するようにして完成です。
テスト実行
iex -S mix
下記のように並列にバックプレッシャーがかかった状態で動作します。
4 pack(s) ordered [0000000042]
.
..
...
....
[0000000040][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
6 pack(s) ordered [0000000043]
.
..
...
[0000000041][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
.
5 pack(s) ordered [0000000044]
sorry... bug inside... retrying to cook...
..
...
....
[0000000042][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
1 pack(s) ordered [0000000045]
.
..
...
....
1 pack(s) ordered [0000000046]
.....
......
[0000000043][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
さいごに
以前GenStageでたこ焼き屋さんを実装しようとして、Akka Streamのように実装できずに断念していたのですが、単に私の理解が不足していただけだったみたいでした。
Concurrent Data Processing in Elixirという本の3章のGenStageの章を読んだことで、理解の幅が広がり、今回、再チャレンジしてみました。
仕組みが理解できるとAkka Streamよりもシンプルで分かりやすい気がしています。
Akka Streamの時にも思ったのですが、今回のような処理が GenStageに向いているのだろうなというのが感想です。
オフィシャルのドキュメントにも書かれていますが、ステージの粒度については、分岐するようなルーティングやバックプレッシャーを実現するときのみステージを分けるのがベストプラクティスだそうなので、今回はステージを細かく分け過ぎているかと思います。
ただ、ブロードキャストやfan-in、fan-out、手動での上流への要求等々、GenStageでどのように実装するかがわかるサンプルプログラムとしては、いい感じなのではと思います。