Help us understand the problem. What is going on with this article?

すごいE本 第18章 on Elixir (アプリケーション)

環境

sh
$ lsb_release -d
Description:    Ubuntu 18.04.2 LTS

$ elixir -v
Erlang/OTP 21 [erts-10.3.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Elixir 1.8.1 (compiled with Erlang/OTP 20)

概要

アプリケーションについて、プロセスプールを作ることで説明されるが、
この章ではまだ正式な OTP アプリケーションを作るわけではない。

E本の説明の流れが分かりづらいんだよね。
翻訳のせいじゃない気がする。流れがね。

18.1 プロセスプール

ppool.png

PoolvisorWorkervisor も起動しちゃっていいと思うんだけど、
Stash にそれをやらせようとしているのは、
スーパーバイザはシンプルであれということなのかな?

18.5 プールよ、走れ

PPool モジュールには以下の関数がある。

  • start_link/0: プロセスプール・アプリケーションを起動する。
  • stop/0:アプリケーションを無理矢理落とす。
  • start_pool(mod, fun, args, opts): mfa、最大起動数、プール名にてプールを起動する。
  • stop_pool(name): プールを停止する。
  • run(name, args): プールのワーカーに仕事をさせる。最大数に達していたら :no_alloc を返す。
  • async_queue(name, args)run/2 の非同期版。最大数に達していたらキューに貯め随時処理。
  • sync_queue(name, args)run/2 の同期版。最大数に達していたら処理開始まで戻ってこない。

以下に完成後の動作を示す。
Nagger ワーカーは GenServer で実装されていて、意味は「口うるさい奴」。
Nagger に与える引数は [通知先, 送信内容, 送信間隔(ミリ秒), 送信回数] である。

iex
iex(1)> PPool.start_link()
{:ok, #PID<0.135.0>}
iex(2)> PPool.start_pool(Nagger, :start_link, [], limit: 2, name: :nagger)
{:ok, #PID<0.137.0>}
iex(3)> PPool.run(:nagger, [self(), :one, 10_000, 3])
{:ok, #PID<0.141.0>}
iex(4)> PPool.run(:nagger, [self(), :two, 10_000, 3])
{:ok, #PID<0.143.0>}
iex(5)> PPool.run(:nagger, [self(), :three, 10_000, 3])
:no_alloc
Received down message. # 30秒経過して最初のワーカーが終了した
iex(6)> PPool.run(:nagger, [self(), :three, 10_000, 3])
{:ok, #PID<0.146.0>}
Received down message.
iex(7)> flush()
{#PID<0.141.0>, :one}
{#PID<0.143.0>, :two}
{#PID<0.141.0>, :one}
{#PID<0.143.0>, :two}
{#PID<0.141.0>, :one}
{#PID<0.143.0>, :two}
:ok
Received down message.
iex(8)> flush()
{#PID<0.146.0>, :three}
{#PID<0.146.0>, :three}
{#PID<0.146.0>, :three}
:ok
### 非同期 ###
iex(9)> PPool.async_queue(:nagger, [self(), :one, 20_000, 1])
:ok
iex(10)> PPool.async_queue(:nagger, [self(), :two, 20_000, 1])
:ok
iex(11)> PPool.async_queue(:nagger, [self(), :three, 20_000, 1])
:ok
Received down message. # 20秒経過、3つ目のワーカー開始
Received down message.
iex(12)> flush()
{#PID<0.150.0>, :one}
{#PID<0.152.0>, :two}
:ok
Received down message.
iex(13)> flush()
{#PID<0.155.0>, :three}
:ok
### 同期 ###
iex(14)> PPool.sync_queue(:nagger, [self(), :one, 20_000, 1])
{:ok, #PID<0.158.0>}
iex(15)> PPool.sync_queue(:nagger, [self(), :two, 20_000, 1])
{:ok, #PID<0.160.0>}
iex(16)> PPool.sync_queue(:nagger, [self(), :three, 20_000, 1])
# 待機・・・
Received down message. # 20秒経過、3つ目のワーカー開始
{:ok, #PID<0.162.0>}
Received down message.
iex(17)> flush()
{#PID<0.158.0>, :one}
{#PID<0.160.0>, :two}
:ok
Received down message.
iex(18)> flush()
{#PID<0.162.0>, :three}
:ok
### 終了 ###
iex(19)> PPool.stop_pool(:nagger)
:ok
iex(20)> PPool.stop()
** (EXIT from #PID<0.133.0>) shell process exited with reason: killed

18.2 スーパバイザを実装する

sh
$ mix new p_pool --module PPool
$ cd p_pool
$ mkdir lib/p_pool

--module PPool とすると lib/p_pool.ex が作られるので、
それに合わせて new p_pool としている。

3つのスーパーバイザ(Server, Poolvisor, Workervisor)から作る。

p_pool/lib/p_pool/supervisor.ex
defmodule PPool.Supervisor do
  use Supervisor
  alias __MODULE__, as: Me

  def start_link, do: Supervisor.start_link(Me, [], name: Me)

  def stop do
    pid = Process.whereis(Me)

    if is_pid(pid) do
      Process.exit(pid, :kill)
    else
      :ok
    end
  end

  def start_pool(mod, fun, args, opts \\ []) do
    default = [limit: 2, name: mod]
    opts = Keyword.merge(default, opts)
    limit = Keyword.get(opts, :limit)
    name = Keyword.get(opts, :name)
    mfa = {mod, fun, args}

    overrides = [
      id: name,
      start: {PPool.Poolvisor, :start_link, [mfa, limit, name]}
    ]

    child = Supervisor.child_spec(PPool.Poolvisor, overrides)
    Supervisor.start_child(Me, child)
  end

  def stop_pool(name) do
    Supervisor.terminate_child(Me, name)
    Supervisor.delete_child(Me, name)
  end

  @impl true
  def init([]) do
    opts = [
      strategy: :one_for_one,
      max_restarts: 6,
      max_seconds: 3_600
    ]

    Supervisor.init([], opts)
  end
end

SupervisorPoolvisor を起動する。

p_pool/lib/p_pool/poolvisor.ex
defmodule PPool.Poolvisor do
  use Supervisor, shutdown: 10_500
  alias __MODULE__, as: Me

  def start_link([mfa, limit, name]) do
    start_link(mfa, limit, name)
  end

  def start_link(mfa, limit, name) do
    Supervisor.start_link(Me, {mfa, limit, name})
  end

  @impl true
  def init({mfa, limit, name}) do
    children = [
      {PPool.Stash, [self(), mfa, limit, name]}
    ]

    opts = [
      strategy: :one_for_all,
      max_restarts: 1,
      max_seconds: 3_600
    ]

    Supervisor.init(children, opts)
  end
end

Poolvisor によって起動された StashWorkervisor を起動する。

p_pool/lib/p_pool/workervisor.ex
defmodule PPool.Workervisor do
  # Stash が起動を担当しているので :temporary にしている
  use DynamicSupervisor, restart: :temporary, shutdown: 10_000
  alias __MODULE__, as: Me

  def start_link([]), do: DynamicSupervisor.start_link(Me, [])

  def start_child(pid, mfa = {_, _, _}) do
    child = %{
      id: :p_pool_worker,
      start: mfa,
      restart: :temporary
    }

    DynamicSupervisor.start_child(pid, child)
  end

  @impl true
  def init([]) do
    [
      strategy: :one_for_one,
      max_restarts: 5,
      max_seconds: 3_600
    ]
    |> DynamicSupervisor.init()
  end
end

18.3 ワーカに取り組む

ユーティリティ関数は、第一引数に構造体を取ることによって
Stash クラスのメソッド」として見ることができる。

p_pool/lib/p_pool/stash.ex
defmodule PPool.Stash do
  use GenServer
  alias __MODULE__, as: Me

  defstruct [:sup, :mfa, :limit, :refs, :queue]

  defp new(mfa = {_, _, _}, limit) do
    refs = MapSet.new()
    queue = :queue.new()
    %Me{sup: nil, mfa: mfa, limit: limit, refs: refs, queue: queue}
  end

  defp start_worker(me = %Me{sup: sup, mfa: mfa}, args) do
    {:ok, pid} = start_worker(sup, mfa, args)
    refs = MapSet.put(me.refs, Process.monitor(pid))
    new_me = %Me{me | limit: me.limit - 1, refs: refs}
    {:ok, pid, new_me}
  end

  defp start_worker(sup, {m, f, a}, args) do
    PPool.Workervisor.start_child(sup, {m, f, a ++ args})
  end

  # E本の handle_down_worker/2 に相当
  defp update_worker(me = %Me{sup: sup, mfa: mfa}, ref) do
    if MapSet.member?(me.refs, ref) do
      case :queue.out(me.queue) do
        {{:value, {from, args}}, queue} ->
          {:ok, pid} = start_worker(sup, mfa, args)
          refs = update_refs(me.refs, ref, pid)
          GenServer.reply(from, {:ok, pid})
          %Me{me | refs: refs, queue: queue}

        {{:value, args}, queue} ->
          {:ok, pid} = start_worker(sup, mfa, args)
          refs = update_refs(me.refs, ref, pid)
          %Me{me | refs: refs, queue: queue}

        {:empty, _queue} ->
          refs = MapSet.delete(me.refs, ref)
          %Me{me | limit: me.limit + 1, refs: refs}
      end
    else
      me
    end
  end

  defp update_refs(refs = %MapSet{}, ref, pid) do
    refs
    |> MapSet.put(Process.monitor(pid))
    |> MapSet.delete(ref)
  end

  def start_link([sup, mfa, limit, name]) do
    start_link(sup, mfa, limit, name)
  end

  def start_link(sup, mfa, limit, name) do
    GenServer.start_link(Me, {sup, mfa, limit}, name: name)
  end

  def run(name, args) do
    GenServer.call(name, {:run, args})
  end

  def sync_queue(name, args) do
    GenServer.call(name, {:sync, args}, :infinity)
  end

  def async_queue(name, args) do
    GenServer.cast(name, {:async, args})
  end

  def stop(name) do
    GenServer.call(name, :stop)
  end

  @impl true
  def init({sup, mfa, limit}) do
    # Workervisor を起動する前に init/1 を抜けておかないと
    # 親の Poolvisor との間でデットロックが起きてしまう。
    send(self(), {:start_worker_supervisor, sup})
    {:ok, new(mfa, limit)}
  end

  @impl true
  def handle_info({:start_worker_supervisor, sup}, me = %Me{}) do
    {:ok, pid} = Supervisor.start_child(sup, PPool.Workervisor)
    # :one_for_all で監視されてるのに、このリンクは必要あるのかな?
    # Process.link(pid)
    {:noreply, %Me{me | sup: pid}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, me = %Me{}) do
    IO.puts("Received down message.")
    {:noreply, update_worker(me, ref)}
  end

  @impl true
  def handle_info(msg, me = %Me{}) do
    IO.puts("Unknown message: #{inspect(msg)}")
    {:noreply, me}
  end

  @impl true
  def handle_call({:run, args}, _from, me = %Me{limit: limit}) do
    if limit > 0 do
      {:ok, pid, new_me} = start_worker(me, args)
      {:reply, {:ok, pid}, new_me}
    else
      {:reply, :no_alloc, me}
    end
  end

  @impl true
  def handle_call({:sync, args}, from, me = %Me{limit: limit}) do
    if limit > 0 do
      {:ok, pid, new_me} = start_worker(me, args)
      {:reply, {:ok, pid}, new_me}
    else
      queue = :queue.in({from, args}, me.queue)
      {:noreply, %Me{me | queue: queue}}
    end
  end

  @impl true
  def handle_call(:stop, _from, me = %Me{}) do
    {:stop, :normal, :ok, me}
  end

  @impl true
  def handle_cast({:async, args}, me = %Me{limit: limit}) do
    if limit > 0 do
      {:ok, _pid, new_me} = start_worker(me, args)
      {:noreply, new_me}
    else
      queue = :queue.in(args, me.queue)
      {:noreply, %Me{me | queue: queue}}
    end
  end

  @impl true
  def code_change(_old_vsn, state, _extra), do: {:ok, state}

  @impl true
  def terminate(_reason, _state), do: :ok
end

Stash のコードがなんか汚い。取り急ぎということでご了承ください。

最後にこのアプリケーションのインターフェースを整える。

p_pool/lib/p_pool.ex
defmodule PPool do
  alias __MODULE__, as: Me

  def start_link do
    Me.Supervisor.start_link()
  end

  def stop do
    Me.Supervisor.stop()
  end

  def start_pool(mod, fun, args, opts \\ [])
      when is_atom(mod) and is_atom(fun) and is_list(args) and is_list(opts) do
    Me.Supervisor.start_pool(mod, fun, args, opts)
  end

  def stop_pool(name) when is_atom(name) do
    Me.Supervisor.stop_pool(name)
  end

  def run(name, args) when is_atom(name) and is_list(args) do
    Me.Stash.run(name, args)
  end

  def sync_queue(name, args) when is_atom(name) and is_list(args) do
    Me.Stash.sync_queue(name, args)
  end

  def async_queue(name, args) when is_atom(name) and is_list(args) do
    Me.Stash.async_queue(name, args)
  end
end

18.4 ワーカを書く

p_pool/lib/nagger.ex
defmodule Nagger do
  use GenServer
  alias __MODULE__, as: Me

  defstruct [:send_to, :task, :delay, :max]

  def start_link(send_to, task, delay, max) do
    me = %Me{send_to: send_to, task: task, delay: delay, max: max}
    GenServer.start_link(Me, me)
  end

  def stop(pid), do: GenServer.call(pid, :stop)

  @impl true
  def init(me = %Me{delay: delay}), do: {:ok, me, delay}

  @impl true
  def handle_call(:stop, _from, me = %Me{}) do
    {:stop, :normal, :ok, me}
  end

  @impl true
  def handle_info(:timeout, me = %Me{}) do
    %{send_to: send_to, task: task, delay: delay, max: max} = me
    send(send_to, {self(), task})

    cond do
      max == :infinity ->
        {:noreply, me, delay}

      max > 1 ->
        {:noreply, %Me{me | max: max - 1}, delay}

      max <= 1 ->
        {:stop, :normal, %Me{me | max: 0}}
    end
  end


  @impl true
  def code_change(_old_vsn, state, _extra), do: {:ok, state}

  @impl true
  def terminate(_reason, _state), do: :ok
end

E本との辻褄合わせがきつくなってきた。
Stash はもうちょっと綺麗にしたい。
次章で正式なアプリケーションになるから、その後かな。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした