この記事は、Elixir Advent Calendar 2025 その5 の4日目です
昨日は私で 「Broadwayを嗜む②:データ順を維持しながら並行・並列を高めるコントロール」 でした
piacere です、ご覧いただいてありがとございます ![]()
前回 から引き続き、Elixirのパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、今回はProducerのレートリミッターとbatcherのタイムアウトによるQoS(Quality of Service:ネットワーク通信の品質の保証)について解説します
なお、初回コラムの続きから行いますので、下記を先に実施しておいてください
この「Broadwayを嗜む」シリーズのコラムは下記です
① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ ProducerのレートリミッターとbatcherのタイムアウトによるQoS
④ カスタムProducerをリストから本物のキューに換装する
⑤ マルチコア並列をベンチマークする
⑥ パーティショニングによるマルチユーザー対応
⑦ 異常データが来たときはリトライしたり、除外してログ出力
このコラムが面白かったり、役に立ったら、
で応援よろしくお願いします ![]()
Elixirアドベントカレンダー、応援お願いします
今年もやっています
ProducerのレートリミッターによるQoS
Producerのレートリミッター設定は、producer: rate_limiting を設定するだけです
ここでは、1秒間隔(interval: 1000)に2データ(allowed_messages: 2)だけ、ProducerからBroadwayに渡す制限を入れてみます
defmodule Basic.Broadway do
use Broadway
…
producer: [
module: {SimpleProducer, :init},
concurrency: 3,
+ rate_limiting: [
+ allowed_messages: 2,
+ interval: 1000,
+ ],
transformer: {__MODULE__, :transform, []}
…
下記 push() を行うと、1秒間に2データずつ処理されていることが分かります
このように、Producer段階でQoSが可能です
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 12
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Sequencer.process] index:1: data:2
[Basic.Broadway.handle_batch] 処理したバッチ数:2
[Basic.Broadway.handle_batch] [1, 2]
+(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Sequencer.process] index:3: data:4
[Basic.Broadway.handle_batch] 処理したバッチ数:2
[Basic.Broadway.handle_batch] [3, 4]
+(1秒止まる)
…
batcherのタイムアウトによるQoS
ここでレートリミッター無の前回までは、バッチのデバッグが遅れて出ていましたが、レートリミッターを入れると、遅れずに出るようになっていますが、一方で、バッチサイズ(batchers.batch_size: 5)に満たないのにバッチが実行されています
これは、レートリミッターで1秒待っている間に、バッチ側のタイムアウト(batchers.batch_timeout: 100)が先に起こり、そのタイミングでバッチ処理が実行されているという挙動となります
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 12
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Sequencer.process] index:1: data:2
+[Basic.Broadway.handle_batch] 処理したバッチ数:2
+[Basic.Broadway.handle_batch] [1, 2]
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Sequencer.process] index:3: data:4
+[Basic.Broadway.handle_batch] 処理したバッチ数:2
+[Basic.Broadway.handle_batch] [3, 4]
(1秒止まる)
何のためにこのような挙動になっているかと言うと、Producer側からデータが途絶えたときでも、後続のBroadwayを止めず、そのときまでに来ているデータで処理させるためのメカニズムです
つまり、バッチ段階でもQoSが可能なことを意味します
データが一定数溜まるまでBroadwayに永続的に待たせたい場合は、batch_timeout を巨大な数字にすればOKです(なお設定を消すと、デフォルトの1秒で設定されるので気を付けてください)
defmodule Basic.Broadway do
use Broadway
…
batchers: [
default: [
batch_size: 5,
- batch_timeout: 100,
+ batch_timeout: 10000,
concurrency: 8
…
ここでは batch_timeout を10秒に設定したので、1秒間隔でProducerが送り出す一方、バッチサイズである5件が溜まるまでバッチは待つようになります
iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
[broker] BEFORE: 10
[broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Basic.Broadway.handle_messaage] 受信:{2, 1}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Basic.Broadway.handle_messaage] 受信:{4, 3}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{5, 4}
[Basic.Broadway.handle_messaage] 受信:{6, 5}
+[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Basic.Broadway.handle_batch] [1, 2, 3, 4, 5]
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{7, 6}
[Basic.Broadway.handle_messaage] 受信:{8, 7}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{9, 8}
[Basic.Broadway.handle_messaage] 受信:{10, 9}
+[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Basic.Broadway.handle_batch] [6, 7, 8, 9, 10]
[SimpleProducer.broker] 呼出
[broker] BEFORE: 4
[broker] AFTER: 0
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{11, 10}
[Basic.Broadway.handle_messaage] 受信:{12, 11}
(1秒止まる)
…
processorsでもQoSできる
なお、今回のような小規模データだと実感できないのですが、初回で解説した、Broadwayが1度に処理するデータ数を制御する processors.max_demand でもQoSできます
これは、Broadwayの「マルチコア並列をベンチマーク」する回で大量データを扱うので、その際に解説しようと思います
終わりに
BroadwayでQoSをするためのメカニズムである、Producerのレートリミッターとbatcherのタイムアウトを見てきました
FlowやGenStageでは、ここまで気軽にQoSをコントロールできないので、まさにBroadwayならではの機能です
ここまでで、Broadway側の処理はパーティショニングと耐障害性以外の正常系は一通り攻略したので、次回は、単一リストで非効率なままのProducer側を改善しようと思います
p.s.このコラムが、面白かったり、役に立ったら…
明日も私で 「Broadwayを嗜む④:カスタムProducerをリストから本物のキューに換装する」 です