この記事は、Elixir Advent Calendar 2025 その5 の3日目です
昨日は私で 「Broadwayを嗜む①:『パーフェクトな並行データフレームワーク』の基本的な使い方」 でした
piacere です、ご覧いただいてありがとございます ![]()
前回 から引き続き、Elixirのパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、今回はデータ順を維持しつつ並行・並列を高める以下4つの方法のうち、上から3つを紹介します
- a)processorsの並行数を1にする
- b)データに通番を振り、batchersの並行数を1にする
- c)後続に待機Agentを置いてシーケンシャル処理
- d)processorsでパーティション毎の追い越し抑制
なお、前回コラムの続きから行いますので、下記を先に実施しておいてください
この「Broadwayを嗜む」シリーズのコラムは下記です
① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ カスタムProducerをリストから本物のキューに換装する
④ マルチコア並列をベンチマークする
⑤ パーティショニングによるマルチユーザー対応
⑥ 異常データが来たときはリトライしたり、除外してログ出力
このコラムが面白かったり、役に立ったら、
で応援よろしくお願いします ![]()
Elixirアドベントカレンダー、応援お願いします
今年もやっています
事前準備:ExSyncでBroadway変更の自動反映
今後はBroadwayの改修を割と頻繁に行いますが、イチイチiexを再起動するのが面倒なので、ExSyncでリコンパイル+リロードを賄うようにします
まず、ExSyncをインストールします
defmodule Basic.MixProject do
use Mix.Project
…
defp deps do
[
+ {:exsync, "~> 0.0"},
{:broadway, "~> 1.0"}
# {:dep_from_hexpm, "~> 0.3.0"},
…
mix deps.get
Broadwayの停止と再起動を行うExSync設定を追加します
defmodule Reloader do
def start_watch(), do: watch_loop(last_timestamp())
def watch_loop(last_time) do
current_time = last_timestamp()
if current_time > last_time, do: reload()
Process.sleep(1000)
watch_loop(current_time)
end
def last_timestamp() do
Path.wildcard("lib/**/*.ex")
|> Enum.map(&File.stat!(&1).mtime)
|> Enum.max(fn -> 0 end)
end
def reload() do
IEx.Helpers.recompile()
if pid = Process.whereis(Basic.Broadway) do
GenServer.stop(pid)
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, _, _} -> :ok
after 1000 -> :ok
end
end
Basic.Broadway.start_link([])
end
end
Task.start(fn -> Reloader.start_watch() end)
Ctrl+cでiexを一度落とし、下記で再起動すると、あらゆるコード変更をファイル保存するたびにリコンパイルが走り、Broadwayもリロードされるようになります
iex -S mix
試しに、Broadwayリロード時に設定をデバッグするように変更します(ファイル保存はしないでください)
defmodule Basic.Broadway do
use Broadway
@doc """
iex> Basic.Broadway.reload()
:ok
"""
def reload() do
if Process.whereis(__MODULE__), do: GenServer.stop(__MODULE__)
start_link([])
end
…
def start_link(_opts) do
- IO.puts("[Basic.Broadway.start_link] 起動")
- Broadway.start_link(__MODULE__, settings)
+ settings = [
name: __MODULE__,
producer: [
module: {SimpleProducer, :init},
concurrency: 3,
transformer: {__MODULE__, :transform, []}
],
processors: [
default: [
max_demand: 6,
concurrency: 1
]
],
batchers: [
default: [
batch_size: 5,
batch_timeout: 100,
concurrency: 8
]
]
]
+ |> then(& IO.puts("[Basic.Broadway.start_link] 起動\n#{inspect(&1)}"))
+ Broadway.start_link(__MODULE__, settings)
…
ファイル保存をすると、下記のようにリコンパイルが走り、Broadwayのリロードも行われ、Broadway設定や関数が刷新されるようになります
16:30:52.949 [debug] [exsync] running mix compile
Compiling 2 files (.ex)
Generated basic app
[Basic.Broadway.start_link] 起動
[
name: Basic.Broadway,
producer: [
module: {SimpleProducer, :init},
concurrency: 2,
transformer: {Basic.Broadway, :transform, []}
],
processors: [default: [max_demand: 6, concurrency: 2]],
batchers: [default: [batch_size: 10000, batch_timeout: 100, concurrency: 1]]
]
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[broker] BEFORE: 6
[broker] BEFORE: 6
[broker] AFTER: 6
[broker] AFTER: 6
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
[broker] BEFORE: 12
[broker] BEFORE: 12
[broker] AFTER: 12
[broker] AFTER: 12
16:30:53.801 [debug] [exsync] reload module Elixir.Basic.Broadway
16:30:53.809 [debug] [exsync] reload module Elixir.SimpleProducer
16:30:53.864 [debug] [exsync] reload complete
a)processorsの並行数を1にする
最も簡単な方法は、processors の並行度を1にすることです
並行・並列が不要な、小規模PubSubやノンマルチユーザーのデータストリーミング等を行いたいならコレで充分です
ただし、データ処理を並行・並列で行いたい中規模以上PubSubやマルチユーザーに対するデータストリーミング、CPUネックやI/Oネックのある処理を行う場合は、この方法だとブロックされてしまうため、以降の方法が必要です
Producer側でデータ順が荒れるケースにも向いていませんので、その場合は次の方法が有効です
それでは話を戻して、processors の並行数を1に変更しましょう
defmodule Basic.Broadway do
use Broadway
…
processors: [
default: [
max_demand: 6,
- concurrency: 2
+ concurrency: 1
…
ファイル保存し、Broadwayリロードされた後、下記 push() を行うと、今度はデータ順通りに処理されました
handle_batch() だけで無く、handle_message() の方もデータ順通りになっています
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 6
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:1
[Basic.Broadway.handle_messaage] 受信:2
[Basic.Broadway.handle_messaage] 受信:3
[Basic.Broadway.handle_messaage] 受信:4
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:5
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:6
[broker] AFTER: 0
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:7
+[Basic.Broadway.handle_batch] [1, 2, 3, 4, 5]
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:8
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:9
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:10
[broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:11
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:12
[Basic.Broadway.handle_messaage] 受信:13
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[Basic.Broadway.handle_messaage] 受信:14
[broker] BEFORE: 3
+[Basic.Broadway.handle_batch] [6, 7, 8, 9, 10]
[Basic.Broadway.handle_messaage] 受信:15
[broker] AFTER: 3
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[SimpleProducer.broker] 呼出
+[Basic.Broadway.handle_batch] [11, 12, 13, 14, 15]
[broker] BEFORE: 6
[broker] AFTER: 6
b)データに通番を振り、batchersの並行数を1にする
次は、Produce側でデータに通番を振り、processors では並行処理し、batchers では並行数1/バッチサイズをデータサイズ以上にすることで通番通りに処理する方法です
データ処理を並行・並列したい中規模以上PubSubやマルチユーザーに対するデータストリーミングでは、この方法を使えば出口である batchers でデータ順通りの出力が出来ます
ただし、processors の全データが揃うのを待つ必要があり、更に batchers での各データ処理をシーケンシャルに行う訳では無いため、たとえば「並行で複数チャンク文字列を音声作成 → 前の順の音声生成が終わってから続けて次の音声再生」と言うようなストリーミングを行うには向かず、以降の方法が必要です
それでは話を戻して、Producer側コードで通番をデータに振りましょう
defmodule SimpleProducer do
use GenStage
…
def handle_cast({:push, items}, {queue, pos}) do
- broker({queue ++ items, pos})
+ broker({queue ++ Enum.with_index(items, pos), pos})
end
そして、handle_batch() で通番通りに整え、通番を削除します
これを行う際は、processors の並行数は戻し、batchers の並行数を1、バッチサイズを巨大に変更します
defmodule Basic.Broadway do
use Broadway
…
@impl true
def handle_batch(_batcher, messages, _batch_info, _context) do
+ messages = Enum.sort_by(messages, &elem(&1.data, 1))
+ |> Enum.map(&Map.put(&1, :data, elem(&1.data, 0)))
IO.puts("[Basic.Broadway.handle_batch] 処理したバッチ数:#{length(messages)}")
…
processors: [
default: [
max_demand: 6,
- concurrency: 1
+ concurrency: 2
…
batchers: [
default: [
- batch_size: 5,
+ batch_size: 10000,
batch_timeout: 100,
- concurrency: 8
+ concurrency: 1
…
下記 push() を行うと、データ順通りに処理されました
それでいて、processors はマルチコア並列で動くので、上記①よりも高速に処理されます
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 12
…
[broker] AFTER: 12
[Basic.Broadway.handle_batch] 処理したバッチ数:15
+[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
c)後続に待機Agentを置いてシーケンシャル処理
次は、Produce側でデータに通番を振った順で、後続に置いた待機Agentでシーケンシャル処理する方法です
中規模以上PubSubやマルチユーザーに対するデータストリーミングを行うけど、たまにPubSub横断やマルチユーザー横断での順次処理を行いたいときや、「並行で複数チャンク文字列を音声作成 → 前の順の音声生成が終わってから続けて次の音声再生」と言うようなストリーミングを行いたいときは、この方法が有効です
ただし、ユーザー毎にデータ順通りにしたい等、何らかのグルーピングやパーティショニングを必要とする場合は、やや煩雑なロジックを組む必要があるため、以降の方法が良いでしょう
それでは話を戻して、待機AgentとなるSequencerモジュールを追加してみます
defmodule Basic.Sequencer do
use Agent
def start_link(_opts), do: Agent.start_link(fn -> {1, %{}} end, name: __MODULE__)
def push({data, index}) do
Agent.update(__MODULE__, fn {next_index, buffer} ->
new_buffer = Map.put(buffer, index, data)
drain(next_index, new_buffer)
end)
end
def drain(next_index, buffer) do
case Map.pop(buffer, next_index) do
{nil, _} -> {next_index, buffer}
{data, remaining_buffer} ->
process(next_index, data)
drain(next_index + 1, remaining_buffer)
end
end
def process(index, data) do
IO.puts("[Sequencer.process] index:#{index}: data:#{inspect(data)}")
end
end
次に、handle_message() で Basic.Sequencer.push() を呼び、上記Sequencerに積み込みます
積み込まれたデータは、データ順通りに1件ずつ Basic.Sequencer.drain() で取り出され、process での処理が終わるまで待ち、終わり次第、次の1件と処理を数珠繋ぎに繰り返します
なお、今回の対応には無関係となった handle_batch() のソートと batchers の並行数/バッチサイズも元に戻しておきます
defmodule Basic.Broadway do
use Broadway
…
def handle_message(_processor, message, _context) do
- message = message
+ message = message
|> Broadway.Message.update_data(message, fn x ->
IO.puts("[Basic.Broadway.handle_messaage] 受信:#{inspect(x)}")
x
end)
+ Basic.Sequencer.push(message.data)
+ message
end
def handle_batch(_batcher, messages, _batch_info, _context) do
- messages = Enum.sort_by(messages, &elem(&1.data, 1))
- |> Enum.map(&Map.put(&1, :data, elem(&1.data, 0)))
IO.puts("[Basic.Broadway.handle_batch] 処理したバッチ数:#{length(messages)}")
…
batchers: [
default: [
- batch_size: 10000,
+ batch_size: 5,
batch_timeout: 100,
- concurrency: 1
+ concurrency: 8
…
Sequencerの停止と再起動を行うExSync設定を追加します
defmodule Reloader do
def start_watch(), do: watch_loop(last_timestamp())
…
def reload() do
IEx.Helpers.recompile()
+ [Basic.Broadway, Basic.Sequencer]
+ |> Enum.map(fn n ->
+ if pid = Process.whereis(n) do
- if pid = Process.whereis(Basic.Broadway) do
GenServer.stop(pid)
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, _, _} -> :ok
after 1000 -> :ok
end
end
+ apply(n, :start_link, [[]])
+ end)
- Basic.Broadway.start_link([])
end
end
Task.start(fn -> Reloader.start_watch() end)
起動時にSequencerを常時起動させる設定も追加しておきます
defmodule Basic.Application do
use Application
def start(_type, _args) do
children = [
+ {Basic.Sequencer, []},
{Basic.Broadway, []}
…
下記 push() を行うと、データ順通りに処理されました
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
[broker] BEFORE: 12
[broker] AFTER: 0
:ok
[Basic.Broadway.handle_messaage] 受信:{7, 6}
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Basic.Broadway.handle_messaage] 受信:{8, 7}
+[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Basic.Broadway.handle_messaage] 受信:{9, 8}
+[Sequencer.process] index:1: data:2
[Basic.Broadway.handle_messaage] 受信:{3, 2}
+[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Basic.Broadway.handle_messaage] 受信:{10, 9}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Sequencer.process] index:3: data:4
[broker] BEFORE: 3
[Basic.Broadway.handle_batch] [1, 2, 3, 7, 8]
[Basic.Broadway.handle_messaage] 受信:{5, 4}
[Basic.Broadway.handle_messaage] 受信:{11, 10}
[broker] AFTER: 0
+[Sequencer.process] index:4: data:5
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:{6, 5}
[Basic.Broadway.handle_messaage] 受信:{12, 11}
[broker] BEFORE: 3
+[Sequencer.process] index:5: data:6
[broker] AFTER: 3
+[Sequencer.process] index:6: data:7
+[Sequencer.process] index:7: data:8
+[Sequencer.process] index:8: data:9
+[Sequencer.process] index:9: data:10
+[Sequencer.process] index:10: data:11
+[Sequencer.process] index:11: data:12
[Basic.Broadway.handle_messaage] 受信:{13, 12}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Sequencer.process] index:12: data:13
[broker] BEFORE: 6
[Basic.Broadway.handle_batch] [4, 5, 6, 9, 10]
[Basic.Broadway.handle_messaage] 受信:{14, 13}
[broker] AFTER: 6
+[Sequencer.process] index:13: data:14
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:{15, 14}
[broker] BEFORE: 9
+[Sequencer.process] index:14: data:15
[broker] AFTER: 9
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] [11, 12, 13, 14, 15]
[broker] BEFORE: 12
[broker] AFTER: 12
終わりに
Broadwayでデータ順を維持しながら並行・並列を高めるコントロールについて、3つの方法を解説しました
FlowやGenStageだと難儀するようなコントロールが、Broadwayではエレガントに組めます
ここまでで、Broadway側の処理はパーティショニングと耐障害性以外の正常系は一通り攻略したので、次回は、単一リストで非効率なままのProducer側を改善しようと思います
p.s.このコラムが、面白かったり、役に立ったら…
明日も私で 「Broadwayを嗜む③:カスタムProducerをリストから本物のキューに換装する」 です