LoginSignup
19
13

More than 5 years have passed since last update.

Elixirの並行処理についての勉強メモ

Last updated at Posted at 2018-08-12

はじめに

ElixirはBEAM(ErlangVM)で動作することから、Erlangの並行性と信頼性の特徴を受け継いでいます。
そのElixirの並行性について書いていきます。

Elixirのプロセス

Elixirではすべてのコードがプロセス内部で実行されます。プロセスは並行で動作してお互いに独立しています。
Elixir(Erlang)でいうプロセスはOSのプロセスとは別物で非常に軽量です。
メモリは約300ワードで起動にかかる時間は数マイクロ秒だと言われます。

プロセスの生成

spawn/1又はspawn/3関数で新しいプロセスを生成します。

プロセスの生成
defmodule Hello do
  def greet(name) do
    IO.puts "Hello, #{name}"
  end
end

iex> pid = spawn(fn -> Hello.greet("world") end)
Hello, world
#PID<0.128.0>
iex> pid = spawn(Hello, :greet, ["world"])
Hello, world
#PID<0.130.0>
iex> Process.alive?(pid)
false
  • spawn/1は引数に匿名関数を指定する
  • spawn/3の引数はモジュール名、生成したプロセスで実行される関数、関数に渡す引数の順番になる
  • spawnはPID(プロセス識別子)を返す
  • 実行される関数が終わったらプロセスが終了する

プロセス間でのメッセージ送信

メッセージの送信と受け取りはそれぞれsend/2receive/1を使います。

プロセス間でのメッセージ送信
defmodule Hello do
  def greet do
    # 親プロセスからのメッセージを待ち受ける
    receive do
      {sender, name} ->
        send sender, {:ok, "Hello, #{name}"}
    end
  end
end

# 子プロセスを生成
pid = spawn(Hello, :greet, [])
send pid, {self(), "world"}

# 子プロセスからのメッセージを待ち受ける
receive do
  {:ok, message} -> IO.puts message
  after 500 -> "Nothing happened."
end
  • self/0は自分のPIDを返す
  • send/2は引数に送信先のPIDと送信するメッセージを指定する
  • receive/1でメッセージを待ち受けて、マッチするメッセージを受信するまで待ち続ける
  • after ミリ秒でタイムアウトする時間を指定できる

プロセスをリンクする

Elixirのプロセスは独立しているため、強制終了した時に他のプロセスに影響がありません。
強制終了の通知をお互いに受け取るには、複数プロセスをspawn_link/1又はspawn_link/3でリンクする方法があります。

プロセスをリンクする
defmodule Example do
  import :timer, only: [ sleep: 1 ]

  def explode do
    sleep 500
    exit(:boom)
  end

  def run do
    spawn_link(Example, :explode, [])
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
      after 1000 ->
        IO.puts "Nothing happened."
    end
  end
end

