はじめに
ElixirはBEAM(ErlangVM)で動作することから、Erlangの並行性と信頼性の特徴を受け継いでいます。
そのElixirの並行性について書いていきます。
Elixirのプロセス
Elixirではすべてのコードがプロセス内部で実行されます。プロセスは並行で動作してお互いに独立しています。
Elixir(Erlang)でいうプロセスはOSのプロセスとは別物で非常に軽量です。
メモリは約300ワードで起動にかかる時間は数マイクロ秒だと言われます。
プロセスの生成
spawn/1
又はspawn/3
関数で新しいプロセスを生成します。
defmodule Hello do
def greet(name) do
IO.puts "Hello, #{name}"
end
end
iex> pid = spawn(fn -> Hello.greet("world") end)
Hello, world
#PID<0.128.0>
iex> pid = spawn(Hello, :greet, ["world"])
Hello, world
#PID<0.130.0>
iex> Process.alive?(pid)
false
-
spawn/1
は引数に匿名関数を指定する -
spawn/3
の引数はモジュール名、生成したプロセスで実行される関数、関数に渡す引数の順番になる -
spawn
はPID(プロセス識別子)を返す - 実行される関数が終わったらプロセスが終了する
プロセス間でのメッセージ送信
メッセージの送信と受け取りはそれぞれsend/2
とreceive/1
を使います。
defmodule Hello do
def greet do
# 親プロセスからのメッセージを待ち受ける
receive do
{sender, name} ->
send sender, {:ok, "Hello, #{name}"}
end
end
end
# 子プロセスを生成
pid = spawn(Hello, :greet, [])
send pid, {self(), "world"}
# 子プロセスからのメッセージを待ち受ける
receive do
{:ok, message} -> IO.puts message
after 500 -> "Nothing happened."
end
-
self/0
は自分のPIDを返す -
send/2
は引数に送信先のPIDと送信するメッセージを指定する -
receive/1
でメッセージを待ち受けて、マッチするメッセージを受信するまで待ち続ける -
after ミリ秒
でタイムアウトする時間を指定できる
プロセスをリンクする
Elixirのプロセスは独立しているため、強制終了した時に他のプロセスに影響がありません。
強制終了の通知をお互いに受け取るには、複数プロセスをspawn_link/1
又はspawn_link/3
でリンクする方法があります。
defmodule Example do
import :timer, only: [ sleep: 1 ]
def explode do
sleep 500
exit(:boom)
end
def run do
spawn_link(Example, :explode, [])
receive do
msg ->
IO.puts "MESSAGE RECEIVED: #{inspect msg}"
after 1000 ->
IO.puts "Nothing happened."
end
end
end
iex> Example.run()
** (EXIT from #PID<0.109.0>) shell process exited with reason: :boom
この例の結果、子プロセスが強制終了するとリンクした親プロセスも落ちます。
親プロセスを強制終了させたくない場合はflag/2
を使って終了を捕捉します。
defmodule Example do
import :timer, only: [ sleep: 1 ]
def explode do
sleep 500
exit(:boom)
end
def run do
Process.flag(:trap_exit, true) # 終了を捕捉する
spawn_link(Example, :explode, [])
receive do
msg ->
IO.puts "MESSAGE RECEIVED: #{inspect msg}"
after 1000 ->
IO.puts "Nothing happened."
end
end
end
iex> Example.run()
MESSAGE RECEIVED: {:EXIT, #PID<0.196.0>, :boom}
:ok
プロセスのモニタリング
リンクがお互いに監視し合うのに対し、片方だけを監視するのがモニタです。
spawn_monitor/1
又はspawn_monitor/3
を使ってモニタするプロセスを生成します。
defmodule Example do
import :timer, only: [ sleep: 1 ]
def explode do
sleep 500
exit(:boom)
end
def run do
res = spawn_monitor(Example, :explode, [])
IO.puts inspect res
receive do
msg ->
IO.puts "MESSAGE RECEIVED: #{inspect msg}"
after 1000 ->
IO.puts "Nothing happened."
end
end
end
iex> Example.run()
{#PID<0.246.0>, #Reference<0.824531270.3884187651.58929>}
MESSAGE RECEIVED: {:DOWN, #Reference<0.824531270.3884187651.58929>, :process, #PID<0.246.0>, :boom}
:ok
モニタしたプロセスが終了した場合、ダウンメッセージ({:DOWN, ...}
)を受け取ります。
spawn_monitor
関数はPIDとReference(生成されたモニタの識別子)を返します。
OTPサーバー
OTP
Erlangでは耐障害性の高い(フォールトトレラントな)システムを作るためのライブラリとデザインパターンがOTP(Open Telecom Platform)というフレームワークとして提供されています。
Elixirもプロセスの操作に関する処理にOTPを利用します。
ビヘイビア
OTPでは汎用的な処理のパターンのことを「ビヘイビア」として定義します。ビヘイビアには主に2つの役割があります。
- 実装しなければならない関数一式を定義すること
- その関数一式が実際に実装されているかチェックすること
Javaでいうインターフェースに近いものだと考えても良いです。
Elixirではデフォルトのコールバックをすべて定義しているので必要な実装だけオーバーライトで良いです。
GenServer
GenServer
はOTPビヘイビアの一つです。
- クライアントプロセスからのリクエストを受け取る
- サーバー状態の更新(何らかの処理)を行う
- レスポンスを返す(或いは返さない)
という基本的なサーバーの振る舞いを抽象化したパターンです。
https://github.com/elixir-lang/elixir/blob/master/lib/elixir/lib/gen_server.ex
OTPサーバーはこのパターンにしたがってGenServer
の振る舞いを持ったモジュールです。
簡単なカウンターサーバーを作ります。
- サーバーを開始する時にカウンターの初期値を渡す
- サーバー(プロセス)はサーバー状態(カウンター値)を持つ
- カウントアップ、カウントダウンを呼び出すとサーバー状態(カウンター値)が更新される
- 現在のサーバー状態(カウンター値)を確認できる
GenServer
には6つのコールバック関数があります。use GenServer
でこれらの関数のデフォルトの実装が生成されます。
今回必要なhandle_call
とhandle_cast
だけオーバーライドします。
関数名 | 説明 |
---|---|
handle_call/3 | ・同期関数 ・GenServer.call(サーバーのPID, メッセージ(識別子)) を使った時に実行される ・引数:handle_call(メッセージ(識別子), クライアントのPID, 現在のサーバーの状態) ・戻り値:{:reply, 返り値, 新しい状態} |
handle_cast/2 | ・非同期関数 ・GenServer.cast(サーバーのPID, メッセージ(識別子)) を使った時に実行される ・引数:handle_cast(メッセージ(識別子), 現在のサーバーの状態) ・戻り値:{:noreply, 新しい状態} |
handle_cast/2
とhandle_call/3
はよく似た動きをします。ただし、非同期関数のhandle_cast/2
はクライアントのPID
を受け取らず返り値もありません。
このようにGenServer
を利用して実装したサーバーはGenServer.start_link(サーバーのモジュール名, 初期値, オプション)
でプロセスとして起動します。
name:
オプションを使ってプロセスの名前を付けることで、プロセスを参照する時はPID
の代わりにその名前が使えます。
defmodule Counter.Server do
use GenServer
### ヘルパー関数
def start_link(current_number) do
GenServer.start_link(__MODULE__, current_number, name: __MODULE__) # プロセスの名前をモジュールの名前にする
end
def count_up do
GenServer.call __MODULE__, :up
end
def count_down do
GenServer.cast __MODULE__, :down
end
def current_number do
GenServer.call __MODULE__, :current
end
### コールバック関数
@doc """
GenServer.handle_call/3 コールバック
"""
def handle_call(:up, _from, current_number) do
{:reply, current_number + 1, current_number + 1}
end
def handle_call(:current, _from, current_number) do
{:reply, current_number, current_number}
end
@doc """
GenServer.handle_cast/2 コールバック
"""
def handle_cast(:down, current_number) do
{:noreply, current_number - 1}
end
end
実行した結果です。
iex> Counter.Server.start_link 0
{:ok, #PID<0.120.0>}
iex> Counter.Server.count_up
1
iex> Counter.Server.count_up
2
iex> Counter.Server.count_up
3
iex> Counter.Server.count_down
:ok
iex> Counter.Server.current_number
2
スーパーバイザ - Supervisor
スーパーバイザはOTPスーパーバイザビヘイビアを使ったプロセスで、他のプロセスを監視するのが目的です。
子プロセスが失敗した際に自動的に再起動させることで、フォールトトレラントなシステムを作ることができます。
strategy:
オプションで再起動の戦略を設定することができます。
https://hexdocs.pm/elixir/Supervisor.html#module-strategies
設定値 | 戦略 |
---|---|
:one_for_all | 全ての子プロセスを再起動 |
:one_for_one | 失敗した子プロセスのみ再起動 |
:rest_for_one | 失敗した子プロセスとそれ以降に開始されたプロセスを再起動 |
カウンターサーバーにスーパーバイザを追加します。
コールバック関数のinit/1
を定義して、監視対象となるプロセスのリストを渡します。
defmodule Counter.Supervisor do
use Supervisor
def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
@impl true
def init(_arg) do
children = [
{Counter.Server, 0}
]
Supervisor.init(children, strategy: :one_for_one)
end
end
テストのためにカウンターサーバーに強制終了する関数を追加します。
defmodule Counter.Server do
use GenServer
### ヘルパー関数
...
@doc """
強制終了させる
"""
def kill do
pid = Process.whereis(__MODULE__)
Process.exit(pid, :kill)
end
### コールバック関数
...
end
Counter.Supervisor.start_link/1
でスーパーバイザプロセスを生成します。
生成されたスーパーバイザプロセスは監視対象となるCounter.Server
のstart_link/1
を呼び出して、GenServer
プロセスを生成します。
kill/0
を呼び出した後、プロセスが再起動されてカウンターが初期値の0
に戻りました。
iex> Counter.Supervisor.start_link([])
{:ok, #PID<0.120.0>}
iex> Counter.Server.current_number
0
iex> Counter.Server.count_up
1
iex> Counter.Server.count_up
2
iex> Counter.Server.kill
true
iex> Counter.Server.current_number
0
iex> Counter.Server.count_up
1
エージェント - Agent
エージェントは状態を持つバックグラウンドプロセス周りを抽象化したものです。
https://github.com/elixir-lang/elixir/blob/master/lib/elixir/lib/agent.ex
次のようにカウンターサーバーをエージェントで実装します。
defmodule Counter.Agent do
@doc """
現在のプロセスにリンクされるエージェントを起動する
current_numberは状態の初期値
"""
def start_link(current_number) do
Agent.start_link(fn -> current_number end, name: __MODULE__)
end
@doc """
Agent.update/3で状態を更新する(カウントアップ)
"""
def count_up do
Agent.update(__MODULE__, fn state -> state + 1 end)
end
@doc """
Agent.update/3で状態を更新する(カウントダウン)
"""
def count_down do
Agent.update(__MODULE__, fn state -> state - 1 end)
end
@doc """
Agent.get/3で現在の状態を取得する
"""
def current_number do
Agent.get(__MODULE__, fn state -> state end)
end
end
実行した結果です。
iex> Counter.Agent.start_link(0)
{:ok, #PID<0.153.0>}
iex> Counter.Agent.count_up
:ok
iex> Counter.Agent.count_up
:ok
iex> Counter.Agent.current_number
2
iex> Counter.Agent.count_down
:ok
iex> Counter.Agent.current_number
1
タスク - Task
タスクは関数をバックグラウンドで実行し、後でその戻り値を受け取る方法を提供します。
フィボナッチ数を計算する関数を作ります。
defmodule Fibonacci do
def fib(0), do: 0
def fib(1), do: 1
def fib(n), do: Fibonacci.fib(n-1) + Fibonacci.fib(n-2)
この関数をタスクで実行します。
Task.async/1
を呼ぶと、渡された関数を実行する独立したプロセスを生成します。Task.async
は Task
の構造体を返します。
Task.await/2
はバックグラウンドのタスクが終了するのを待ち、その関数の戻り値を返します。
iex> task = Task.async(fn -> Fibonacci.fib(20) end)
%Task{
owner: #PID<0.84.0>,
pid: #PID<0.94.0>,
ref: #Reference<0.1869583016.3732406273.208984>
}
iex> Task.await(task)
6765
Task.async/3
を使ってモジュール名、関数名と引数を渡すのも同じです。
iex> task = Task.async(Fibonacci, :fib, [20])
%Task{
owner: #PID<0.84.0>,
pid: #PID<0.97.0>,
ref: #Reference<0.1869583016.3732406273.209035>
}
iex> Task.await(task)
6765
Task
は専用のスーパーバイザTask.Supervisor
を持っています。
内部ではDynamicSupervisor
を使って動的にタスクを生成するようになっています。
Supervisor.start_link/2
でスーパーバイザを開始します。
children = [
{Task.Supervisor, name: Fib.TaskSupervisor, restart: :permanent}
]
{:ok, pid} = Supervisor.start_link(children, strategy: :one_for_one)
restart:
オプションの値は3つあります。
-
:permanent
- 子プロセスは常に再起動される -
:temporary
- 子プロセスは決して再起動されない(スーパバイザの戦略が:rest_for_one
、:one_for_all
のときも) -
:transient
- 異常終了した時のみ子プロセスは再起動される(:normal
,:shutdown
,{:shutdown, term}
以外のreason)
開始されたスーパーバイザにタスクを追加します。
{:ok, pid} = Task.Supervisor.start_child(Fib.TaskSupervisor, Fibonacci, :fib, [50])
タスクがクラッシュした場合は再起動が行われます。
iex> [pid] = Task.Supervisor.children(Fib.TaskSupervisor)
[#PID<0.97.0>]
iex> Process.exit(pid, :kill) # 強制終了させる
true
iex> [pid] = Task.Supervisor.children(Fib.TaskSupervisor)
[#PID<0.100.0>]
まとめ
- Elixirのプロセスは軽量でお互いに独立して並行で動作している
-
spawn
やsend
、receive
などの関数でプロセスの操作ができる - ElixirとOTPで抽象化したモジュールが提供されている
-
GenServer
: プロセスをカプセル化し、同期通信、非同期通信やコードの再読込をサポートする一般的なサーバー(ビヘイビア) -
Supervisor
: プロセスの監視と再起動を行う(ビヘイビア) -
Agent
: ステート周りのシンプルなラッパー -
Task
: プロセスを発行し後から簡単に呼び出しができる非同期処理の単位
-
参考
プログラミングElixir
https://elixirschool.com/ja/lessons/advanced/concurrency/
https://elixirschool.com/en/lessons/advanced/otp-supervisors/
http://elixir-ja.sena-net.works/getting_started/mix_otp/1.html