環境
$ lsb_release -d
Description: Ubuntu 18.04.2 LTS
$ elixir -v
Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]
Elixir 1.8.1 (compiled with Erlang/OTP 20)
概要
アプリケーションについて、プロセスプールを作ることで説明されるが、
この章ではまだ正式な OTP アプリケーションを作るわけではない。
E本の説明の流れが分かりづらいんだよね。
翻訳のせいじゃない気がする。流れがね。
18.1 プロセスプール
Poolvisor
が Workervisor
も起動しちゃっていいと思うんだけど、
Stash
にそれをやらせようとしているのは、
スーパーバイザはシンプルであれということなのかな?
18.5 プールよ、走れ
PPool
モジュールには以下の関数がある。
-
start_link/0
: プロセスプール・アプリケーションを起動する。 -
stop/0
:アプリケーションを無理矢理落とす。 -
start_pool(mod, fun, args, opts)
: mfa、最大起動数、プール名にてプールを起動する。 -
stop_pool(name)
: プールを停止する。 -
run(name, args)
: プールのワーカーに仕事をさせる。最大数に達していたら:no_alloc
を返す。 -
async_queue(name, args)
:run/2
の非同期版。最大数に達していたらキューに貯め随時処理。 -
sync_queue(name, args)
:run/2
の同期版。最大数に達していたら処理開始まで戻ってこない。
以下に完成後の動作を示す。
Nagger
ワーカーは GenServer
で実装されていて、意味は「口うるさい奴」。
Nagger
に与える引数は [通知先, 送信内容, 送信間隔(ミリ秒), 送信回数]
である。
iex(1)> PPool.start_link()
{:ok, #PID<0.135.0>}
iex(2)> PPool.start_pool(Nagger, :start_link, [], limit: 2, name: :nagger)
{:ok, #PID<0.137.0>}
iex(3)> PPool.run(:nagger, [self(), :one, 10_000, 3])
{:ok, #PID<0.141.0>}
iex(4)> PPool.run(:nagger, [self(), :two, 10_000, 3])
{:ok, #PID<0.143.0>}
iex(5)> PPool.run(:nagger, [self(), :three, 10_000, 3])
:no_alloc
Received down message. # 30秒経過して最初のワーカーが終了した
iex(6)> PPool.run(:nagger, [self(), :three, 10_000, 3])
{:ok, #PID<0.146.0>}
Received down message.
iex(7)> flush()
{#PID<0.141.0>, :one}
{#PID<0.143.0>, :two}
{#PID<0.141.0>, :one}
{#PID<0.143.0>, :two}
{#PID<0.141.0>, :one}
{#PID<0.143.0>, :two}
:ok
Received down message.
iex(8)> flush()
{#PID<0.146.0>, :three}
{#PID<0.146.0>, :three}
{#PID<0.146.0>, :three}
:ok
### 非同期 ###
iex(9)> PPool.async_queue(:nagger, [self(), :one, 20_000, 1])
:ok
iex(10)> PPool.async_queue(:nagger, [self(), :two, 20_000, 1])
:ok
iex(11)> PPool.async_queue(:nagger, [self(), :three, 20_000, 1])
:ok
Received down message. # 20秒経過、3つ目のワーカー開始
Received down message.
iex(12)> flush()
{#PID<0.150.0>, :one}
{#PID<0.152.0>, :two}
:ok
Received down message.
iex(13)> flush()
{#PID<0.155.0>, :three}
:ok
### 同期 ###
iex(14)> PPool.sync_queue(:nagger, [self(), :one, 20_000, 1])
{:ok, #PID<0.158.0>}
iex(15)> PPool.sync_queue(:nagger, [self(), :two, 20_000, 1])
{:ok, #PID<0.160.0>}
iex(16)> PPool.sync_queue(:nagger, [self(), :three, 20_000, 1])
# 待機・・・
Received down message. # 20秒経過、3つ目のワーカー開始
{:ok, #PID<0.162.0>}
Received down message.
iex(17)> flush()
{#PID<0.158.0>, :one}
{#PID<0.160.0>, :two}
:ok
Received down message.
iex(18)> flush()
{#PID<0.162.0>, :three}
:ok
### 終了 ###
iex(19)> PPool.stop_pool(:nagger)
:ok
iex(20)> PPool.stop()
** (EXIT from #PID<0.133.0>) shell process exited with reason: killed
18.2 スーパバイザを実装する
$ mix new p_pool --module PPool
$ cd p_pool
$ mkdir lib/p_pool
--module PPool
とすると lib/p_pool.ex
が作られるので、
それに合わせて new p_pool
としている。
3つのスーパーバイザ(Server
, Poolvisor
, Workervisor
)から作る。
defmodule PPool.Supervisor do
use Supervisor
alias __MODULE__, as: Me
def start_link, do: Supervisor.start_link(Me, [], name: Me)
def stop do
pid = Process.whereis(Me)
if is_pid(pid) do
Process.exit(pid, :kill)
else
:ok
end
end
def start_pool(mod, fun, args, opts \\ []) do
default = [limit: 2, name: mod]
opts = Keyword.merge(default, opts)
limit = Keyword.get(opts, :limit)
name = Keyword.get(opts, :name)
mfa = {mod, fun, args}
overrides = [
id: name,
start: {PPool.Poolvisor, :start_link, [mfa, limit, name]}
]
child = Supervisor.child_spec(PPool.Poolvisor, overrides)
Supervisor.start_child(Me, child)
end
def stop_pool(name) do
Supervisor.terminate_child(Me, name)
Supervisor.delete_child(Me, name)
end
@impl true
def init([]) do
opts = [
strategy: :one_for_one,
max_restarts: 6,
max_seconds: 3_600
]
Supervisor.init([], opts)
end
end
Supervisor
は Poolvisor
を起動する。
defmodule PPool.Poolvisor do
use Supervisor, shutdown: 10_500
alias __MODULE__, as: Me
def start_link([mfa, limit, name]) do
start_link(mfa, limit, name)
end
def start_link(mfa, limit, name) do
Supervisor.start_link(Me, {mfa, limit, name})
end
@impl true
def init({mfa, limit, name}) do
children = [
{PPool.Stash, [self(), mfa, limit, name]}
]
opts = [
strategy: :one_for_all,
max_restarts: 1,
max_seconds: 3_600
]
Supervisor.init(children, opts)
end
end
Poolvisor
によって起動された Stash
が Workervisor
を起動する。
defmodule PPool.Workervisor do
# Stash が起動を担当しているので :temporary にしている
use DynamicSupervisor, restart: :temporary, shutdown: 10_000
alias __MODULE__, as: Me
def start_link([]), do: DynamicSupervisor.start_link(Me, [])
def start_child(pid, mfa = {_, _, _}) do
child = %{
id: :p_pool_worker,
start: mfa,
restart: :temporary
}
DynamicSupervisor.start_child(pid, child)
end
@impl true
def init([]) do
[
strategy: :one_for_one,
max_restarts: 5,
max_seconds: 3_600
]
|> DynamicSupervisor.init()
end
end
18.3 ワーカに取り組む
ユーティリティ関数は、第一引数に構造体を取ることによって
「Stash
クラスのメソッド」として見ることができる。
defmodule PPool.Stash do
use GenServer
alias __MODULE__, as: Me
defstruct [:sup, :mfa, :limit, :refs, :queue]
defp new(mfa = {_, _, _}, limit) do
refs = MapSet.new()
queue = :queue.new()
%Me{sup: nil, mfa: mfa, limit: limit, refs: refs, queue: queue}
end
defp start_worker(me = %Me{sup: sup, mfa: mfa}, args) do
{:ok, pid} = start_worker(sup, mfa, args)
refs = MapSet.put(me.refs, Process.monitor(pid))
new_me = %Me{me | limit: me.limit - 1, refs: refs}
{:ok, pid, new_me}
end
defp start_worker(sup, {m, f, a}, args) do
PPool.Workervisor.start_child(sup, {m, f, a ++ args})
end
# E本の handle_down_worker/2 に相当
defp update_worker(me = %Me{sup: sup, mfa: mfa}, ref) do
if MapSet.member?(me.refs, ref) do
case :queue.out(me.queue) do
{{:value, {from, args}}, queue} ->
{:ok, pid} = start_worker(sup, mfa, args)
refs = update_refs(me.refs, ref, pid)
GenServer.reply(from, {:ok, pid})
%Me{me | refs: refs, queue: queue}
{{:value, args}, queue} ->
{:ok, pid} = start_worker(sup, mfa, args)
refs = update_refs(me.refs, ref, pid)
%Me{me | refs: refs, queue: queue}
{:empty, _queue} ->
refs = MapSet.delete(me.refs, ref)
%Me{me | limit: me.limit + 1, refs: refs}
end
else
me
end
end
defp update_refs(refs = %MapSet{}, ref, pid) do
refs
|> MapSet.put(Process.monitor(pid))
|> MapSet.delete(ref)
end
def start_link([sup, mfa, limit, name]) do
start_link(sup, mfa, limit, name)
end
def start_link(sup, mfa, limit, name) do
GenServer.start_link(Me, {sup, mfa, limit}, name: name)
end
def run(name, args) do
GenServer.call(name, {:run, args})
end
def sync_queue(name, args) do
GenServer.call(name, {:sync, args}, :infinity)
end
def async_queue(name, args) do
GenServer.cast(name, {:async, args})
end
def stop(name) do
GenServer.call(name, :stop)
end
@impl true
def init({sup, mfa, limit}) do
# Workervisor を起動する前に init/1 を抜けておかないと
# 親の Poolvisor との間でデットロックが起きてしまう。
send(self(), {:start_worker_supervisor, sup})
{:ok, new(mfa, limit)}
end
@impl true
def handle_info({:start_worker_supervisor, sup}, me = %Me{}) do
{:ok, pid} = Supervisor.start_child(sup, PPool.Workervisor)
# :one_for_all で監視されてるのに、このリンクは必要あるのかな?
# Process.link(pid)
{:noreply, %Me{me | sup: pid}}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, _reason}, me = %Me{}) do
IO.puts("Received down message.")
{:noreply, update_worker(me, ref)}
end
@impl true
def handle_info(msg, me = %Me{}) do
IO.puts("Unknown message: #{inspect(msg)}")
{:noreply, me}
end
@impl true
def handle_call({:run, args}, _from, me = %Me{limit: limit}) do
if limit > 0 do
{:ok, pid, new_me} = start_worker(me, args)
{:reply, {:ok, pid}, new_me}
else
{:reply, :no_alloc, me}
end
end
@impl true
def handle_call({:sync, args}, from, me = %Me{limit: limit}) do
if limit > 0 do
{:ok, pid, new_me} = start_worker(me, args)
{:reply, {:ok, pid}, new_me}
else
queue = :queue.in({from, args}, me.queue)
{:noreply, %Me{me | queue: queue}}
end
end
@impl true
def handle_call(:stop, _from, me = %Me{}) do
{:stop, :normal, :ok, me}
end
@impl true
def handle_cast({:async, args}, me = %Me{limit: limit}) do
if limit > 0 do
{:ok, _pid, new_me} = start_worker(me, args)
{:noreply, new_me}
else
queue = :queue.in(args, me.queue)
{:noreply, %Me{me | queue: queue}}
end
end
@impl true
def code_change(_old_vsn, state, _extra), do: {:ok, state}
@impl true
def terminate(_reason, _state), do: :ok
end
Stash
のコードがなんか汚い。取り急ぎということでご了承ください。
最後にこのアプリケーションのインターフェースを整える。
defmodule PPool do
alias __MODULE__, as: Me
def start_link do
Me.Supervisor.start_link()
end
def stop do
Me.Supervisor.stop()
end
def start_pool(mod, fun, args, opts \\ [])
when is_atom(mod) and is_atom(fun) and is_list(args) and is_list(opts) do
Me.Supervisor.start_pool(mod, fun, args, opts)
end
def stop_pool(name) when is_atom(name) do
Me.Supervisor.stop_pool(name)
end
def run(name, args) when is_atom(name) and is_list(args) do
Me.Stash.run(name, args)
end
def sync_queue(name, args) when is_atom(name) and is_list(args) do
Me.Stash.sync_queue(name, args)
end
def async_queue(name, args) when is_atom(name) and is_list(args) do
Me.Stash.async_queue(name, args)
end
end
18.4 ワーカを書く
defmodule Nagger do
use GenServer
alias __MODULE__, as: Me
defstruct [:send_to, :task, :delay, :max]
def start_link(send_to, task, delay, max) do
me = %Me{send_to: send_to, task: task, delay: delay, max: max}
GenServer.start_link(Me, me)
end
def stop(pid), do: GenServer.call(pid, :stop)
@impl true
def init(me = %Me{delay: delay}), do: {:ok, me, delay}
@impl true
def handle_call(:stop, _from, me = %Me{}) do
{:stop, :normal, :ok, me}
end
@impl true
def handle_info(:timeout, me = %Me{}) do
%{send_to: send_to, task: task, delay: delay, max: max} = me
send(send_to, {self(), task})
cond do
max == :infinity ->
{:noreply, me, delay}
max > 1 ->
{:noreply, %Me{me | max: max - 1}, delay}
max <= 1 ->
{:stop, :normal, %Me{me | max: 0}}
end
end
@impl true
def code_change(_old_vsn, state, _extra), do: {:ok, state}
@impl true
def terminate(_reason, _state), do: :ok
end
E本との辻褄合わせがきつくなってきた。
Stash
はもうちょっと綺麗にしたい。
次章で正式なアプリケーションになるから、その後かな。