iex> Example.run()
** (EXIT from #PID<0.109.0>) shell process exited with reason: :boom

この例の結果、子プロセスが強制終了するとリンクした親プロセスも落ちます。
親プロセスを強制終了させたくない場合はflag/2を使って終了を捕捉します。

プロセスをリンクする
defmodule Example do
  import :timer, only: [ sleep: 1 ]

  def explode do
    sleep 500
    exit(:boom)
  end

  def run do
    Process.flag(:trap_exit, true)  # 終了を捕捉する
    spawn_link(Example, :explode, [])
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
      after 1000 ->
        IO.puts "Nothing happened."
    end
  end
end

iex> Example.run()
MESSAGE RECEIVED: {:EXIT, #PID<0.196.0>, :boom}
:ok

プロセスのモニタリング

リンクがお互いに監視し合うのに対し、片方だけを監視するのがモニタです。
spawn_monitor/1又はspawn_monitor/3を使ってモニタするプロセスを生成します。

プロセスのモニタリング
defmodule Example do
  import :timer, only: [ sleep: 1 ]

  def explode do
    sleep 500
    exit(:boom)
  end

  def run do
    res = spawn_monitor(Example, :explode, [])
    IO.puts inspect res
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
      after 1000 ->
        IO.puts "Nothing happened."
    end
  end
end

iex> Example.run()
{#PID<0.246.0>, #Reference<0.824531270.3884187651.58929>}
MESSAGE RECEIVED: {:DOWN, #Reference<0.824531270.3884187651.58929>, :process, #PID<0.246.0>, :boom}
:ok

モニタしたプロセスが終了した場合、ダウンメッセージ({:DOWN, ...})を受け取ります。
spawn_monitor関数はPIDとReference(生成されたモニタの識別子)を返します。

OTPサーバー

OTP

Erlangでは耐障害性の高い(フォールトトレラントな)システムを作るためのライブラリとデザインパターンがOTP(Open Telecom Platform)というフレームワークとして提供されています。
Elixirもプロセスの操作に関する処理にOTPを利用します。

ビヘイビア

OTPでは汎用的な処理のパターンのことを「ビヘイビア」として定義します。ビヘイビアには主に2つの役割があります。

  • 実装しなければならない関数一式を定義すること
  • その関数一式が実際に実装されているかチェックすること

Javaでいうインターフェースに近いものだと考えても良いです。
Elixirではデフォルトのコールバックをすべて定義しているので必要な実装だけオーバーライトで良いです。

GenServer

GenServerはOTPビヘイビアの一つです。

  • クライアントプロセスからのリクエストを受け取る
  • サーバー状態の更新(何らかの処理)を行う
  • レスポンスを返す(或いは返さない)

という基本的なサーバーの振る舞いを抽象化したパターンです。
https://github.com/elixir-lang/elixir/blob/master/lib/elixir/lib/gen_server.ex
OTPサーバーはこのパターンにしたがってGenServerの振る舞いを持ったモジュールです。
簡単なカウンターサーバーを作ります。

  • サーバーを開始する時にカウンターの初期値を渡す
  • サーバー(プロセス)はサーバー状態(カウンター値)を持つ
  • カウントアップ、カウントダウンを呼び出すとサーバー状態(カウンター値)が更新される
  • 現在のサーバー状態(カウンター値)を確認できる

GenServerには6つのコールバック関数があります。use GenServerでこれらの関数のデフォルトの実装が生成されます。
今回必要なhandle_callhandle_castだけオーバーライドします。

関数名 説明
handle_call/3 ・同期関数
・GenServer.call(サーバーのPID, メッセージ(識別子)) を使った時に実行される
・引数:handle_call(メッセージ(識別子), クライアントのPID, 現在のサーバーの状態)
・戻り値:{:reply, 返り値, 新しい状態}
handle_cast/2 ・非同期関数
・GenServer.cast(サーバーのPID, メッセージ(識別子)) を使った時に実行される
・引数:handle_cast(メッセージ(識別子), 現在のサーバーの状態)
・戻り値:{:noreply, 新しい状態}

handle_cast/2handle_call/3はよく似た動きをします。ただし、非同期関数のhandle_cast/2はクライアントのPIDを受け取らず返り値もありません。

このようにGenServerを利用して実装したサーバーはGenServer.start_link(サーバーのモジュール名, 初期値, オプション)でプロセスとして起動します。
name:オプションを使ってプロセスの名前を付けることで、プロセスを参照する時はPIDの代わりにその名前が使えます。

defmodule Counter.Server do
  use GenServer

  ### ヘルパー関数

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)  # プロセスの名前をモジュールの名前にする
  end

  def count_up do
    GenServer.call __MODULE__, :up
  end

  def count_down do
    GenServer.cast __MODULE__, :down
  end

  def current_number do
    GenServer.call __MODULE__, :current
  end

  ### コールバック関数

  @doc """
  GenServer.handle_call/3 コールバック
  """
  def handle_call(:up, _from, current_number) do
    {:reply, current_number + 1, current_number + 1}
  end

  def handle_call(:current, _from, current_number) do
    {:reply, current_number, current_number}
  end

  @doc """
  GenServer.handle_cast/2 コールバック
  """
  def handle_cast(:down, current_number) do
    {:noreply, current_number - 1}
  end
end

実行した結果です。

iex> Counter.Server.start_link 0
{:ok, #PID<0.120.0>}
iex> Counter.Server.count_up
1
iex> Counter.Server.count_up
2
iex> Counter.Server.count_up
3
iex> Counter.Server.count_down
:ok
iex> Counter.Server.current_number
2

スーパーバイザ - Supervisor

スーパーバイザはOTPスーパーバイザビヘイビアを使ったプロセスで、他のプロセスを監視するのが目的です。
子プロセスが失敗した際に自動的に再起動させることで、フォールトトレラントなシステムを作ることができます。

strategy:オプションで再起動の戦略を設定することができます。
https://hexdocs.pm/elixir/Supervisor.html#module-strategies

設定値 戦略
:one_for_all 全ての子プロセスを再起動
:one_for_one 失敗した子プロセスのみ再起動
:rest_for_one 失敗した子プロセスとそれ以降に開始されたプロセスを再起動

カウンターサーバーにスーパーバイザを追加します。
コールバック関数のinit/1を定義して、監視対象となるプロセスのリストを渡します。

defmodule Counter.Supervisor do
  use Supervisor

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @impl true
  def init(_arg) do
    children = [
      {Counter.Server, 0}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end

テストのためにカウンターサーバーに強制終了する関数を追加します。

defmodule Counter.Server do
  use GenServer

  ### ヘルパー関数
  ...

  @doc """
  強制終了させる
  """
  def kill do
    pid = Process.whereis(__MODULE__)
    Process.exit(pid, :kill)
  end

  ### コールバック関数
  ...
end

Counter.Supervisor.start_link/1でスーパーバイザプロセスを生成します。

生成されたスーパーバイザプロセスは監視対象となるCounter.Serverstart_link/1を呼び出して、GenServerプロセスを生成します。
kill/0を呼び出した後、プロセスが再起動されてカウンターが初期値の0に戻りました。

iex> Counter.Supervisor.start_link([])
{:ok, #PID<0.120.0>}
iex> Counter.Server.current_number
0
iex> Counter.Server.count_up
1
iex> Counter.Server.count_up
2
iex> Counter.Server.kill
true
iex> Counter.Server.current_number
0
iex> Counter.Server.count_up
1

エージェント - Agent

エージェントは状態を持つバックグラウンドプロセス周りを抽象化したものです。

https://github.com/elixir-lang/elixir/blob/master/lib/elixir/lib/agent.ex
次のようにカウンターサーバーをエージェントで実装します。

defmodule Counter.Agent do
  @doc """
  現在のプロセスにリンクされるエージェントを起動する
  current_numberは状態の初期値
  """
  def start_link(current_number) do
    Agent.start_link(fn -> current_number end, name: __MODULE__)
  end

  @doc """
  Agent.update/3で状態を更新する(カウントアップ)
  """
  def count_up do
    Agent.update(__MODULE__, fn state -> state + 1 end)
  end

  @doc """
  Agent.update/3で状態を更新する(カウントダウン)
  """
  def count_down do
    Agent.update(__MODULE__, fn state -> state - 1 end)
  end

  @doc """
  Agent.get/3で現在の状態を取得する
  """
  def current_number do
    Agent.get(__MODULE__, fn state -> state end)
  end
end

実行した結果です。

iex> Counter.Agent.start_link(0)
{:ok, #PID<0.153.0>}
iex> Counter.Agent.count_up
:ok
iex> Counter.Agent.count_up
:ok
iex> Counter.Agent.current_number
2
iex> Counter.Agent.count_down
:ok
iex> Counter.Agent.current_number
1

タスク - Task

タスクは関数をバックグラウンドで実行し、後でその戻り値を受け取る方法を提供します。

フィボナッチ数を計算する関数を作ります。

defmodule Fibonacci do
  def fib(0), do: 0
  def fib(1), do: 1
  def fib(n), do: Fibonacci.fib(n-1) + Fibonacci.fib(n-2)

この関数をタスクで実行します。

Task.async/1を呼ぶと、渡された関数を実行する独立したプロセスを生成します。Task.asyncTaskの構造体を返します。
Task.await/2はバックグラウンドのタスクが終了するのを待ち、その関数の戻り値を返します。

iex> task = Task.async(fn -> Fibonacci.fib(20) end)
%Task{
  owner: #PID<0.84.0>,
  pid: #PID<0.94.0>,
  ref: #Reference<0.1869583016.3732406273.208984>
}
iex> Task.await(task)
6765

Task.async/3を使ってモジュール名、関数名と引数を渡すのも同じです。

iex> task = Task.async(Fibonacci, :fib, [20])
%Task{
  owner: #PID<0.84.0>,
  pid: #PID<0.97.0>,
  ref: #Reference<0.1869583016.3732406273.209035>
}
iex> Task.await(task)
6765

Taskは専用のスーパーバイザTask.Supervisorを持っています。
内部ではDynamicSupervisorを使って動的にタスクを生成するようになっています。

Supervisor.start_link/2でスーパーバイザを開始します。

children = [
  {Task.Supervisor, name: Fib.TaskSupervisor, restart: :permanent}
]
{:ok, pid} = Supervisor.start_link(children, strategy: :one_for_one)

restart:オプションの値は3つあります。

  • :permanent - 子プロセスは常に再起動される
  • :temporary - 子プロセスは決して再起動されない(スーパバイザの戦略が:rest_for_one:one_for_allのときも)
  • :transient - 異常終了した時のみ子プロセスは再起動される(:normal, :shutdown, {:shutdown, term}以外のreason)

開始されたスーパーバイザにタスクを追加します。

{:ok, pid} = Task.Supervisor.start_child(Fib.TaskSupervisor, Fibonacci, :fib, [50])

タスクがクラッシュした場合は再起動が行われます。

iex> [pid] = Task.Supervisor.children(Fib.TaskSupervisor)
[#PID<0.97.0>]
iex> Process.exit(pid, :kill)  # 強制終了させる
true
iex> [pid] = Task.Supervisor.children(Fib.TaskSupervisor)
[#PID<0.100.0>]

まとめ

  • Elixirのプロセスは軽量でお互いに独立して並行で動作している
  • spawnsendreceiveなどの関数でプロセスの操作ができる
  • ElixirとOTPで抽象化したモジュールが提供されている
    • GenServer: プロセスをカプセル化し、同期通信、非同期通信やコードの再読込をサポートする一般的なサーバー(ビヘイビア)
    • Supervisor: プロセスの監視と再起動を行う(ビヘイビア)
    • Agent: ステート周りのシンプルなラッパー
    • Task: プロセスを発行し後から簡単に呼び出しができる非同期処理の単位

参考

プログラミングElixir
https://elixirschool.com/ja/lessons/advanced/concurrency/
https://elixirschool.com/en/lessons/advanced/otp-supervisors/
http://elixir-ja.sena-net.works/getting_started/mix_otp/1.html

19
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
13