7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ElixirAdvent Calendar 2024

Day 25

[Elixir] Task.async_streamで手軽に処理を並列化する

Last updated at Posted at 2024-12-24

この記事は Elixir Advent Calendar 2024 シリーズ10の25日目です。昨日は @t-yamanashi さんでした!


先日書いた記事で、Stripeでの従量課金について解説しました。

imagepix では当然ながら顧客ごとに請求を行うため、顧客ごとのサービス使用量を定期的に記録するバッチ処理が必要です。ここで逐次処理をしていると、顧客が増えた際に全顧客分の使用量の記録が終わらないまま次のバッチ処理が始まってしまったり、過少請求が発生してしまったりする可能性があります :money_with_wings:

それぞれが独立していてある程度時間がかかってしまう処理を高速化するには、並列に処理するのが手っ取り早いです。Elixirには並行処理1を実現する機能が充実していますが、今回は Task.async_stream/3 で手軽に処理を並列化してみます。

Task.async_stream/3 はStream2を作成するだけで、処理は実行されません。今回は各処理の戻り値を使わないので、Stream.run/1 で処理を実行します。

なお、実際にリクエストを送信すると迷惑なので、今回は Process.sleep/1 で代用します。

1..100
|> Task.async_stream(
  fn n ->
    IO.puts("Request #{n}")
    Process.sleep(1_000) # 1秒待つ
  end,
  max_concurrency: 10
)
|> Stream.run()

実行すると、以下のように出力されます。

Request 1
Request 2
Request 3

...

Request 100

max_concurrency: 10 を指定しているので、最大で10プロセスが並行で動きます。今回は全100件を10件ずつ処理し、1件あたり1秒かかるので、約10秒ですべての処理が完了します。簡単ですねぇ〜。

なお、max_concurrency を指定しない場合でもプロセスが無尽蔵に起動されるわけではありません。

とあるように、デフォルトで System.schedulers_online/0 の戻り値分だけプロセスが並行で動きます。この値は環境依存なので、必要に応じて(特に外部のAPIを実行する場合はRate Limitを気にしますね) max_concurrency を明示的に指定しておくとより安心です :blush:


これで Elixir Advent Calendar 2024 はおしまいです!今年もお疲れ様でした :tea:

  1. 記事内の記述が並列だったり並行だったりしますが、Task.async_stream/3 のドキュメントでは「concurrently」という言葉が使われているのでここでは「並行処理」と記述しています。今どきCPUのコア数が1つということはあまりないと思うので、実際は並列に処理されていることでしょう。

  2. https://hexdocs.pm/elixir/enumerable-and-streams.html#streams

7
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?