この記事は Elixir Advent Calendar 2024 シリーズ10の25日目です。昨日は @t-yamanashi さんでした!
先日書いた記事で、Stripeでの従量課金について解説しました。
imagepix では当然ながら顧客ごとに請求を行うため、顧客ごとのサービス使用量を定期的に記録するバッチ処理が必要です。ここで逐次処理をしていると、顧客が増えた際に全顧客分の使用量の記録が終わらないまま次のバッチ処理が始まってしまったり、過少請求が発生してしまったりする可能性があります
それぞれが独立していてある程度時間がかかってしまう処理を高速化するには、並列に処理するのが手っ取り早いです。Elixirには並行処理1を実現する機能が充実していますが、今回は Task.async_stream/3
で手軽に処理を並列化してみます。
Task.async_stream/3
はStream2を作成するだけで、処理は実行されません。今回は各処理の戻り値を使わないので、Stream.run/1 で処理を実行します。
なお、実際にリクエストを送信すると迷惑なので、今回は Process.sleep/1 で代用します。
1..100
|> Task.async_stream(
fn n ->
IO.puts("Request #{n}")
Process.sleep(1_000) # 1秒待つ
end,
max_concurrency: 10
)
|> Stream.run()
実行すると、以下のように出力されます。
Request 1
Request 2
Request 3
...
Request 100
max_concurrency: 10
を指定しているので、最大で10プロセスが並行で動きます。今回は全100件を10件ずつ処理し、1件あたり1秒かかるので、約10秒ですべての処理が完了します。簡単ですねぇ〜。
なお、max_concurrency
を指定しない場合でもプロセスが無尽蔵に起動されるわけではありません。
:max_concurrency
- sets the maximum number of tasks to run at the same time. Defaults to System.schedulers_online/0.
とあるように、デフォルトで System.schedulers_online/0
の戻り値分だけプロセスが並行で動きます。この値は環境依存なので、必要に応じて(特に外部のAPIを実行する場合はRate Limitを気にしますね) max_concurrency
を明示的に指定しておくとより安心です
これで Elixir Advent Calendar 2024 はおしまいです!今年もお疲れ様でした
-
記事内の記述が並列だったり並行だったりしますが、
Task.async_stream/3
のドキュメントでは「concurrently」という言葉が使われているのでここでは「並行処理」と記述しています。今どきCPUのコア数が1つということはあまりないと思うので、実際は並列に処理されていることでしょう。 ↩ -
https://hexdocs.pm/elixir/enumerable-and-streams.html#streams ↩