3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[Elixir]Taskを使ってプロセスで遊ぶ

Posted at

軽量プロセスという仕組みが珍しくて多言語から来たエンジニアはみんな一回プロセスの限界を試すのではないだろうか? :thinking:

Taskについて

ElixirのTask.asyncは渡した関数を非同期で実行し結果は後でTask.awaitを使って受け取るというJavaScriptのasync/awaitみたいな関数です。
Elixirを動かすErlangVMは無数の軽量プロセスを大量に生成しプロセスが相互に通信することでこの仕組を実現しているらしいです。
このErlangVM、デフォルトでは作成できるプロセスの上限が決まっているらしく、Taskもプロセスを利用して作成する仕組みであるなら限界以上のプロセスは生成できないから何らかのエラーが出るはず。
面白いので見てみよう

実験方法

どうやら :erlang.system_info(:process_limit)で作成できるプロセスの上限を取得できるみたい
結果はintegerなので 1..:erlang.system_info(:process_limit)

iex(1)> :erlang.system_info(:process_limit)
262144
iex(2)> 1..:erlang.system_info(:process_limit)
1..262144

Taskは Task.async/1Task.async/3の引数違い2パターンがあるが実験のためにモジュール作るのも手間だなと思ったので無名関数に外から引数を渡すお行儀の悪いことをした。

iex(1)> tasks_1_step = 1..1 |> Enum.map(& Task.async(fn -> &1 * 1000 end))
[
  %Task{
    owner: #PID<0.191.0>,
    pid: #PID<0.193.0>,
    ref: #Reference<0.418778444.981205003.227703>
  }
]
iex(2)> Task.await(tasks_1_step |> List.first)
1000

検証

上の手順に従って限界いっぱいまでプロセスを作ってみる

iex(1)> tasks_2_step = 1..(:erlang.system_info(:process_limit)) |> Enum.map(& Task.async(fn -> &1 * 1000 end))
[
  %Task{
    owner: #PID<0.191.0>,
    pid: #PID<0.196.0>,
    ref: #Reference<0.418778444.981205003.227749>
  },
...]

iex(2)> tasks_2_step |> length
262144
iex(3)> :erlang.system_info(:process_limit)
262144
iex(4)> :erlang.system_info(:process_limit) == tasks_2_step |> length
true

どうやらIExの返すプロセスリミットまでは作成できたらしい
次は値を取り出してみる

iex(5)> tasks_2_step |> Enum.map(& Task.await(&1))
[1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
 13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000,
 24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000,
 35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000,
 46000, 47000, 48000, 49000, 50000, ...]

滞りなく取り出せる。

ではプロセスリミット + 1で処理するとどうなるかを見たいと思うんですよ!
何かしらのエラーを吐いてくれるはず :eyes:

iex(1)>  tasks_3_step = 1..(:erlang.system_info(:process_limit) + 1) |> Enum.map(& Task.async(fn -> &1 * 1000 end))
[
  %Task{
    owner: #PID<0.191.0>,
    pid: #PID<0.307.8>,
    ref: #Reference<0.3806640768.713293825.124028>
  },
...]

iex(2)> tasks_3_step |> length == :erlang.system_info(:process_limit)
false
iex(3)> tasks_3_step |> length
262145
iex(4)> :erlang.system_info(:process_limit)
262144
iex(12)> results = tasks_3_step |> Enum.map(& Task.await(&1))
[1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
 13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000,
 24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000,
 35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000,
 46000, 47000, 48000, 49000, 50000, ...]
iex(13)> results |> length
262145

え、エラーにならない... :thinking:

なぜ? :thinking:

結論を先に書くと検証に使ったコードが悪かった。
今回Task.asyncで実行しているのは簡単な掛け算。 :observer.start()を眺めて気づいたがこれだとTaskの実行が速くて1つのプロセスで全計算を終わらせられていた。

ということで検証コードを変更した。
ついでに hoge.exsにコードを書いた

 1  defmodule Experimental do
 2    def run(n) do
 3      process_limit = :erlang.system_info(:process_limit)
 4      IO.puts "process limit is #{process_limit}"
 5
 6      1..(process_limit + n)
 7      |> Enum.map(fn i ->
 8        Task.async(fn ->
 9          :timer.sleep(2000) # 2秒処理を止める
10          i * 1000
11        end)
12      end)
13      |> Enum.map(& Task.await(&1))
14    end
15  end

これを起動する。

iex(1)> c "hoge.exs"
[Experimental]
iex(2)> Experimental.run(1)
process limit is 262144

12:10:50.113 [error] Too many processes

** (SystemLimitError) a system limit has been reached
    :erlang.spawn_link(:proc_lib, :init_p, [#PID<0.106.0>, [#PID<0.73.0>], Task.Supervised, :reply, [{:nonode@nohost, #PID<0.106.0>, #PID<0.106.0>}, [#PID<0.106.0>], :nomonitor, {:erlang, :apply, [#Function<1.92674052/0 in Experimental.run/1>, []]}]])
    (stdlib 3.13.2) proc_lib.erl:107: :proc_lib.spawn_link/3
    (elixir 1.11.0) lib/task/supervised.ex:14: Task.Supervised.start_link/4
    (elixir 1.11.0) lib/task.ex:425: Task.async/3
    (elixir 1.11.0) lib/enum.ex:1403: anonymous fn/3 in Enum.map/2
    (elixir 1.11.0) lib/enum.ex:3449: Enum.reduce_range_inc/4
    (elixir 1.11.0) lib/enum.ex:2186: Enum.map/2
    hoge.exs:7: Experimental.run/1

落ちた... :thinking:
見たかったプロセスリミットは見られたが動くはずの処理が動かない。
なぜかと思って :observer.startを眺めていると特に何もしていなくてもIExを動かしている限りある程度プロセスが作成されるようだ。
現在稼働中のプロセスを見るには :erlang.processesで見られる。


iex(1)> :erlang.processes
[#PID<0.0.0>, #PID<0.1.0>, #PID<0.2.0>, #PID<0.3.0>, #PID<0.4.0>, #PID<0.5.0>,
 #PID<0.6.0>,...]

ということで6行目の処理をちょっと直して動かしてみる 1..(process_limit - (:erlang.processes |> length) + n)

iex(1)>  :observer.start()
:ok
iex(2)> c "hoge.exs"
[Experimental]
iex(3)> Experimental.run(0)
process limit is 262144
[1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
 13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000,
 24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000,
 35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000,
 46000, 47000, 48000, 49000, 50000, ...]

やったー! :tada:

さらなる謎

大量にプロセスを作りプロセスの限界を突破したらどうなるかも見れたので満足しているのですがTask.asyncで大量にプロセスを作る -> Task.awaitで結果を出す前にまたTask.asyncで大量にプロセスを作るとしてもプロセスの限界に当たりませんでした。
これはTask.asyncの計算結果はプロセスの外に記録されているように思われるのですがどこに記録されているのかがわからずじまいでした。
また情報が入ったら更新してみようと思います。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?