9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Broadwayを嗜む③:ProducerのレートリミッターとbatcherのタイムアウトによるQoS

Last updated at Posted at 2025-12-29

この記事は、Elixir Advent Calendar 2025 その6 の5日目です

昨日は @RyoWakabayashi さんで 「Google Antigravity に Phoenix LiveView のアプリケーションを設計・実装させてみた」 でした


piacere です、ご覧いただいてありがとございます :bow:

前回 から引き続き、Elixirのパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、今回はProducerのレートリミッターとbatcherのタイムアウトによるQoS(Quality of Service:ネットワーク通信の品質の保証)について解説します

なお、第1回コラムのコードを土台に改善していきますので、下記を先に実施しておいてください

この「Broadwayを嗜む」シリーズのコラムは下記です

① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ ProducerのレートリミッターとbatcherのタイムアウトによるQoS
④ シングルコードからマクロで複数Broadwayを立てる(Elixirマクロの実践利用例)
⑤ カスタムProducerをリストから本物のキューに換装する
⑥ パーティショニングによるマルチユーザー対応
⑦ 異常データが来たときはリトライしたり、除外してログ出力
⑧ マルチコア並列をベンチマークする

このコラムが面白かったり、役に立ったら、image.png で応援よろしくお願いします :bow:

Elixirアドベントカレンダー、応援お願いします :bow:

今年もやっています

ProducerのレートリミッターによるQoS

Producerのレートリミッター設定は、producer: rate_limiting を設定するだけです

ここでは、1秒間隔(interval: 1000)に2データ(allowed_messages: 2)だけ、ProducerからBroadwayに渡す制限を入れてみます

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

      producer: [
        module: {SimpleProducer, :init},
        concurrency: 3,
+       rate_limiting: [
+         allowed_messages: 2,
+         interval: 1000,
+       ],
        transformer: {__MODULE__, :transform, []}

下記 push() を行うと、1秒間に2データずつ処理されていることが分かります

このように、Producer段階でQoSが可能です

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Sequencer.process] index:1: data:2
[Basic.Broadway.handle_batch] 処理したバッチ数:2
[Basic.Broadway.handle_batch] [1, 2]
+(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Sequencer.process] index:3: data:4
[Basic.Broadway.handle_batch] 処理したバッチ数:2
[Basic.Broadway.handle_batch] [3, 4]
+(1秒止まる)

batcherのタイムアウトによるQoS

レートリミッター無の前回までは、バッチのデバッグが遅れて出ていましたが、レートリミッターを入れると、遅れずに出るようになっていますが、一方で、バッチサイズ(batchers.batch_size: 5)に満たないのにバッチが実行されています

これは、レートリミッターで1秒待っている間に、バッチ側のタイムアウト(batchers.batch_timeout: 100)が先に起こり、そのタイミングでバッチ処理が実行されているという挙動となります

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Sequencer.process] index:1: data:2
+[Basic.Broadway.handle_batch] 処理したバッチ数:2
+[Basic.Broadway.handle_batch] [1, 2]
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Sequencer.process] index:3: data:4
+[Basic.Broadway.handle_batch] 処理したバッチ数:2
+[Basic.Broadway.handle_batch] [3, 4]
(1秒止まる)

何のためにこのような挙動になっているかと言うと、Producer側からデータが途絶えたときでも、後続のBroadwayを止めず、そのときまでに来ているデータで処理させるためのメカニズムが生きているからです

これは、バッチ段階でもQoSが可能なことを意味します

データが一定数溜まるまでBroadwayに永続的に待たせたい場合は、batch_timeout を巨大な数字にすればOKです(なお設定を消すと、デフォルトの1秒で設定されるので気を付けてください)

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

      batchers: [
        default: [
          batch_size: 5,
-         batch_timeout: 100,
+         batch_timeout: 10000,
          concurrency: 8

ここでは、Producerが1秒間隔で送り続け、バッチサイズ5件が溜まるまで5秒しかかからないため、batch_timeout に設定された10秒間が有効になることは無い ⋯ つまり、データが一定数溜まるまでBroadwayに永続的に待つ設定となっています

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 10
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Basic.Broadway.handle_messaage] 受信:{2, 1}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{3, 2}
[Basic.Broadway.handle_messaage] 受信:{4, 3}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{5, 4}
[Basic.Broadway.handle_messaage] 受信:{6, 5}
+[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Basic.Broadway.handle_batch] [1, 2, 3, 4, 5]
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{7, 6}
[Basic.Broadway.handle_messaage] 受信:{8, 7}
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{9, 8}
[Basic.Broadway.handle_messaage] 受信:{10, 9}
+[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Basic.Broadway.handle_batch] [6, 7, 8, 9, 10]
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 4
  [broker] AFTER: 0
(1秒止まる)
[Basic.Broadway.handle_messaage] 受信:{11, 10}
[Basic.Broadway.handle_messaage] 受信:{12, 11}
(1秒止まる)
…

processorsでもQoSできる

なお、今回のような小規模データだと実感できないのですが、初回で解説した、Broadwayが1度に処理するデータ数を制御する processors.max_demand でもQoSできます

これは、Broadwayの「マルチコア並列をベンチマーク」する回で大量データを扱うので、その際に解説しようと思います

終わりに

BroadwayでQoSをするためのメカニズムである、Producerのレートリミッターとbatcherのタイムアウトを見てきました

FlowやGenStageでは、ここまで気軽にQoSをコントロールできないので、まさにBroadwayならではの機能です

ここまでで、Broadway側の処理はパーティショニングと耐障害性以外の正常系は一通り攻略したので、次回は設定違いのBroadwayをシングルコードから複数立てるマクロ を紹介します

p.s.このコラムが、面白かったり、役に立ったら…

image.png で応援よろしくお願いします :bow:


明日も私で 「Broadwayを嗜む④:シングルコードからマクロで複数Broadwayを立てる(Elixirマクロの実践利用例)」 です

9
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?