LoginSignup
9
4

More than 1 year has passed since last update.

GenStage でたこ焼き屋さん

Last updated at Posted at 2021-06-12

はじめに

以前、Akka Streams でたこ焼き屋さんという投稿で、ScalaのAkka Streamを使い、たこ焼き屋さんを題材としたバックプレッシャーを効かせたストリーム処理を実装しましたが、今回はそれをElixirのGenStageを使って実装してみました。

なお、今回のコードは下記で公開しています。
https://github.com/imahiro-t/takoyaki-genstage

お題

  • たこ焼き屋さんの店員は注文担当、焼き担当、レジ・パッキング担当の3人いる
  • たこ焼きを焼く鉄板には1度に8個焼けるレーンが4レーンある
  • 注文はパック単位で1パック8個入りで、最大6パックまでランダムに注文が入る
  • たこ焼きには1/100の確率で虫が混入し、その場合は、パックのたこ焼きを全て破棄し焼き直す必要がある
  • お客さんを効率よく捌くため最初に鉄板一面(4パック)分を作り置く

実装

GenStageを使用しないと

GenStageを使用しない単純な実装としては、注文をインプットに、注文数分たこ焼きを焼き、検品し、パック詰して提供する感じだと思います。

ただ、この場合だと、最初に作り置く要件を満たせないし、例えば3パックの注文が入った際に、焼かないレーンが1レーン残り、仮に次のお客さんが1パックの注文だった場合などに、効率が良くないです。

ですので、最初に作り置く実装を考えてみますが、お客さんには注文分ができたらそこで提供する必要があるため、注文分が揃っているのに作り置きが完了するまでお客さんを待たせるような処理は組めず、焼き担当は別スレッドでの管理となると思います。

また、注文を受けた際にも、作り置きが完了しているかどうかを確認して待つ必要があるため、焼き担当の別ワーカーに加え、レジ・パッキング担当も別スレッドでの管理となると思います。

などと、諸々考えてみても、GenStageを使用しない実装はかなり大変だということがわかります。

Elixirの場合は、GenServerである程度簡単になりますが、バックプレッシャー等の仕組みを入れるとなると、GenStageにした方が複雑にならずスッキリとした実装になると思います。

GenStageを使用すると

ダイアグラム

GenStageを使用するとどうなるのかというところですが、まず、全体のダイアグラムについて考えてみます。

takoyaki_diagram.png

  • 注文を受けた注文担当が焼き担当とレジ・パッキング担当に注文を伝える
  • 焼き担当は注文数分の空パックを用意し、空パック分のたこ焼きを焼いていき、焼き終わったらたこ焼きの入ったパックをレジ・パッキング担当に渡していく
    • 虫が入っていた場合は、レーン毎に焼き直す
    • 最初に4パックたこ焼きを焼いていたり、焼き直しが発生するので注文時の空パックと提供するパックは異なる
  • レジ・パッキング担当は、注文書にしたがってお会計を行った後、焼き担当から渡されたたこ焼き入りのパックを注文数分パッキングしたらお客さんに商品を提供する

プロジェクト作成&初期設定

mix new takoyaki --sup

mix newでプロジェクトを作成します。

mix.exs
defmodule Takoyaki.MixProject do
  use Mix.Project

  def project do
    [
      app: :takoyaki,
      version: "0.1.0",
      elixir: "~> 1.12",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: {Takoyaki.Application, []}
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      {:gen_stage, "~> 1.1.0"}
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
    ]
  end
end

gen_stageを依存に追加します。

データモデル

model.ex
defmodule Order do
  defstruct order_number: 1, order_count: 0
end

defmodule Takoball do
  defstruct value: "●"
end

defmodule Pack do
  defstruct takoballs: []
end

defimpl String.Chars, for: Pack do
  def to_string(pack) do
    "[#{pack.takoballs |> Enum.map(& &1.value) |> Enum.join(",")}]"
  end
end

各ステージ間で渡される情報については別途データモデルとして切り出しておきます。

注文を受ける(注文担当)

order_taker.ex
defmodule OrderTaker do
  use GenStage

  def start_link(initial_order_number) do
    GenStage.start_link(__MODULE__, initial_order_number, name: __MODULE__)
  end

  def init(initial_order_number) do
    {:producer, initial_order_number, dispatcher: GenStage.BroadcastDispatcher}
  end

  def order(order_count) do
    GenStage.call(__MODULE__, {:order, order_count}, 5000)
  end

  def handle_call({:order, order_count}, _from, order_number) do
    IO.puts(
      "#{order_count} pack(s) ordered [#{order_number |> to_string |> String.pad_leading(10, "0")}]"
    )

    {:reply, :ok, [%Order{order_number: order_number, order_count: order_count}],
     order_number + 1}
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end

OrderTakerステージはProducer(Source)になりますが、外部からorder/1関数が呼ばれることで注文が入るようにするため、handle_demand/2関数ではなくhandle_call/3関数でイベントを蓄積していくことになります。

注文が入ると、焼き担当とレジ・パッキング担当に同じ注文情報を流すことになるため、デフォルトのdispatcherであるGenStage.DemandDispatcherではなくGenStage.BroadcastDispatcherを使用してブロードキャストするようにしています。

イベントは注文番号と注文数が入っている%Order{}の配列になり、現在の注文番号をステートで管理しています。

最初の4つの空パックを用意する(焼き担当)

init_empty_pack_preparation.ex
defmodule InitEmptyPackPreparation do
  use GenStage

  def start_link(initial_cook_count) do
    GenStage.start_link(__MODULE__, initial_cook_count, name: __MODULE__)
  end

  def init(initial_cook_count) do
    {:producer, initial_cook_count}
  end

  def handle_demand(demand, rest_pack_count) when demand > 0 and rest_pack_count > 0 do
    pack_count = if rest_pack_count > demand, do: demand, else: rest_pack_count
    empty_packs = pack_count |> prepare_empty_packs()
    {:noreply, empty_packs, rest_pack_count - pack_count}
  end

  def handle_demand(_demand, rest_pack_count) do
    {:noreply, [], rest_pack_count}
  end

  defp prepare_empty_packs(pack_count) do
    1..pack_count |> Enum.map(fn _ -> %Pack{} end)
  end
end

InitEmptyPackPreparationステージは、最初に作り置くため、注文とは別で鉄板に4つの空のパックを流すProducer(Source)になります。

start_link/1で渡された数量を上限として、handle_demand/2で要求数に従って空の%Pack{}の配列をイベントとして蓄積し、残りの作成可能数量をステートで管理しています。

空パックを用意する(焼き担当)

empty_pack_preparation.ex
defmodule EmptyPackPreparation do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer_consumer, :non_use, subscribe_to: [{OrderTaker, max_demand: 1, min_demand: 0}]}
  end

  def handle_events([%Order{order_count: order_count}], _from, state) do
    empty_packs = order_count |> prepare_empty_packs()
    {:noreply, empty_packs, state}
  end

  defp prepare_empty_packs(pack_count) do
    1..pack_count |> Enum.map(fn _ -> %Pack{} end)
  end
end

EmptyPackPreparationステージはOrderTakerステージをsubscribeしています。

handle_events/3で注文担当から注文が流れてくると、注文数に従って空の%Pack{}の配列をイベントとして蓄積します。

たこ焼きを焼く(焼き担当)

この部分はPreCookCookPostCook3つのステージに分けて実現しています。

pre_cook.ex
defmodule PreCook do
  use GenStage

  def start_link(initial_cook_count) do
    GenStage.start_link(__MODULE__, initial_cook_count, name: __MODULE__)
  end

  def init(initial_cook_count) do
    {:producer_consumer, initial_cook_count,
     subscribe_to: [EmptyPackPreparation, InitEmptyPackPreparation]}
  end

  def handle_subscribe(:producer, _opts, {pid, _reference} = from, initial_cook_count = state) do
    if pid == Process.whereis(InitEmptyPackPreparation) do
      GenStage.ask(from, initial_cook_count)
      {:manual, state}
    else
      {:automatic, state}
    end
  end

  def handle_subscribe(:consumer, _opts, _from, state) do
    {:automatic, state}
  end

  def handle_events(empty_packs, _from, state) do
    {:noreply, empty_packs, state}
  end
end

PreCookステージはInitEmptyPackPreparationステージとEmptyPackPreparationステージをsubscribeしています。

ただ、InitEmptyPackPreparationステージについては、最初に要求を投げるだけでいいので、InitEmptyPackPreparationステージからのhandle_subscribe/4が呼ばれた際に{:manual, state}を返して手動モードにするとともにGenStage.ask/2を呼んで最初に焼く分の4つの空の%Pack{}を要求するようにしています。

cook.ex
defmodule Cook do
  use GenStage

  @takoball_count 8

  def start_link(name) do
    GenStage.start_link(__MODULE__, :ok, name: name)
  end

  def init(:ok) do
    {:producer_consumer, :non_use, subscribe_to: [{PreCook, max_demand: 1, min_demand: 0}]}
  end

  def handle_events([%Pack{} = empty_pack], _from, state) do
    pack = empty_pack |> cook()
    {:noreply, [pack], state}
  end

  defp cook(%Pack{} = empty_pack) do
    Process.sleep(3000)

    try do
      takoballs =
        1..@takoball_count
        |> Enum.map(fn _ -> cook_takoball() end)

      %Pack{empty_pack | takoballs: takoballs}
    rescue
      _ ->
        IO.puts("sorry... bug inside... retrying to cook...")
        empty_pack |> cook()
    end
  end

  defp cook_takoball() do
    if 1..100 |> Enum.random() == 1 do
      raise "bug"
    else
      %Takoball{}
    end
  end
end

Cookステージは一つのレーンを表現していて、レーンの数である複数のCookステージがPreCookステージをsubscribeすることになります。

空の%Pack{}を受けて、たこ焼きを詰めた%Pack{}をイベントとして登録します。たこ焼き1パック8個を3秒で焼き、虫が入っていると焼き直しています。

post_cook.ex
defmodule PostCook do
  use GenStage

  def start_link(lane_names) do
    GenStage.start_link(__MODULE__, lane_names, name: __MODULE__)
  end

  def init(lane_names) do
    {:producer_consumer, :non_use, subscribe_to: lane_names}
  end

  def handle_events([%Pack{} = pack], _from, state) do
    {:noreply, [pack], state}
  end
end

PostCookステージはstart_link/1で受け取ったレーンの数である複数のCookステージをsubscribeしています。

PreCookステージでfan-outされた処理をPostCookステージでfan-inしていることになります。

全体としては、焼き担当は、注文内容を意識せずに用意された空パック分のたこ焼きをひたすら焼いていき、パック詰できたものからパック単位でレジ・パッキング担当に渡しているだけ、というところがポイントになるかと思います。

お会計する(レジ・パッキング担当)

cashier.ex
defmodule Cashier do
  use GenStage

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer_consumer, :non_use, subscribe_to: [{OrderTaker, max_demand: 1, min_demand: 0}]}
  end

  def handle_events([%Order{} = order], _from, state) do
    {:noreply, [order], state}
  end
end

CashierステージはOrderTakerステージをsubscribeしています。

今回、お会計の部分は特に何もせずに、受け取った注文情報のイベントをそのままパッキング担当に流しています。

商品を提供する(レジ・パッキング担当)

packer.ex
defmodule Packer do
  use GenStage

  defmodule State do
    defstruct order: nil,
              packs: [],
              producers: %{Cashier => nil, PostCook => nil}
  end

  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, %State{}, subscribe_to: [Cashier, PostCook]}
  end

  def handle_subscribe(
        :producer,
        _opts,
        {pid, _reference} = from,
        %State{producers: producers} = state
      ) do
    producers =
      if pid == Process.whereis(Cashier) do
        GenStage.ask(from, 1)
        %{producers | Cashier => from}
      else
        %{producers | PostCook => from}
      end

    {:manual, %State{state | producers: producers}}
  end

  def handle_events(
        [%Order{} = order],
        {pid, _reference} = _from,
        %State{order: nil, packs: [], producers: %{PostCook => post_cook}} = state
      ) do
    state =
      if pid == Process.whereis(Cashier) do
        GenStage.ask(post_cook, 1)
        %State{state | order: order}
      else
        state
      end

    {:noreply, [], state}
  end

  def handle_events(
        [%Pack{} = pack],
        {pid, _reference} = from,
        %State{
          order: %Order{
            order_number: order_number,
            order_count: order_count
          },
          packs: packs,
          producers: %{Cashier => cashier}
        } = state
      ) do
    state =
      if pid == Process.whereis(PostCook) do
        packs = [pack | packs]
        1..length(packs) |> Enum.map(fn _ -> "." end) |> Enum.join() |> IO.puts()

        if order_count == packs |> length() do
          serve(order_number, packs)
          GenStage.ask(cashier, 1)
          %State{state | packs: [], order: nil}
        else
          GenStage.ask(from, 1)
          %State{state | packs: packs}
        end
      else
        state
      end

    {:noreply, [], state}
  end

  defp serve(order_number, packs) do
    IO.puts(
      "[#{order_number |> to_string |> String.pad_leading(10, "0")}]" <>
        (packs |> Enum.map(&(&1 |> to_string)) |> Enum.join())
    )
  end
end

PackerステージはCashierステージとPostCookステージをsubscribeしています。

Cashierステージからは%Order{}が、PostCookステージからは%Pack{}が、といったように、それぞれ異なるイベントが流れてきます。

PackerステージはCashierステージから受け取った%Order{}に記載されている注文数分の%Pack{}PostCookステージから受け取ってお客様に提供するような仕組みにする必要があります。

これを実現するため、まずはhandle_subscribe/4{:manual, state}を返して上流への要求を手動で管理するようにします。

handle_events/3ではCashierステージから受け取った%Order{}PostCookステージから受け取った%Pack{}をステートとして保持し、必要に応じてGenStage.ask/2でそれぞれのステージに要求を出しています。

呼び出し

takoyaki.ex
defmodule Takoyaki do
  use GenServer

  @max_order_count 6

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    send_next_order()
    {:ok, :non_use}
  end

  def handle_info(:order, state) do
    order()
    send_next_order()
    {:noreply, state}
  end

  defp send_next_order() do
    Process.send_after(self(), :order, (10..50 |> Enum.random()) * 100)
  end

  defp order() do
    OrderTaker.order(1..@max_order_count |> Enum.random())
  end
end

ここではGenServerを使って1秒から5秒までのランダムな間隔で1パックから6パックまでのランダムな数量で注文を行なっています。

takoyaki/application.ex
defmodule Takoyaki.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  @initial_order_number 1
  @initial_cook_count 4

  @impl true
  def start(_type, _args) do
    children = [
      {OrderTaker, @initial_order_number},
      {Cashier, []},
      {InitEmptyPackPreparation, @initial_cook_count},
      {EmptyPackPreparation, []},
      {PreCook, @initial_cook_count},
      Supervisor.child_spec({Cook, :lane1}, id: :lane1),
      Supervisor.child_spec({Cook, :lane2}, id: :lane2),
      Supervisor.child_spec({Cook, :lane3}, id: :lane3),
      Supervisor.child_spec({Cook, :lane4}, id: :lane4),
      {PostCook, [:lane1, :lane2, :lane3, :lane4]},
      {Packer, []},
      {Takoyaki, []}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :rest_for_one, name: Takoyaki.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Takoyaki.Applicationモジュールに各モジュールをワーカーとして登録して起動するようにして完成です。

テスト実行

iex -S mix

下記のように並列にバックプレッシャーがかかった状態で動作します。

4 pack(s) ordered [0000000042]
.
..
...
....
[0000000040][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
6 pack(s) ordered [0000000043]
.
..
...
[0000000041][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
.
5 pack(s) ordered [0000000044]
sorry... bug inside... retrying to cook...
..
...
....
[0000000042][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]
1 pack(s) ordered [0000000045]
.
..
...
....
1 pack(s) ordered [0000000046]
.....
......
[0000000043][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●][●,●,●,●,●,●,●,●]

さいごに

以前GenStageでたこ焼き屋さんを実装しようとして、Akka Streamのように実装できずに断念していたのですが、単に私の理解が不足していただけだったみたいでした。

Concurrent Data Processing in Elixirという本の3章のGenStageの章を読んだことで、理解の幅が広がり、今回、再チャレンジしてみました。

仕組みが理解できるとAkka Streamよりもシンプルで分かりやすい気がしています。

Akka Streamの時にも思ったのですが、今回のような処理が GenStageに向いているのだろうなというのが感想です。

オフィシャルのドキュメントにも書かれていますが、ステージの粒度については、分岐するようなルーティングやバックプレッシャーを実現するときのみステージを分けるのがベストプラクティスだそうなので、今回はステージを細かく分け過ぎているかと思います。

ただ、ブロードキャストやfan-in、fan-out、手動での上流への要求等々、GenStageでどのように実装するかがわかるサンプルプログラムとしては、いい感じなのではと思います。

9
4
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
4