この記事は、Elixir Advent Calendar 2025 その5 の2日目です
昨日は @torifukukaiou さんで 「Day 11: ReactorをElixirで解くことを楽しむ」 でした
piacere です、ご覧いただいてありがとございます ![]()
Elixirでバックプレッシャー(後続が処理可能なデータを必要なだけ流す機構)と並列・並列データ処理、レートリミッター(流量のコントロールやQoS担保)、データレベルの耐障害性(安全なシャットダウン、障害処理のカスタマイズ)、Telemetryでの性能可視化、そしてテスト容易 … とパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、最もシンプルなコードで解説します
この「Broadwayを嗜む」シリーズのコラムは下記です
① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ カスタムProducerをリストから本物のキューに換装する
④ マルチコア並列をベンチマークする
⑤ パーティショニングによるマルチユーザー対応
⑥ 異常データが来たときはリトライしたり、除外してログ出力
このコラムが面白かったり、役に立ったら、
で応援よろしくお願いします ![]()
Elixirアドベントカレンダー、応援お願いします
今年もやっています
Broadwayを動かすElixir PJ作成
Elixir PJを作成します
mix new basic
cd basic
Broadwayをインストールします
defmodule Basic.MixProject do
use Mix.Project
…
defp deps do
[
+ {:broadway, "~> 1.0"}
# {:dep_from_hexpm, "~> 0.3.0"},
…
mix deps.get
カスタムProducerとデータを流し込まれるBroadway
Broadwayは、Amazon SQSやKafka等をProducerとする例が良く見られますが、自分でカスタムProducerも作れるので、最もシンプルにBroadwayを理解するためのカスタムProducerを追加してみます
カスタムProducerは、下記コードのように GenStage を使って作れます
init で初期化し、handle_demand でBroadwayからの要求を受け取ってBroadwayにデータが流し込まれます(この処理をバックプレッシャーと言います)
defmodule SimpleProducer do
use GenStage
def start_link(initial_state) do
GenStage.start_link(__MODULE__, initial_state, name: __MODULE__)
end
@name Module.concat([Basic.Broadway, Broadway.Producer_0])
def push(items) when is_list(items), do: GenServer.cast(@name, {:push, items})
@impl true
def init(:init), do: {:producer, {[], 0}}
@impl true
def handle_cast({:push, items}, {queue, pos}), do: broker({queue ++ items, pos})
@impl true
def handle_demand(next, {queue, pos}), do: broker({queue, pos + next})
def broker({queue, pos}) do
IO.puts("[SimpleProducer.broker] 呼出")
IO.puts(" [broker] BEFORE: #{pos}")
{currents, remains} = Enum.split(queue, pos)
new_pos = pos - length(currents)
IO.puts(" [broker] AFTER: #{new_pos}")
{:noreply, currents, {remains, new_pos}}
end
end
次に、Broadwayモジュールを追加します
handle_message() は、Producerから渡された各データを処理するたびに呼ばれるコールバック関数です
handle_batch() は、データのまとまりをバッチとして処理するたびに呼ばれるコールバック関数です
今回は、Broadwayの基本的な挙動を理解してもらうために、どちらもデバッグ表示するだけです
start_link では、Broadway設定を行いますが、各パラメータの意味は下記の通りです
| カテゴリ | パラメータ名 | 意味 |
|---|---|---|
| producer: | - | Procuderの設定群 |
| module | Procuderとなるモジュールの指定と init() 呼出時の引数 |
|
| concurrency | マルチコア並列行実行数 ※マルチコアの2~4倍で設定されることが多い |
|
| transformer |
%Broadway.Message{} への変換関数の指定※Broadwayはこの構造体でしかデータを受け取らない |
|
| processors | - | 各データを処理する handle_message() の設定群 |
| concurrency | マルチコア並列実行数 | |
| batchers: | - | データの塊をバッチとして処理する handle_batch() の設定群 |
| batch_size | バッチサイズ | |
| batch_timeout | バッチ処理のタイムアウトmsec | |
| concurrency | マルチコア並列実行数 |
上記以外のパラメータは、下記を参照してください
defmodule Basic.Broadway do
use Broadway
@impl true
def handle_message(_processor, message, _context) do
message
|> Broadway.Message.update_data(fn x ->
IO.puts("[Basic.Broadway.handle_messaage] 受信:#{inspect(x)}")
x
end)
end
@impl true
def handle_batch(_batcher, messages, _batch_info, _context) do
IO.puts("[Basic.Broadway.handle_batch] 処理したバッチ数:#{length(messages)}")
IO.puts("[Basic.Broadway.handle_batch] #{inspect(Enum.map(messages, & &1.data))}")
messages
end
def start_link(_opts) do
IO.puts("[Basic.Broadway.start_link] 起動")
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {SimpleProducer, :init},
concurrency: 3,
transformer: {__MODULE__, :transform, []}
],
processors: [
default: [
max_demand: 6,
concurrency: 2
]
],
batchers: [
default: [
batch_size: 5,
batch_timeout: 100,
concurrency: 8
]
]
)
end
def transform(event, _opts) do
%Broadway.Message{data: event, acknowledger: {__MODULE__, :ack_id, :ack_data}}
end
def ack(:ack_id, _successful, _failed), do: :ok
end
それから、Broadwayを常時起動させるために、Applicationモジュールを追加します
defmodule Basic.Application do
use Application
def start(_type, _args) do
children = [
{Basic.Broadway, []}
]
opts = [strategy: :one_for_one, name: Basic.Supervisor]
Supervisor.start_link(children, opts)
end
end
最後に、Applicationモジュールをiex起動時に起動することで、Broadwayを常時サーバー起動するよう設定します
なおBroadwayコードの変更は、iex起動時のみ反映されるため、recompile しただけではダメで、都度iexを起動し直してください(ExSyncでのBroadwayリロードもどこかで解説するかも知れません)
defmodule Basic.MixProject do
use Mix.Project
…
def application do
[
+ mod: {Basic.Application, []},
extra_applications: [:logger]
…
iexを起動すると、上記で作成したBroadwayも起動します
iex -S mix
[Basic.Broadway.start_link] 起動
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[broker] BEFORE: 6
[broker] BEFORE: 6
[broker] BEFORE: 6
[broker] AFTER: 6
[broker] AFTER: 6
[broker] AFTER: 6
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[broker] BEFORE: 12
[broker] BEFORE: 12
[broker] BEFORE: 12
[broker] AFTER: 12
[broker] AFTER: 12
[broker] AFTER: 12
broker() でデバッグしている [broker] BEFORE はデータ流し込み前のデータ位置、[broker] AFTER はデータ流し込み後のデータ位置を示しますが、1回あたり6データ分を3個 x 2回繰り返ししているのは、上記Broadway設定の producer: [max_demand: 6] と producer: [concurrency: 3]、 processors: [concurrency: 2] によるものです
つまり、Broadwayのバックプレッシャー(Broadwayからの要求によってProducerがデータを流し込む)は、concurrency 設定に従ってマルチコア並列実行されるということです
Broadwayをマルチコア並列で動かす
1~15の整数リストをカスタムProducerの push() でキューイングして、Broadwayでバックプレッシャー(と言っても受信したことやバッチ処理していることをデバッグしているだけですが …)すると、下記のような結果となります
handle_message() がマルチコア並列処理されていて、順不同にデータ処理されていることが分かります … カスタムProducerの段階では、1~15が順番通りなので、Broadwayに渡ってから順不同となっています
当然、handle_batch() においても順不同です
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 12
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:7
[Basic.Broadway.handle_messaage] 受信:1
[Basic.Broadway.handle_messaage] 受信:8
[Basic.Broadway.handle_messaage] 受信:2
[Basic.Broadway.handle_messaage] 受信:9
[Basic.Broadway.handle_messaage] 受信:3
[Basic.Broadway.handle_messaage] 受信:10
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:4
[Basic.Broadway.handle_messaage] 受信:11
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:5
[Basic.Broadway.handle_messaage] 受信:12
[Basic.Broadway.handle_batch] [7, 8, 9, 1, 2]
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:6
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:13
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:14
[Basic.Broadway.handle_batch] [3, 10, 11, 12, 4]
[broker] AFTER: 3
[Basic.Broadway.handle_messaage] 受信:15
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[broker] BEFORE: 6
[Basic.Broadway.handle_batch] [5, 6, 13, 14, 15]
[broker] AFTER: 6
[SimpleProducer.broker] 呼出
[broker] BEFORE: 9
[broker] AFTER: 9
[SimpleProducer.broker] 呼出
[broker] BEFORE: 12
[broker] AFTER: 12
裏側では、SimpleProducer.brokerがpush時に1回、その後はデータが6件消化されるたびに呼ばれています(デバッグが6件毎では無いのは、裏でBoardwayが並列実行されてデバッグが遅れて出ている影響です)
また、バッチサイズ5なので、5データ毎に1回、handle_batch() が走っていることも分かります(デバッグが5データ毎では無いのは、裏で次のProducerが並列実行されてデバッグが遅れて出ている影響です)
なお、途中で :ok が出ているのは、Producerの handle_cast() が非同期実行された結果によるものです
今回のBroadwayは、リストであればどのようなデータでも渡せるので、下記のようにマップも渡せます
iex> SimpleProducer.push(for i <- 1..15, do: %{id: i, name: "Item-#{Enum.random(1..50)}"})
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 12
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:%{id: 7, name: "Item-28"}
[Basic.Broadway.handle_messaage] 受信:%{id: 1, name: "Item-2"}
[Basic.Broadway.handle_messaage] 受信:%{id: 8, name: "Item-50"}
[Basic.Broadway.handle_messaage] 受信:%{id: 2, name: "Item-2"}
[Basic.Broadway.handle_messaage] 受信:%{id: 9, name: "Item-32"}
[Basic.Broadway.handle_messaage] 受信:%{id: 3, name: "Item-12"}
[Basic.Broadway.handle_messaage] 受信:%{id: 10, name: "Item-49"}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:%{id: 4, name: "Item-31"}
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[Basic.Broadway.handle_messaage] 受信:%{id: 11, name: "Item-38"}
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:%{id: 5, name: "Item-2"}
[Basic.Broadway.handle_batch] [%{id: 7, name: "Item-28"}, %{id: 8, name: "Item-50"}, %{id: 9, name: "Item-32"}, %{id: 1, name: "Item-2"}, %{id: 2, name: "Item-2"}]
[Basic.Broadway.handle_messaage] 受信:%{id: 12, name: "Item-38"}
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:%{id: 6, name: "Item-10"}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:%{id: 13, name: "Item-30"}
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:%{id: 14, name: "Item-45"}
[Basic.Broadway.handle_batch] [%{id: 3, name: "Item-12"}, %{id: 10, name: "Item-49"}, %{id: 11, name: "Item-38"}, %{id: 12, name: "Item-38"}, %{id: 4, name: "Item-31"}]
[broker] AFTER: 3
[Basic.Broadway.handle_messaage] 受信:%{id: 15, name: "Item-47"}
[SimpleProducer.broker] 呼出
[broker] BEFORE: 6
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[broker] AFTER: 6
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] [%{id: 5, name: "Item-2"}, %{id: 6, name: "Item-10"}, %{id: 13, name: "Item-30"}, %{id: 14, name: "Item-45"}, %{id: 15, name: "Item-47"}]
[broker] BEFORE: 9
[broker] AFTER: 9
[SimpleProducer.broker] 呼出
[broker] BEFORE: 12
[broker] AFTER: 12
終わりに
Broadwayを、最もシンプルなコードとカスタムProducerで解説しました
思ったより気軽にBroadwayによるバックプレッシャーと並列データ処理が使えたことが伝われば幸いです
なお、ここまでだとFlowやGenStageとさほど変わらないので、次回からBroadwayならではのフィーチャを扱っていきます