この記事はElixir Advent Calendar 2020の11日目です。
昨日は @zacky1972 さんの「Elixir から Swift 5.3のコードを呼び出す方法(Autotoolsを使って / Apple Silicon M1チップにも対応)」でした。
明日は @MzRyuKa さんの「書評:プログラミングElixir第2版」です。
本記事では「Taskを使うとすごく簡単に並行処理が書ける!しかもサーバー化も簡単!Elixirすごい!」と感動したので、その作成過程を紹介しつつ、感動をお伝えできたらと思います。
Taskのおさらい
Task 使ってますか? ざっくり復習します。
Task.asyncで別プロセスを生成できる
IExで、Process.sleepするとIExのプロセスがsleepするので入力できなくなります。
iex()> Process.sleep(5000)
# 5秒間入力できない
Task.asyncを使うと別プロセスで処理ができる(つまり並行に処理できている)ので、入力が継続できます。
iex()> require Logger
...()> Task.async(fn ->
...()> Process.sleep(10000)
...()> Logger.info("yey")
...()> end)
%Task{
owner: #PID<0.111.0>,
pid: #PID<0.144.0>,
ref: #Reference<0.3890488730.554434561.238327>
}
...()> "saki ni yey" # 入力できます。
"saki ni yey"
iex()>
22:39:23.952 [info] yey
Task.awaitでTask.asyncの結果を受け取れる
Task.asyncが返すtask構造体をTask.awaitに渡すと、結果を受け取ることができます。
iex()> task = Task.async(fn ->
...()> Process.sleep(5000)
...()> 2+3
...()> end)
%Task{
owner: #PID<0.111.0>,
pid: #PID<0.137.0>,
ref: #Reference<0.3890488730.554434561.238283>
}
iex()> Task.await(task)
5
チョット工夫してみる
おさらいできたので、チョット工夫してみます。
Taskを使うと「mfa(module, function, arguments)のタプルのリスト」(以降コマンドリストと呼ぶ)を並行に実行してくれる関数を以下のように書くことができます。
defmodule CommandExecutor do
@spec do_async([tuple()]) :: [any()]
def do_async(commands) when is_list(commands) do
commands
|> Enum.map(fn {m, f, a} -> Task.async(m, f, a) end) # コマンドリストは処理が開始されタスクのリストに
|> Enum.map(fn task -> Task.await(task) end) # タスクの結果を一つずつ回収
end
end
# Task.async/1は匿名関数を、Task.async/3はmfaを処理することができます。
※コマンドの実行は並行でできますが、結果の回収は一番遅いtaskの処理に引っ張られます。
※do_asyncの呼び出し側は結果の回収が完了するまでブロックします。
コマンド実行サーバーにしてみる
作ったdo_asyncを使って、メッセージとしてコマンドリストを投げたら、処理してくれるサーバーを作ってみます。
※リスト内のコマンドは並行処理しますが、リスト単位では逐次処理します。(GenServerは受け取ったメッセージを逐次処理する)
defmodule CommandExecutor do
use GenServer
def start_link(state), do: GenServer.start_link(__MODULE__, state, name: __MODULE__)
def init(state), do: {:ok, state}
# 結果を返さない
def handle_cast({:commands, commands, :async}, state) when is_list(commands) do
do_async(commands)
{:noreply, state}
end
# 結果を返す
def handle_call({:commands, commands, :async}, _from, state) when is_list(commands) do
{:reply, do_async(commands), state}
end
@spec do_async([tuple()]) :: [any()]
def do_async(commands) when is_list(commands) do
commands
|> Enum.map(fn {m, f, a} -> Task.async(m, f, a) end)
|> Enum.map(fn task -> Task.await(task) end)
end
end
コマンドをぽいぽい投げ込めば、リスト単位で逐次処理してくれます。
※コマンドリストをたくさん投げる可能性がある場合は、GenServerのキュー(mailbox)にたまるので、その限界を知っておく必要があります。(知ってたら教えてくださいm(- -)m
コマンド実行で例外が起きたら?
実行できないmfaが投げ込まれる可能性があるので対策しておきます。
@spec do_async([tuple()]) :: [any()]
def do_async(commands) when is_list(commands) do
commands
|> Enum.map(fn {m, f, a} ->
try do
Task.async(m, f, a)
rescue
error in [UndefinedFunctionError] -> error
other_error -> reraise("あちゃー", System.stacktrace)
end
end)
|> Enum.map(fn task -> Task.await(task) end)
end
でも、これだと予見できない例外でGenServerが落ちるかもしれません。
予見できない例外で落ちる場合はSupervisorに復帰してもらう
「僕に分かる対策はした、あとは頼んだよSupervisor君!!」という感じで、基本はお作法に従って書くだけです。
defmodule CommandExecutorSupervisor do
use Supervisor
def start_link(start_state) do
Supervisor.start_link(__MODULE__, start_state, name: __MODULE__)
end
def init(_start_state) do
children = [{CommandExecutor, nil}]
Supervisor.init(children, strategy: :one_for_one)
end
end
※復帰動作の細かい設定をしたい場合はドキュメントを読みましょう。
Applicationに組み込む
アプリ−ケーションの起動時からコマンド実行サーバーが動作するようにApplicationに組み込みます。
defmodule Oreore.Application do
use Application
def start(_type, _args) do
children = [..., {CommandExecutorSupervisor, []}]
Supervisor.start_link(children, strategy: :one_for_one)
end
end
できました。
うぉお、めっちゃ簡単、並行コマンド実行サーバーできた!
おわり
雑にさらさら進めましたが、ポイントは以下でした。
- Taskを使うことで、スレッドを意識せずに並行処理が書ける!
- GenServerのメッセージキューに頼ることで、自身でキューの実装をせずにすむ!
- SupervisorにGenServerを監視してもらえるので自動復帰も余裕!
Elixir, Erlang OTPすごいですね!!これからもElixirやっていきましょう♪♪