4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Broadwayを嗜む②:データ順を維持しながら並行・並列を高めるコントロール

Last updated at Posted at 2025-12-28

この記事は、Elixir Advent Calendar 2025 その5 の3日目です

昨日は私で 「Broadwayを嗜む①:『パーフェクトな並行データフレームワーク』の基本的な使い方」 でした


piacere です、ご覧いただいてありがとございます :bow:

前回 から引き続き、Elixirのパーフェクトな並行データフレームワークだけど、実践例が(国内では)少ない「Broadway」について、今回はデータ順を維持しつつ並行・並列を高める以下4つの方法のうち、上から3つを紹介します

  • a)processorsの並行数を1にする
  • b)データに通番を振り、batchersの並行数を1にする
  • c)後続に待機Agentを置いてシーケンシャル処理
  • d)processorsでパーティション毎の追い越し抑制

なお、前回コラムの続きから行いますので、下記を先に実施しておいてください

この「Broadwayを嗜む」シリーズのコラムは下記です

① パーフェクトな並行・並列データフレームワーク「Broadway」の基本的な使い方
② データ順を維持しながら並行・並列を高めるコントロール
③ カスタムProducerをリストから本物のキューに換装する
④ マルチコア並列をベンチマークする
⑤ パーティショニングによるマルチユーザー対応
⑥ 異常データが来たときはリトライしたり、除外してログ出力

このコラムが面白かったり、役に立ったら、image.png で応援よろしくお願いします :bow:

Elixirアドベントカレンダー、応援お願いします :bow:

今年もやっています

事前準備:ExSyncでBroadway変更の自動反映

今後はBroadwayの改修を割と頻繁に行いますが、イチイチiexを再起動するのが面倒なので、ExSyncでリコンパイル+リロードを賄うようにします

まず、ExSyncをインストールします

basic/mix.exs
defmodule Basic.MixProject do
  use Mix.Project

  defp deps do
    [
+     {:exsync, "~> 0.0"},
      {:broadway, "~> 1.0"}
      # {:dep_from_hexpm, "~> 0.3.0"},

mix deps.get

Broadwayの停止と再起動を行うExSync設定を追加します

basic/.iex.exs
defmodule Reloader do
  def start_watch(), do: watch_loop(last_timestamp())

  def watch_loop(last_time) do
    current_time = last_timestamp()
    if current_time > last_time, do: reload()
    Process.sleep(1000)
    watch_loop(current_time)
  end

  def last_timestamp() do
    Path.wildcard("lib/**/*.ex")
    |> Enum.map(&File.stat!(&1).mtime)
    |> Enum.max(fn -> 0 end)
  end

  def reload() do
    IEx.Helpers.recompile()
    if pid = Process.whereis(Basic.Broadway) do
      GenServer.stop(pid)
      ref = Process.monitor(pid)
      receive do
        {:DOWN, ^ref, :process, _, _} -> :ok
      after 1000 -> :ok
      end
    end
    Basic.Broadway.start_link([])
  end
end

Task.start(fn -> Reloader.start_watch() end)

Ctrl+cでiexを一度落とし、下記で再起動すると、あらゆるコード変更をファイル保存するたびにリコンパイルが走り、Broadwayもリロードされるようになります

iex -S mix

試しに、Broadwayリロード時に設定をデバッグするように変更します(ファイル保存はしないでください)

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

  @doc """
  iex> Basic.Broadway.reload()
  :ok
  """
  def reload() do
    if Process.whereis(__MODULE__), do: GenServer.stop(__MODULE__)
    start_link([])
  end

  def start_link(_opts) do
-   IO.puts("[Basic.Broadway.start_link] 起動")
-   Broadway.start_link(__MODULE__, settings)
+   settings = [
      name: __MODULE__,
      producer: [
        module: {SimpleProducer, :init},
        concurrency: 3,
        transformer: {__MODULE__, :transform, []}
      ],
      processors: [
        default: [
          max_demand: 6,
          concurrency: 1
        ]
      ],
      batchers: [
        default: [
          batch_size: 5,
          batch_timeout: 100,
          concurrency: 8
        ]
      ]
    ]
+   |> then(& IO.puts("[Basic.Broadway.start_link] 起動\n#{inspect(&1)}"))
+   Broadway.start_link(__MODULE__, settings)

ファイル保存をすると、下記のようにリコンパイルが走り、Broadwayのリロードも行われ、Broadway設定や関数が刷新されるようになります

16:30:52.949 [debug] [exsync] running mix compile
Compiling 2 files (.ex)
Generated basic app
[Basic.Broadway.start_link] 起動
[
  name: Basic.Broadway,
  producer: [
    module: {SimpleProducer, :init},
    concurrency: 2,
    transformer: {Basic.Broadway, :transform, []}
  ],
  processors: [default: [max_demand: 6, concurrency: 2]],
  batchers: [default: [batch_size: 10000, batch_timeout: 100, concurrency: 1]]
]
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 6
  [broker] BEFORE: 6
  [broker] AFTER: 6
  [broker] AFTER: 6
[SimpleProducer.broker] 呼出
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 12
  [broker] BEFORE: 12
  [broker] AFTER: 12
  [broker] AFTER: 12

16:30:53.801 [debug] [exsync] reload module Elixir.Basic.Broadway

16:30:53.809 [debug] [exsync] reload module Elixir.SimpleProducer

16:30:53.864 [debug] [exsync] reload complete

a)processorsの並行数を1にする

