5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Broadwayを嗜む①:「パーフェクトな並行・並列データフレームワーク」の基本的な使い方

Last updated at Posted at 2025-12-27

この記事は、Elixir Advent Calendar 2025 その5 の2日目です

昨日は @torifukukaiou さんで 「Day 11: ReactorをElixirで解くことを楽しむ」 でした


piacere です、ご覧いただいてありがとございます :bow:

Elixirでバックプレッシャー(後続が処理可能なデータを必要なだけ流す機構)と並列・並列データ処理、レートリミッター(流量のコントロールやQoS担保)、データレベルの耐障害性(安全なシャットダウン、障害処理のカスタマイズ)、Telemetryでの性能可視化、そしてテスト容易 … とパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、最もシンプルなコードで解説します

この「Broadwayを嗜む」シリーズのコラムは下記です

① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ カスタムProducerをリストから本物のキューに換装する
④ マルチコア並列をベンチマークする
⑤ パーティショニングによるマルチユーザー対応
⑥ 異常データが来たときはリトライしたり、除外してログ出力

このコラムが面白かったり、役に立ったら、image.png で応援よろしくお願いします :bow:

Elixirアドベントカレンダー、応援お願いします :bow:

今年もやっています

Broadwayを動かすElixir PJ作成

Elixir PJを作成します

mix new basic
cd basic

Broadwayをインストールします

basic/mix.exs
defmodule Basic.MixProject do
  use Mix.Project

  defp deps do
    [
+     {:broadway, "~> 1.0"}
      # {:dep_from_hexpm, "~> 0.3.0"},

mix deps.get

カスタムProducerとデータを流し込まれるBroadway

Broadwayは、Amazon SQSやKafka等をProducerとする例が良く見られますが、自分でカスタムProducerも作れるので、最もシンプルにBroadwayを理解するためのカスタムProducerを追加してみます

カスタムProducerは、下記コードのように GenStage を使って作れます

init で初期化し、handle_demand でBroadwayからの要求を受け取ってBroadwayにデータが流し込まれます(この処理をバックプレッシャーと言います)

basic/lib/simple_producer.ex
defmodule SimpleProducer do
  use GenStage

  def start_link(initial_state) do
    GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
  end

  @name Module.concat([Basic.Broadway, Broadway.Producer_0])
  def push(items) when is_list(items), do: GenServer.cast(@name, {:push, items})

  @impl true
  def init(:init), do: {:producer, {[], 0}}

  @impl true
  def handle_cast({:push, items}, {queue, pos}), do: broker({queue ++ items, pos})

  @impl true
  def handle_demand(next, {queue, pos}), do: broker({queue, pos + next})

  def broker({queue, pos}) do
    IO.puts("[SimpleProducer.broker] 呼出")
    IO.puts("  [broker] BEFORE: #{pos}")
    {currents, remains} = Enum.split(queue, pos)
    new_pos = pos - length(currents)
    IO.puts("  [broker] AFTER: #{new_pos}")
    {:noreply, currents, {remains, new_pos}}
  end
end

次に、Broadwayモジュールを追加します

handle_message() は、Producerから渡された各データを処理するたびに呼ばれるコールバック関数です

handle_batch() は、データのまとまりをバッチとして処理するたびに呼ばれるコールバック関数です

今回は、Broadwayの基本的な挙動を理解してもらうために、どちらもデバッグ表示するだけです

start_link では、Broadway設定を行いますが、各パラメータの意味は下記の通りです

カテゴリ パラメータ名 意味
producer: Procuderの設定群
  module Procuderとなるモジュールの指定と init() 呼出時の引数
  concurrency マルチコア並列行実行数
※マルチコアの2~4倍で設定されることが多い
  transformer %Broadway.Message{} への変換関数の指定
※Broadwayはこの構造体でしかデータを受け取らない
processors 各データを処理する handle_message() の設定群
  concurrency マルチコア並列実行数
batchers: データの塊をバッチとして処理する handle_batch() の設定群
  batch_size バッチサイズ
  batch_timeout バッチ処理のタイムアウトmsec
  concurrency マルチコア並列実行数

上記以外のパラメータは、下記を参照してください

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    message
    |> Broadway.Message.update_data(fn x ->
      IO.puts("[Basic.Broadway.handle_messaage] 受信:#{inspect(x)}")
      x
    end)
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    IO.puts("[Basic.Broadway.handle_batch] 処理したバッチ数:#{length(messages)}")
    IO.puts("[Basic.Broadway.handle_batch] #{inspect(Enum.map(messages, & &1.data))}")
    messages
  end

  def start_link(_opts) do
    IO.puts("[Basic.Broadway.start_link] 起動")
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {SimpleProducer, :init},
        concurrency: 3,
        transformer: {__MODULE__, :transform, []}
      ],
      processors: [
        default: [
          max_demand: 6,
          concurrency: 2
        ]
      ],
      batchers: [
        default: [
          batch_size: 5,
          batch_timeout: 100,
          concurrency: 8
        ]
      ]
    )
  end

  def transform(event, _opts) do
    %Broadway.Message{data: event, acknowledger: {__MODULE__, :ack_id, :ack_data}}
  end

  def ack(:ack_id, _successful, _failed), do: :ok
end

それから、Broadwayを常時起動させるために、Applicationモジュールを追加します

basic/lib/application.ex
defmodule Basic.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Basic.Broadway, []}
    ]

    opts = [strategy: :one_for_one, name: Basic.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

最後に、Applicationモジュールをiex起動時に起動することで、Broadwayを常時サーバー起動するよう設定します

なおBroadwayコードの変更は、iex起動時のみ反映されるため、recompile しただけではダメで、都度iexを起動し直してください(ExSyncでのBroadwayリロードもどこかで解説するかも知れません)

basic/mix.exs
defmodule Basic.MixProject do
  use Mix.Project

  def application do
    [
+     mod: {Basic.Application, []},
      extra_applications: [:logger]

iexを起動すると、上記で作成したBroadwayも起動します

iex -S mix
[Basic.Broadway.start_link] 起動
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 6
  [broker] BEFORE: 6
  [broker] BEFORE: 6
  [broker] AFTER: 6
  [broker] AFTER: 6
  [broker] AFTER: 6
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 12
  [broker] BEFORE: 12
  [broker] BEFORE: 12
  [broker] AFTER: 12
  [broker] AFTER: 12
  [broker] AFTER: 12

broker() でデバッグしている [broker] BEFORE はデータ流し込み前のデータ位置、[broker] AFTER はデータ流し込み後のデータ位置を示しますが、1回あたり6データ分を3個 x 2回繰り返ししているのは、上記Broadway設定の producer: [max_demand: 6]producer: [concurrency: 3]processors: [concurrency: 2] によるものです

つまり、Broadwayのバックプレッシャー(Broadwayからの要求によってProducerがデータを流し込む)は、concurrency 設定に従ってマルチコア並列実行されるということです

Broadwayをマルチコア並列で動かす

1~15の整数リストをカスタムProducerの push() でキューイングして、Broadwayでバックプレッシャー(と言っても受信したことやバッチ処理していることをデバッグしているだけですが …)すると、下記のような結果となります

handle_message() がマルチコア並列処理されていて、順不同にデータ処理されていることが分かります … カスタムProducerの段階では、1~15が順番通りなので、Broadwayに渡ってから順不同となっています

当然、handle_batch() においても順不同です

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:7
[Basic.Broadway.handle_messaage] 受信:1
[Basic.Broadway.handle_messaage] 受信:8
[Basic.Broadway.handle_messaage] 受信:2
[Basic.Broadway.handle_messaage] 受信:9
[Basic.Broadway.handle_messaage] 受信:3
[Basic.Broadway.handle_messaage] 受信:10
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:4
[Basic.Broadway.handle_messaage] 受信:11
[Basic.Broadway.handle_batch] 処理したバッチ数:5
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:5
[Basic.Broadway.handle_messaage] 受信:12
[Basic.Broadway.handle_batch] [7, 8, 9, 1, 2]
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:6
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:13
[Basic.Broadway.handle_batch] 処理したバッチ数:5
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:14
[Basic.Broadway.handle_batch] [3, 10, 11, 12, 4]
  [broker] AFTER: 3
[Basic.Broadway.handle_messaage] 受信:15
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
  [broker] BEFORE: 6
[Basic.Broadway.handle_batch] [5, 6, 13, 14, 15]
  [broker] AFTER: 6
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 9
  [broker] AFTER: 9
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 12
  [broker] AFTER: 12

裏側では、SimpleProducer.brokerがpush時に1回、その後はデータが6件消化されるたびに呼ばれています(デバッグが6件毎では無いのは、裏でBoardwayが並列実行されてデバッグが遅れて出ている影響です)

また、バッチサイズ5なので、5データ毎に1回、handle_batch() が走っていることも分かります(デバッグが5データ毎では無いのは、裏で次のProducerが並列実行されてデバッグが遅れて出ている影響です)

なお、途中で :ok が出ているのは、Producerの handle_cast() が非同期実行された結果によるものです

今回のBroadwayは、リストであればどのようなデータでも渡せるので、下記のようにマップも渡せます

iex> SimpleProducer.push(for i <- 1..15, do: %{id: i, name: "Item-#{Enum.random(1..50)}"})
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:%{id: 7, name: "Item-28"}
[Basic.Broadway.handle_messaage] 受信:%{id: 1, name: "Item-2"}
[Basic.Broadway.handle_messaage] 受信:%{id: 8, name: "Item-50"}
[Basic.Broadway.handle_messaage] 受信:%{id: 2, name: "Item-2"}
[Basic.Broadway.handle_messaage] 受信:%{id: 9, name: "Item-32"}
[Basic.Broadway.handle_messaage] 受信:%{id: 3, name: "Item-12"}
[Basic.Broadway.handle_messaage] 受信:%{id: 10, name: "Item-49"}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:%{id: 4, name: "Item-31"}
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[Basic.Broadway.handle_messaage] 受信:%{id: 11, name: "Item-38"}
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:%{id: 5, name: "Item-2"}
[Basic.Broadway.handle_batch] [%{id: 7, name: "Item-28"}, %{id: 8, name: "Item-50"}, %{id: 9, name: "Item-32"}, %{id: 1, name: "Item-2"}, %{id: 2, name: "Item-2"}]
[Basic.Broadway.handle_messaage] 受信:%{id: 12, name: "Item-38"}
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:%{id: 6, name: "Item-10"}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:%{id: 13, name: "Item-30"}
[Basic.Broadway.handle_batch] 処理したバッチ数:5
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:%{id: 14, name: "Item-45"}
[Basic.Broadway.handle_batch] [%{id: 3, name: "Item-12"}, %{id: 10, name: "Item-49"}, %{id: 11, name: "Item-38"}, %{id: 12, name: "Item-38"}, %{id: 4, name: "Item-31"}]
  [broker] AFTER: 3
[Basic.Broadway.handle_messaage] 受信:%{id: 15, name: "Item-47"}
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 6
[Basic.Broadway.handle_batch] 処理したバッチ数:5
  [broker] AFTER: 6
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] [%{id: 5, name: "Item-2"}, %{id: 6, name: "Item-10"}, %{id: 13, name: "Item-30"}, %{id: 14, name: "Item-45"}, %{id: 15, name: "Item-47"}]
  [broker] BEFORE: 9
  [broker] AFTER: 9
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 12
  [broker] AFTER: 12

終わりに

Broadwayを、最もシンプルなコードとカスタムProducerで解説しました

思ったより気軽にBroadwayによるバックプレッシャーと並列データ処理が使えたことが伝われば幸いです

なお、ここまでだとFlowやGenStageとさほど変わらないので、次回からBroadwayならではのフィーチャを扱っていきます

p.s.このコラムが、面白かったり、役に立ったら…

image.png で応援よろしくお願いします :bow:


明日も私で 「Broadwayを嗜む②:データ順を維持しながら並行・並列を高めるコントロール」 です

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?