7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Broadwayを嗜む③:ProducerのレートリミッターとbatcherのタイムアウトによるQoS

Last updated at Posted at 2025-12-29

この記事は、Elixir Advent Calendar 2025 その5 の4日目です

昨日は私で 「Broadwayを嗜む②:データ順を維持しながら並行・並列を高めるコントロール」 でした


piacere です、ご覧いただいてありがとございます :bow:

前回 から引き続き、Elixirのパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、今回はProducerのレートリミッターとbatcherのタイムアウトによるQoS(Quality of Service:ネットワーク通信の品質の保証)について解説します

なお、初回コラムの続きから行いますので、下記を先に実施しておいてください

この「Broadwayを嗜む」シリーズのコラムは下記です

① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ ProducerのレートリミッターとbatcherのタイムアウトによるQoS
④ カスタムProducerをリストから本物のキューに換装する
⑤ マルチコア並列をベンチマークする
⑥ パーティショニングによるマルチユーザー対応
⑦ 異常データが来たときはリトライしたり、除外してログ出力

このコラムが面白かったり、役に立ったら、image.png で応援よろしくお願いします :bow:

Elixirアドベントカレンダー、応援お願いします :bow:

今年もやっています

ProducerのレートリミッターによるQoS

Producerのレートリミッター設定は、producer: rate_limiting を設定するだけです

ここでは、1秒間隔(interval: 1000)に2データ(allowed_messages: 2)だけ、ProducerからBroadwayに渡す制限を入れてみます

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

      producer: [
        module: {SimpleProducer, :init},
        concurrency: 3,
+       rate_limiting: [
+         allowed_messages: 2,
+         interval: 1000,
+       ],
        transformer: {__MODULE__, :transform, []}

下記 push() を行うと、1秒間に2データずつ処理されていることが分かります

このように、Producer段階でQoSが可能です

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Sequencer.process] index:1: data:2
[Basic.Broadway.handle_batch] 処理したバッチ数:2
[Basic.Broadway.handle_batch] [1, 2]
+(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Sequencer.process] index:3: data:4
[Basic.Broadway.handle_batch] 処理したバッチ数:2
[Basic.Broadway.handle_batch] [3, 4]
+(1秒止まる)

batcherのタイムアウトによるQoS

ここでレートリミッター無の前回までは、バッチのデバッグが遅れて出ていましたが、レートリミッターを入れると、遅れずに出るようになっていますが、一方で、バッチサイズ(batchers.batch_size: 5)に満たないのにバッチが実行されています

これは、レートリミッターで1秒待っている間に、バッチ側のタイムアウト(batchers.batch_timeout: 100)が先に起こり、そのタイミングでバッチ処理が実行されているという挙動となります

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Sequencer.process] index:1: data:2
+[Basic.Broadway.handle_batch] 処理したバッチ数:2
+[Basic.Broadway.handle_batch] [1, 2]
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Sequencer.process] index:3: data:4
+[Basic.Broadway.handle_batch] 処理したバッチ数:2
+[Basic.Broadway.handle_batch] [3, 4]
(1秒止まる)

何のためにこのような挙動になっているかと言うと、Producer側からデータが途絶えたときでも、後続のBroadwayを止めず、そのときまでに来ているデータで処理させるためのメカニズムです

つまり、バッチ段階でもQoSが可能なことを意味します

データが一定数溜まるまでBroadwayに永続的に待たせたい場合は、batch_timeout を巨大な数字にすればOKです(なお設定を消すと、デフォルトの1秒で設定されるので気を付けてください)

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

      batchers: [
        default: [
          batch_size: 5,
-         batch_timeout: 100,
+         batch_timeout: 10000,
          concurrency: 8

ここでは batch_timeout を10秒に設定したので、1秒間隔でProducerが送り出す一方、バッチサイズである5件が溜まるまでバッチは待つようになります

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 10
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Basic.Broadway.handle_messaage] 受信:{2, 1}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Basic.Broadway.handle_messaage] 受信:{4, 3}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{5, 4}
[Basic.Broadway.handle_messaage] 受信:{6, 5}
+[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Basic.Broadway.handle_batch] [1, 2, 3, 4, 5]
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{7, 6}
[Basic.Broadway.handle_messaage] 受信:{8, 7}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{9, 8}
[Basic.Broadway.handle_messaage] 受信:{10, 9}
+[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Basic.Broadway.handle_batch] [6, 7, 8, 9, 10]
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 4
  [broker] AFTER: 0
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{11, 10}
[Basic.Broadway.handle_messaage] 受信:{12, 11}
(1秒止まる)
…

processorsでもQoSできる

なお、今回のような小規模データだと実感できないのですが、初回で解説した、Broadwayが1度に処理するデータ数を制御する processors.max_demand でもQoSできます

これは、Broadwayの「マルチコア並列をベンチマーク」する回で大量データを扱うので、その際に解説しようと思います

終わりに

BroadwayでQoSをするためのメカニズムである、Producerのレートリミッターとbatcherのタイムアウトを見てきました

FlowやGenStageでは、ここまで気軽にQoSをコントロールできないので、まさにBroadwayならではの機能です

ここまでで、Broadway側の処理はパーティショニングと耐障害性以外の正常系は一通り攻略したので、次回は、単一リストで非効率なままのProducer側を改善しようと思います

p.s.このコラムが、面白かったり、役に立ったら…

image.png で応援よろしくお願いします :bow:


明日も私で 「Broadwayを嗜む④:カスタムProducerをリストから本物のキューに換装する」 です

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?