最も簡単な方法は、processors の並行度を1にすることです

並行・並列が不要な、小規模PubSubやノンマルチユーザーのデータストリーミング等を行いたいならコレで充分です

ただし、データ処理を並行・並列で行いたい中規模以上PubSubやマルチユーザーに対するデータストリーミング、CPUネックやI/Oネックのある処理を行う場合は、この方法だとブロックされてしまうため、以降の方法が必要です

Producer側でデータ順が荒れるケースにも向いていませんので、その場合は次の方法が有効です

それでは話を戻して、processors の並行数を1に変更しましょう

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

      processors: [
        default: [
          max_demand: 6,
-         concurrency: 2
+         concurrency: 1

ファイル保存し、Broadwayリロードされた後、下記 push() を行うと、今度はデータ順通りに処理されました

handle_batch() だけで無く、handle_message() の方もデータ順通りになっています

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 6
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:1
[Basic.Broadway.handle_messaage] 受信:2
[Basic.Broadway.handle_messaage] 受信:3
[Basic.Broadway.handle_messaage] 受信:4
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:5
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:6
  [broker] AFTER: 0
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:7
+[Basic.Broadway.handle_batch] [1, 2, 3, 4, 5]
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:8
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:9
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:10
  [broker] BEFORE: 3
[Basic.Broadway.handle_messaage] 受信:11
  [broker] AFTER: 0
[Basic.Broadway.handle_messaage] 受信:12
[Basic.Broadway.handle_messaage] 受信:13
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[Basic.Broadway.handle_messaage] 受信:14
  [broker] BEFORE: 3
+[Basic.Broadway.handle_batch] [6, 7, 8, 9, 10]
[Basic.Broadway.handle_messaage] 受信:15
  [broker] AFTER: 3
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[SimpleProducer.broker] 呼出
+[Basic.Broadway.handle_batch] [11, 12, 13, 14, 15]
  [broker] BEFORE: 6
  [broker] AFTER: 6

b)データに通番を振り、batchersの並行数を1にする

次は、Produce側でデータに通番を振り、processors では並行処理し、batchers では並行数1/バッチサイズをデータサイズ以上にすることで通番通りに処理する方法です

データ処理を並行・並列したい中規模以上PubSubやマルチユーザーに対するデータストリーミングでは、この方法を使えば出口である batchers でデータ順通りの出力が出来ます

ただし、processors の全データが揃うのを待つ必要があり、更に batchers での各データ処理をシーケンシャルに行う訳では無いため、たとえば「並行で複数チャンク文字列を音声作成 → 前の順の音声生成が終わってから続けて次の音声再生」と言うようなストリーミングを行うには向かず、以降の方法が必要です

それでは話を戻して、Producer側コードで通番をデータに振りましょう

basic/lib/simple_producer.ex
defmodule SimpleProducer do
  use GenStage

  def handle_cast({:push, items}, {queue, pos}) do
-   broker({queue ++ items, pos})
+   broker({queue ++ Enum.with_index(items, pos), pos})
  end

そして、handle_batch() で通番通りに整え、通番を削除します

これを行う際は、processors の並行数は戻し、batchers の並行数を1、バッチサイズを巨大に変更します

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
+   messages = Enum.sort_by(messages, &elem(&1.data, 1))
+     |> Enum.map(&Map.put(&1, :data, elem(&1.data, 0)))
    IO.puts("[Basic.Broadway.handle_batch] 処理したバッチ数:#{length(messages)}")

      processors: [
        default: [
          max_demand: 6,
-         concurrency: 1
+         concurrency: 2

      batchers: [
        default: [
-         batch_size: 5,
+         batch_size: 10000,
          batch_timeout: 100,
-         concurrency: 8
+         concurrency: 1

下記 push() を行うと、データ順通りに処理されました

それでいて、processors はマルチコア並列で動くので、上記①よりも高速に処理されます

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
:ok
  [broker] BEFORE: 12
…
  [broker] AFTER: 12
[Basic.Broadway.handle_batch] 処理したバッチ数:15
+[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

c)後続に待機Agentを置いてシーケンシャル処理

次は、Produce側でデータに通番を振った順で、後続に置いた待機Agentでシーケンシャル処理する方法です

中規模以上PubSubやマルチユーザーに対するデータストリーミングを行うけど、たまにPubSub横断やマルチユーザー横断での順次処理を行いたいときや、「並行で複数チャンク文字列を音声作成 → 前の順の音声生成が終わってから続けて次の音声再生」と言うようなストリーミングを行いたいときは、この方法が有効です

ただし、ユーザー毎にデータ順通りにしたい等、何らかのグルーピングやパーティショニングを必要とする場合は、やや煩雑なロジックを組む必要があるため、以降の方法が良いでしょう

それでは話を戻して、待機AgentとなるSequencerモジュールを追加してみます

basic/lib/sequencer.ex
defmodule Basic.Sequencer do
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> {1, %{}} end, name: __MODULE__)

  def push({data, index}) do
    Agent.update(__MODULE__, fn {next_index, buffer} ->
      new_buffer = Map.put(buffer, index, data)
      drain(next_index, new_buffer)
    end)
  end

  def drain(next_index, buffer) do
    case Map.pop(buffer, next_index) do
      {nil, _} -> {next_index, buffer}
      {data, remaining_buffer} ->
        process(next_index, data)
        drain(next_index + 1, remaining_buffer)
    end
  end

  def process(index, data) do
    IO.puts("[Sequencer.process] index:#{index}: data:#{inspect(data)}")
  end
end

次に、handle_message()Basic.Sequencer.push() を呼び、上記Sequencerに積み込みます

積み込まれたデータは、データ順通りに1件ずつ Basic.Sequencer.drain() で取り出され、process での処理が終わるまで待ち、終わり次第、次の1件と処理を数珠繋ぎに繰り返します

なお、今回の対応には無関係となった handle_batch() のソートと batchers の並行数/バッチサイズも元に戻しておきます

basic/lib/broadway.ex
defmodule Basic.Broadway do
  use Broadway

  def handle_message(_processor, message, _context) do
-   message = message
+   message = message
      |> Broadway.Message.update_data(message, fn x ->
          IO.puts("[Basic.Broadway.handle_messaage] 受信:#{inspect(x)}")
          x
        end)
+   Basic.Sequencer.push(message.data)
+   message
  end

  def handle_batch(_batcher, messages, _batch_info, _context) do
-   messages = Enum.sort_by(messages, &elem(&1.data, 1))
-     |> Enum.map(&Map.put(&1, :data, elem(&1.data, 0)))
    IO.puts("[Basic.Broadway.handle_batch] 処理したバッチ数:#{length(messages)}")

      batchers: [
        default: [
-         batch_size: 10000,
+         batch_size: 5,
          batch_timeout: 100,
-         concurrency: 1
+         concurrency: 8

Sequencerの停止と再起動を行うExSync設定を追加します

basic/.iex.exs
defmodule Reloader do
  def start_watch(), do: watch_loop(last_timestamp())

  def reload() do
    IEx.Helpers.recompile()
+   [Basic.Broadway, Basic.Sequencer]
+   |> Enum.map(fn n ->
+     if pid = Process.whereis(n) do
-   if pid = Process.whereis(Basic.Broadway) do
        GenServer.stop(pid)
        ref = Process.monitor(pid)
        receive do
          {:DOWN, ^ref, :process, _, _} -> :ok
        after 1000 -> :ok
        end
      end
+     apply(n, :start_link, [[]])
+   end)
-   Basic.Broadway.start_link([])
  end
end

Task.start(fn -> Reloader.start_watch() end)

起動時にSequencerを常時起動させる設定も追加しておきます

basic/lib/application.ex
defmodule Basic.Application do
  use Application

  def start(_type, _args) do
    children = [
+     {Basic.Sequencer, []},
      {Basic.Broadway, []}

下記 push() を行うと、データ順通りに処理されました

iex> SimpleProducer.push(Enum.to_list(1..15))
[SimpleProducer.broker] 呼出
  [broker] BEFORE: 12
  [broker] AFTER: 0
:ok
[Basic.Broadway.handle_messaage] 受信:{7, 6}
[Basic.Broadway.handle_messaage] 受信:{1, 0}
[Basic.Broadway.handle_messaage] 受信:{8, 7}
+[Sequencer.process] index:0: data:1
[Basic.Broadway.handle_messaage] 受信:{2, 1}
[Basic.Broadway.handle_messaage] 受信:{9, 8}
+[Sequencer.process] index:1: data:2
[Basic.Broadway.handle_messaage] 受信:{3, 2}
+[Sequencer.process] index:2: data:3
[Basic.Broadway.handle_messaage] 受信:{4, 3}
[Basic.Broadway.handle_messaage] 受信:{10, 9}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Sequencer.process] index:3: data:4
  [broker] BEFORE: 3
[Basic.Broadway.handle_batch] [1, 2, 3, 7, 8]
[Basic.Broadway.handle_messaage] 受信:{5, 4}
[Basic.Broadway.handle_messaage] 受信:{11, 10}
  [broker] AFTER: 0
+[Sequencer.process] index:4: data:5
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:{6, 5}
[Basic.Broadway.handle_messaage] 受信:{12, 11}
  [broker] BEFORE: 3
+[Sequencer.process] index:5: data:6
  [broker] AFTER: 3
+[Sequencer.process] index:6: data:7
+[Sequencer.process] index:7: data:8
+[Sequencer.process] index:8: data:9
+[Sequencer.process] index:9: data:10
+[Sequencer.process] index:10: data:11
+[Sequencer.process] index:11: data:12
[Basic.Broadway.handle_messaage] 受信:{13, 12}
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] 処理したバッチ数:5
+[Sequencer.process] index:12: data:13
  [broker] BEFORE: 6
[Basic.Broadway.handle_batch] [4, 5, 6, 9, 10]
[Basic.Broadway.handle_messaage] 受信:{14, 13}
  [broker] AFTER: 6
+[Sequencer.process] index:13: data:14
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_messaage] 受信:{15, 14}
  [broker] BEFORE: 9
+[Sequencer.process] index:14: data:15
  [broker] AFTER: 9
[Basic.Broadway.handle_batch] 処理したバッチ数:5
[SimpleProducer.broker] 呼出
[Basic.Broadway.handle_batch] [11, 12, 13, 14, 15]
  [broker] BEFORE: 12
  [broker] AFTER: 12

終わりに

Broadwayでデータ順を維持しながら並行・並列を高めるコントロールについて、3つの方法を解説しました

FlowやGenStageだと難儀するようなコントロールが、Broadwayではエレガントに組めます

ここまでで、Broadway側の処理はパーティショニングと耐障害性以外の正常系は一通り攻略したので、次回は、単一リストで非効率なままのProducer側を改善しようと思います

p.s.このコラムが、面白かったり、役に立ったら…

image.png で応援よろしくお願いします :bow:


明日も私で 「Broadwayを嗜む③:カスタムProducerをリストから本物のキューに換装する」 です

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?