軽量プロセスという仕組みが珍しくて多言語から来たエンジニアはみんな一回プロセスの限界を試すのではないだろうか?
Taskについて
ElixirのTask.asyncは渡した関数を非同期で実行し結果は後でTask.await
を使って受け取るというJavaScriptのasync/awaitみたいな関数です。
Elixirを動かすErlangVMは無数の軽量プロセスを大量に生成しプロセスが相互に通信することでこの仕組を実現しているらしいです。
このErlangVM、デフォルトでは作成できるプロセスの上限が決まっているらしく、Taskもプロセスを利用して作成する仕組みであるなら限界以上のプロセスは生成できないから何らかのエラーが出るはず。
面白いので見てみよう
実験方法
どうやら :erlang.system_info(:process_limit)
で作成できるプロセスの上限を取得できるみたい
結果はintegerなので 1..:erlang.system_info(:process_limit)
iex(1)> :erlang.system_info(:process_limit)
262144
iex(2)> 1..:erlang.system_info(:process_limit)
1..262144
Taskは Task.async/1
と Task.async/3
の引数違い2パターンがあるが実験のためにモジュール作るのも手間だなと思ったので無名関数に外から引数を渡すお行儀の悪いことをした。
iex(1)> tasks_1_step = 1..1 |> Enum.map(& Task.async(fn -> &1 * 1000 end))
[
%Task{
owner: #PID<0.191.0>,
pid: #PID<0.193.0>,
ref: #Reference<0.418778444.981205003.227703>
}
]
iex(2)> Task.await(tasks_1_step |> List.first)
1000
検証
上の手順に従って限界いっぱいまでプロセスを作ってみる
iex(1)> tasks_2_step = 1..(:erlang.system_info(:process_limit)) |> Enum.map(& Task.async(fn -> &1 * 1000 end))
[
%Task{
owner: #PID<0.191.0>,
pid: #PID<0.196.0>,
ref: #Reference<0.418778444.981205003.227749>
},
...]
iex(2)> tasks_2_step |> length
262144
iex(3)> :erlang.system_info(:process_limit)
262144
iex(4)> :erlang.system_info(:process_limit) == tasks_2_step |> length
true
どうやらIExの返すプロセスリミットまでは作成できたらしい
次は値を取り出してみる
iex(5)> tasks_2_step |> Enum.map(& Task.await(&1))
[1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000,
24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000,
35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000,
46000, 47000, 48000, 49000, 50000, ...]
滞りなく取り出せる。
ではプロセスリミット + 1で処理するとどうなるかを見たいと思うんですよ!
何かしらのエラーを吐いてくれるはず
iex(1)> tasks_3_step = 1..(:erlang.system_info(:process_limit) + 1) |> Enum.map(& Task.async(fn -> &1 * 1000 end))
[
%Task{
owner: #PID<0.191.0>,
pid: #PID<0.307.8>,
ref: #Reference<0.3806640768.713293825.124028>
},
...]
iex(2)> tasks_3_step |> length == :erlang.system_info(:process_limit)
false
iex(3)> tasks_3_step |> length
262145
iex(4)> :erlang.system_info(:process_limit)
262144
iex(12)> results = tasks_3_step |> Enum.map(& Task.await(&1))
[1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000,
24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000,
35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000,
46000, 47000, 48000, 49000, 50000, ...]
iex(13)> results |> length
262145
え、エラーにならない...
なぜ?
結論を先に書くと検証に使ったコードが悪かった。
今回Task.asyncで実行しているのは簡単な掛け算。 :observer.start()
を眺めて気づいたがこれだとTaskの実行が速くて1つのプロセスで全計算を終わらせられていた。
ということで検証コードを変更した。
ついでに hoge.exs
にコードを書いた
1 defmodule Experimental do
2 def run(n) do
3 process_limit = :erlang.system_info(:process_limit)
4 IO.puts "process limit is #{process_limit}"
5
6 1..(process_limit + n)
7 |> Enum.map(fn i ->
8 Task.async(fn ->
9 :timer.sleep(2000) # 2秒処理を止める
10 i * 1000
11 end)
12 end)
13 |> Enum.map(& Task.await(&1))
14 end
15 end
これを起動する。
iex(1)> c "hoge.exs"
[Experimental]
iex(2)> Experimental.run(1)
process limit is 262144
12:10:50.113 [error] Too many processes
** (SystemLimitError) a system limit has been reached
:erlang.spawn_link(:proc_lib, :init_p, [#PID<0.106.0>, [#PID<0.73.0>], Task.Supervised, :reply, [{:nonode@nohost, #PID<0.106.0>, #PID<0.106.0>}, [#PID<0.106.0>], :nomonitor, {:erlang, :apply, [#Function<1.92674052/0 in Experimental.run/1>, []]}]])
(stdlib 3.13.2) proc_lib.erl:107: :proc_lib.spawn_link/3
(elixir 1.11.0) lib/task/supervised.ex:14: Task.Supervised.start_link/4
(elixir 1.11.0) lib/task.ex:425: Task.async/3
(elixir 1.11.0) lib/enum.ex:1403: anonymous fn/3 in Enum.map/2
(elixir 1.11.0) lib/enum.ex:3449: Enum.reduce_range_inc/4
(elixir 1.11.0) lib/enum.ex:2186: Enum.map/2
hoge.exs:7: Experimental.run/1
落ちた...
見たかったプロセスリミットは見られたが動くはずの処理が動かない。
なぜかと思って :observer.start
を眺めていると特に何もしていなくてもIExを動かしている限りある程度プロセスが作成されるようだ。
現在稼働中のプロセスを見るには :erlang.processes
で見られる。
iex(1)> :erlang.processes
[#PID<0.0.0>, #PID<0.1.0>, #PID<0.2.0>, #PID<0.3.0>, #PID<0.4.0>, #PID<0.5.0>,
#PID<0.6.0>,...]
ということで6行目の処理をちょっと直して動かしてみる 1..(process_limit - (:erlang.processes |> length) + n)
iex(1)> :observer.start()
:ok
iex(2)> c "hoge.exs"
[Experimental]
iex(3)> Experimental.run(0)
process limit is 262144
[1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 11000, 12000,
13000, 14000, 15000, 16000, 17000, 18000, 19000, 20000, 21000, 22000, 23000,
24000, 25000, 26000, 27000, 28000, 29000, 30000, 31000, 32000, 33000, 34000,
35000, 36000, 37000, 38000, 39000, 40000, 41000, 42000, 43000, 44000, 45000,
46000, 47000, 48000, 49000, 50000, ...]
やったー!
さらなる謎
大量にプロセスを作りプロセスの限界を突破したらどうなるかも見れたので満足しているのですがTask.asyncで大量にプロセスを作る -> Task.awaitで結果を出す前にまたTask.asyncで大量にプロセスを作るとしてもプロセスの限界に当たりませんでした。
これはTask.asyncの計算結果はプロセスの外に記録されているように思われるのですがどこに記録されているのかがわからずじまいでした。
また情報が入ったら更新してみようと思います。