Elixirで監視状態においたWorkerプロセスがエラーになっても,
監視プロセスが検知してWorkerを再起動してくれるという振舞を手を動かしながら試してみましょう.
ElixirでTaskを使ってEchoServerを動かすという記事の一部を再構成して提供しています.
Workerを作る
まずはWorkerの例としてTCP/IPで接続を待ちうけるサーバーを作ります.
Erlangには gen_tcp というライブラリがあるのでこれを用いてTCP/IPを扱います.
:gen_tcp.xxx
となっている記述の部分で利用しています.
# echo_server-1.exs
defmodule EchoServer do
def accept(port) do
# オプションは以下のような意味である:
#
# 1. `:binary` - データを(リストのかわりに)バイナリとして受けとる
# 2. `packet: :line` - データを1行毎に受けとる
# 3. `active: false` - データがくるまで `:gen_tcp.recv/2` でブロックする
# 4. `reuseaddr: true` - リスナーがクラッシュしたら,アドレスを再利用してよいことにする
#
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false, reuseaddr: true])
IO.puts "Accepting connections on port #{port}"
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
defp serve(socket) do
line = read_line(socket)
write_line(socket, line)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(socket, line) do
:gen_tcp.send(socket, line)
end
end
処理の流れの概要はこうなっています.
(今はよくわからなくてもOKです.記事を試すことはできるので,先に進めてからいずれ見直したときに処理内容がわかってもOKです)
1-1. EchoServer.accept/1
にポート番号を渡して呼び出します
1-2. :gen_tcp.listen/2
を呼び,リッスンソケットを取得します
_ 2-1. loop_acceptor/1
にリッスンソケットを渡して呼び出します
_ 2-2. :gen_tcp.accept/1
を呼び,サーバーソケットを取得します
__ 3-1. serve/1
にサーバーソケットを渡して呼び出します
__ 3-2. read_line/1
にサーバーソケットを渡して呼び,データを取得します
__ 3-3. write_line/2
にサーバーソケットとデータを渡して呼び出す(データを返却します)
__ 3-4. 3-1に戻る(serve/1
にサーバーソケットを渡して呼び出します)
_ 2-3. 2-1に戻る(loop_acceptor/1
にリッスンソケットを渡して呼び出します)
このファイルを echo_server-1.exs
として保存し,iex -r echo_server-1.exs
というコマンドで,ファイルを読みこんだ状態でREPLを起動します.
REPLで EchoServer.accept(29292)
と打つとEchoサーバーが起動します.
/var/tmp% iex -r echo_server-1.exs
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> EchoServer.accept(29292)
Accepting connections on port 29292
Accepting connections on port 29292
** (MatchError) no match of right hand side value: {:error, :closed}
echo_server-1.exs:30: EchoServer.read_line/1
echo_server-1.exs:23: EchoServer.serve/1
echo_server-1.exs:18: EchoServer.loop_acceptor/1
────────────────────────────────────────────────────────────────────────────────
/Users/niku% telnet localhost 29292
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
foo
foo
^]
telnet> quit
Connection closed.
/Users/niku% telnet localhost 29292
telnet localhost 29292
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
bar
別のターミナルを立ちあげて telnet localhost 29292
で 29292 ポートへ繋ぎ,任意の文字を打ち,リターンを押すと,打ち込んだものと同じ文字(foo
)が返ってきますよね.
エコーサーバーとしては動作しているようです.
しかし,quitして接続を切るとエコーサーバーでエラーになってしまいます.
この状況で再接続して telnet localhost 29292
で 29292 ポートへ繋ぎ,任意の文字を打ち,リターンを押しても,打ち込んだものと同じ文字(bar
)が返ってきません.
Workerがクラッシュしても,再び処理を行えるようにする
ErlangやElixirなどで利用しているErlangVM,その上に用意されているフレームワークOTPにはクラッシュしたものを再起動する仕組みがあります.
具体的にはSupervisorという監視プロセスが,Workerという処理プロセスを見張り,Workerがエラーで落ちた場合にSupervisorへ通知が行くため,SupervisorがWorkerを再起動させるというやり方です.
先ほどのEchoサーバーをWorkerとして,Supervisor経由で起動させて,コードを変えずにEchoサーバーが落ちても再起動できるようにしてみましょう.
REPLで EchoServer.accept(29292)
と打つかわりにいくつかのコードを準備し,最終的に Supervisor.start_link(children, opts)
を実行します.
/var/tmp% iex -r echo_server-1.exs
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import Supervisor.Spec
nil
iex(2)> children = [worker(Task, [EchoServer, :accept, [29292]])]
[{Task, {Task, :start_link, [EchoServer, :accept, [29292]]}, :permanent, 5000, :worker, [Task]}]
iex(3)> opts = [strategy: :one_for_one, name: EchoServer.Supervisor]
[strategy: :one_for_one, name: EchoServer.Supervisor]
iex(4)> Supervisor.start_link(children, opts)
Accepting connections on port 29292
{:ok, #PID<0.71.0>}
Accepting connections on port 29292
iex(5)>
17:27:31.164 [error] Task #PID<0.72.0> started from EchoServer.Supervisor terminating
** (MatchError) no match of right hand side value: {:error, :closed}
echo_server-1.exs:30: EchoServer.read_line/1
echo_server-1.exs:23: EchoServer.serve/1
echo_server-1.exs:18: EchoServer.loop_acceptor/1
(elixir) lib/task/supervised.ex:74: Task.Supervised.do_apply/2
(stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: &EchoServer.accept/1
Args: [29292]
────────────────────────────────────────────────────────────────────────────────
/Users/niku% telnet localhost 29292
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
foo
foo
^]
telnet> quit
Connection closed.
/Users/niku% telnet localhost 29292
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
bar
bar
別のターミナルを立ちあげて telnet localhost 29292
で 29292 ポートへ繋ぎ,任意の文字を打ち,リターンを押すと,打ち込んだものと同じ文字(foo
)が返ってきます.
quitして接続を切るとエコーサーバーでエラーになってしまいます.
ここまでは先ほどと同じですね.
この状況で再接続して telnet localhost 29292
で 29292 ポートへ繋ぎ,任意の文字を打ち,リターンを押すと,打ち込んだものと同じ文字(bar
)が帰ってきます.
成功です!
まとめ
- 最初に普通に動作するWorker(Echoサーバー)を作りました
- Workerがエラーで落ちてしまうと,処理を継続できなくなりました
- Workerを監視するSupervisor経由でWorkerを起動することで,Workerのコードを変えずに,Workerがエラーで落ちても,自動的に再起動して処理を継続できるようになりました
この話には続きがあるのですけど,それはまたの機会に.
冒頭にも書きましたが,気になる方はElixirでTaskを使ってEchoServerを動かすという記事をごらんください.