Elixirの並列処理の特性
ElixirはBEAMという仮想マシン上で動作している。
BEAM上の並列処理については「軽量なプロセス」を大量に扱うことで実現しており、一般的なスレッドベースの並列処理とは仕組みが異なる。
Elixirでは並列処理自体は簡単に書ける。共有メモリやデッドロックといった問題が発生しにくい設計となっており、スケジューリングについてもBEAMが担うため。
反面私は並列処理を実装した結果かえって実行時間が長くなるケースに出くわしてしまい、その性能特性についてより理解が必要と考えた。
本記事では、その挙動を実際に検証していきたい。
検証方法
以下の関数を20回実行し、各々の場合の実行時間を20回分測定する。
関数の処理としてはデータベースを模したPrefab内部のDataTableの保持するData群のうち、正規表現ベースのDSLを用いる表現を通常の正規表現へと変換し保持させる処理である。
Prefab内部のDataTableは16つ、Fontは1つ。Font内部のDataTableは5つ。
処理対象となるDataの数はDataTableによって異なるが、1~700程度である。
ちなみにこの関数自体は実際のプロジェクトに用いているものを引用・調整したである。
def set_precompile_stream_all(%Prefab{static_tables: static_tables, fonts: fonts} = self) do
with {:ok, static_tables_alt} <-
Task.async_stream(static_tables, fn {key, value} ->
{key, set_precompile_internal_stream_as_datatable(self, value)}
end, ordered: false)
|> Enum.reduce_while({:ok, %{}}, fn
{:ok, {key, {:ok, value}}}, {:ok, acc} -> {:cont, {:ok, Map.put(acc, key, value)}}
{:ok, {_, {:error, _} = err}}, _acc -> {:halt, err}
{:exit, reason}, _acc -> {:halt, {:error, reason}}
end),
{:ok, fonts_alt} <-
Task.async_stream(fonts, fn {key, value} ->
{key, set_precompile_internal_stream_as_font(self, value)}
end, ordered: false)
|> Enum.reduce_while({:ok, %{}}, fn
{:ok, {key, {:ok, value}}}, {:ok, acc} -> {:cont, {:ok, Map.put(acc, key, value)}}
{:ok, {_, {:error, _} = err}}, _acc -> {:halt, err}
{:exit, reason}, _acc -> {:halt, {:error, reason}}
end) do
{:ok, %Prefab{self | static_tables: static_tables_alt, fonts: fonts_alt}}
else
{:error, _} = err -> err
end
end
defp set_precompile_internal_stream_as_font(
%Prefab{} = self,
%Font{table_list: table_list} = font
) do
Task.async_stream(
table_list,
fn table ->
set_precompile_internal_stream_as_datatable(self, table)
end, ordered: false
)
|> Enum.reduce_while({:ok, []}, fn
{:ok, {:ok, table}}, {:ok, acc} -> {:cont, {:ok, [table | acc]}}
{:ok, {:error, _} = err}, _acc -> {:halt, err}
{:exit, reason}, _acc -> {:halt, {:error, reason}}
end)
|> case do
{:ok, table_list} -> {:ok, %Font{font | table_list: table_list}}
{:error, _} = err -> err
end
end
defp set_precompile_internal_stream_as_datatable(
%Prefab{} = self,
%DataTable{data_list: data_list} = table
) do
chunk_size = 10000
Enum.chunk_every(data_list, chunk_size)
|> Task.async_stream(fn data_chunk ->
Enum.map(data_chunk, fn data -> Data.set_precompile(data, table, self) end)
end)
|> Enum.flat_map(fn
{:ok, res} -> res
{:exit, _} = ext -> ext
end)
|> Enum.reduce_while({:ok, []}, fn
{:ok, data}, {:ok, acc} -> {:cont, {:ok, [data | acc]}}
{:error, _} = err, _acc -> {:halt, err}
{:exit, reason}, _acc -> {:halt, {:error, reason}}
end)
|> case do
{:ok, data_list} -> {:ok, %DataTable{table | data_list: data_list}}
{:error, _} = err -> err
end
end
chunk_sizeについては並列処理の分割単位で、例えばchunk_size=10ならば10つのDataグループ毎に並行処理を執り行う。
またTask.async_streamを単にEnum.mapとし、適切な返り値の形式としたものも作成・検証。
検証結果
全く並列処理を使用しない場合
# 414 426 358 403 475 401 386 409 372 410
# 379 557 488 466 422 462 453 439 479 475
DataTable、Font単位で並行処理を行う場合
set_precompile_internal_stream_as_datatableでは並行処理を執り行わない。
他関数部分では並行処理を執り行う。
# 445 381 379 355 361 377 360 364 433 339
# 407 364 344 490 409 351 350 357 361 348
Data単位で並列処理を行う場合
chunk_size=1(1プロレス辺り1つのDataを処理させる)
# 2941 2772 2637 2975 2737 2777 3058 2674 3062 2978
# 2577 2960 2744 3069 2607 2654 3057 2944 2685 2696
chunk_size=5(1プロセス辺り5つのDataを処理させる)
# 1625 1150 920 1103 1048 1464 1279 1005 1293 745
# 940 1231 887 1024 943 1177 1512 1834 1167 1050
chunk_size=10
# 459 454 453 407 472 486 416 498 615 553
# 431 416 465 433 447 556 448 493 442 444
chunk_size=30
# 257 246 228 269 264 304 246 261 251 269
# 232 231 233 239 241 223 243 244 269 277
chunk_size=50
# 241 215 217 213 219 223 232 229 234 221
# 192 210 198 211 228 229 202 210 207 211
chunk_size=100
# 259 214 217 201 214 220 212 241 214 219
# 230 216 283 222 279 237 214 226 242
全く並列処理をしない場合に比べ、DataTable、Font単位で並行処理を行う場合は僅かに実行時間が短縮されている。
直感に反し、chunk_sizeが小さいほど非常に低速となり、chunk_size=50が最速、以降は微小だが処理時間が漸増している。
これはDataTableといった大きな粒度であれば並列化による短縮からプロセス生成のオーバーヘッドを踏み倒せるが、Dataといった小さい粒度となるとプロセス生成のオーバーヘッドが並列化による短縮より大きくなってしまうため、のようだ。
Data数が増加すると分割数も当然増加し、プロセス生成およびスケジューリングのオーバーヘッドが増大するため、chunk_size=50が最速となくなる可能性が高い。大本のプロジェクト用のData数が現状より多くなった場合に折って検証したい。
総じて、chunk_size=50が最適というより、「状況に応じた適切な分割数が最適」と考えたほうが良い。
Task.async_streamにmax_concurrency: System.schedulers_online()を指定した場合
max_concurrencyは「一度にどれだけのタスクを同時実行できるか」であり、この場合はCPUが同時に処理できる数。この場合は同時実行タスク数をCPUのコア数MAXまでと制限している。
### chunk_size=50
# 228 199 189 286 410 229 245 214 209 231
# 239 219 224 249 214 235 228 226 227 232
性能差は小さいが、並列度の制御という意味では有効。max_concurrency = max(1, round(System.schedulers_online() * n / 100))のようにしておけば、最低でもコア1つ、MAXで全コアのうちn%という条件指定も可能。
(なお、API通信やDBアクセスと言ったI/O境界操作の場合はCPUのコア数より並列化可能。というのもI/O待ち時間の間はCPUは待機状態となり、そのCPUに別の処理を実行させることが可能な為。盲点だった)
chunk分割についてはコア数ベースに相対割合とするのが良いだろうか。こちらも折って検証したい。
結論
Elixirでの並列処理は書くのは簡単、速くするのはちゃんと難しい。
雑に分割して丸投げするだけでは翻って遅くなってしまうリスクもある。
速くするには、分割粒度と同時実行数のバランスを取る必要がある。突き詰めれば他言語と変わらないようだ。
とはいえ、設計段階からして並列処理の難しい(ただし自由度は高い)Javaや、実行順序の担保をちゃんと確認する必要のあるJavascriptと比べれば書きやすいのはマジ。
Node.jsがI/Oの比重が大きい事務~調整担当だとすれば、Elixirは入力を流れるように加工し出力してくれる工場を構築するようなものだと考えると良いか。