Flowを使っても並列実行されない
この記事では、Flowを使う事でCPUコアを並列して使う事ができ処理速度が向上しました。
しかし、別のプログラムでは全く並列実行されませんでした。
簡単なテストプログラムで、速くなる場合とならない場合を再現してみました。
defmodule Test2 do
def sum_i(0) , do: 0
def sum_i(i) , do: i + sum_i(i-1)
def hello_flow(loop, load) do
ret = 1..loop
|> Flow.from_enumerable()
|> Flow.map(fn _ -> sum_i(load) end)
|> Enum.count(fn _ -> true end)
end
def hello_enum(loop, load) do
ret = 1..loop
|> Enum.map(fn _ -> sum_i(load) end)
|> Enum.count(fn _ -> true end)
end
end
# 結果
No | 方法 | 実行時間(sec) | loop回数 | 1loopの演算回数 | 演算量 | 演算回数/sec | CPU使用率 |
---|---|---|---|---|---|---|---|
1 | flow | 7.7 | 1K | 1M | 1G | 130M/sec | 約10% |
2 | enum | 7.9 | 1K | 1M | 1G | 127M/sec | 約10% |
3 | flow | 0.4 | 1M | 1K | 1G | 2500M/sec | 100% |
4 | enum | 3.0 | 1M | 1K | 1G | 333M/sec | 約10% |
解決策
loop回数が少ないと並列実行されない
from_enumerableに、max_demand: を指定することで解決しました。
max_demand:の働きは、ZACKY さんの記事がわかりやすかったです。
無条件に並列実行するように、max_demand: 1としてみます。
def hello_flow2(loop, load) do
ret = 1..loop
|> Flow.from_enumerable(max_demand: 1)
|> Flow.map(fn _ -> sum_i(load) end)
|> Enum.count(fn _ -> true end)
end
No | 方法 | 実行時間(sec) | loop回数 | 1loopの演算回数 | 演算量 | 演算回数/sec | CPU使用率 |
---|---|---|---|---|---|---|---|
1 | flow | 7.7 | 1K | 1M | 1G | 130M/sec | 約10% |
2 | enum | 7.9 | 1K | 1M | 1G | 127M/sec | 約10% |
5 | flow2 | 2.2 | 1K | 1M | 1G | 454M/sec | 100% |
No3の場合と比べると、遅いですが、CPUの使用率は100%になっているので、最大処理にはなっていると思います。
結果メモ
iex(1)> :timer.tc(Test2, :hello_flow, [1000, 1_000_000])
{7759007, 1000}
iex(2)> :timer.tc(Test2, :hello_enum, [1000, 1_000_000])
{7940184, 1000}
iex(3)> :timer.tc(Test2, :hello_flow, [1000_000, 1_000])
{425530, 1000000}
iex(4)> :timer.tc(Test2, :hello_enum, [1000_000, 1_000])
{3023313, 1000000}
iex(5)> :timer.tc(Test2, :hello_flow, [10_000_000, 1_000])
{4003593, 10000000}
iex(6)> :timer.tc(Test2, :hello_enum, [10_000_000, 1_000])
{30227115, 10000000}
iex(10)> :timer.tc(Test2, :hello_flow2, [1000, 1_000_000])
{2247715, 1000}
参考
https://qiita.com/zacky1972/items/e843607881bbeca34b70
https://hexdocs.pm/flow/Flow.html