この文章はFabio Akitaさんの2015年11月25日付のブログ記事ExMessenger Exercise: Understanding Nodes in Elixirの翻訳です。
Fabio Akitaさんはブラジル在住で、ブログの名前からも分かる通り熱狂的なRubyistです。Ruby/Railsの世界ではかなり知られた方のようですが、最近はElixirにハマっているようです。
Elixirの作者のJose ValimがRailsのコミッタであったこととPhoenixというRails風味のWebフレームワークがあることもあってか、RubyistがElixirを使い始めるケースが増えてきているように思います。
Elixirはノード間で通信するための便利な機能を提供していて、この記事はそのチュートリアルです。
翻訳をすると流し読みをしがちなところを、精読する必要があるので、自分自身の理解を深めるのに役に立っています。今後も面白そうな記事があれば取り組んでみたいと思います。
誤訳や関連記事などがありましたらコメント欄にお願いいたします。
私はDrew Kerriganによる2014年の記事を読んでワクワクしていた。ここで彼はコマンドラインベースのクライアントがサーバにメッセージを送信するチャットアプリケーションを作った。
ここではElixir1.0以前のバージョンを使用し、練習のために書かれたコードなので、オリジナルのコードを修正してElixir Umbrellaプロジェクトにマージした。コードはGithubで見ることができる。
もしお互いに協調しあって動作し、依存性を共有するような複数のアプリケーションを作るのであれば、アンブレラプロジェクトの規約に則ることで全てのコードをひとまとめにすることができる。mix compileコマンドをアンブレラプロジェクトのルートで実行すれば、全てのアプリケーションを一度にコンパイルしてくれる。これは関連するアプリケーションを異なるリポジトリに入れずに一つにするための方法である。
Node 101
この練習用のリポジトリをチェックアウトする前に、もう一つのコンセプトを明らかにしようと思う。以前の記事でどのようにしてプロセスを起動し、メッセージを交換するかについて、そしてどのようにOTP GenServerやSupervisorを使ってより堅牢で耐障害性の高いプロセスを作るかについて説明した。
しかしこれはただの物語の除幕でしかない。もしかしたらErlangは分散処理に優れていることを耳にしたことがあるかもしれない。Erlang VM(BEAM)はネットワーク内での相互通信をも可能にする。
もう一度言うが、私はまだこれについて学び始めたばかりであるので、Elixirのウェブサイトにある分散タスクと設定を一読することをお勧めする。
2つのiexセッションから始めてみよう。一方のターミナルから以下のように打ち込んでみよう。
iex --sname Fabio --cookie chat
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]
Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(fabio@Hal9000u)1>
そしてもう一方からは以下のように打ち込んでみよう。
iex --sname akita --cookie chat
Erlang/OTP 18 [erts-7.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]
Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(akita@Hal9000u)1>
"fabio@Hal9000u"と"akita@Hal9000u"でそれぞれのインスタンスのノード名が違うのにお気づきだろうか?これはsnameにマシン名がくっついたものだ。一方のインスタンスからもう一方にpingを送ることができる。例えば、
iex(akita@Hal9000u)2> Node.ping(:"fabio@Hal9000u")
:pong
もしノード名が正しく、もう一方のインスタンスがちゃんと立ち上がって入れば、pingに対して:pongで返す。ではもしリモートノードに接続したい場合はどうすればいいだろうか?
iex(akita@Hal9000u)3> Node.ping(:"fabio@192.168.1.13")
11:02:46.152 [error] ** System NOT running to use fully qualified hostnames **
** Hostname 192.168.1.13 is illegal **
--snameオプションは名前だけを設定し、それは同一サブネット内のみで有効なので、完全修飾名を使いたい場合は--nameオプションを使うといい。例えばこのような感じである。
iex --name fabio@192.168.1.13 --cookie chat
そしてこちらはもう一方のノード。
iex --name akita@192.168.1.13 --cookie chat
もしかしたら--cookieとは何のためかと考えているかもしれない。3つ目のターミナルを以下のようにcookieオプションなし、異なるクライアント名で立ち上げてみよう。
iex --name john@192.168.1.13
そして最初に立ち上げたノードの一方にpingを送ってみると:pongが返ってこない。
iex(john@192.168.1.13)1> Node.ping(:"fabio@192.168.1.13")
:pang
クッキーはノード間の関係を結びつけるためのただのアトムである。いくつかあるサーバの中から、関連のないアプリケーションとは通信できないようにできる。その結果として:pangを受信する。IPアドレスの代わりに完全修飾名を使うことができる。
akita@やfabio@といったノードを持つだけでお互いに認識することができる。
iex(fabio@192.168.1.13)2> Node.list
[:"akita@192.168.1.13"]
そしてこちらはもう一方のノード
iex(akita@192.168.1.13)2> Node.list
[:"fabio@192.168.1.13"]
もしどちらか一方のノードがクラッシュしたら、ノードリストは自動的に更新され生きているノードに反映される。
もしもっと詳しく知りたいようであれば公式APIリファレンスのNodeをチェックするといいだろう。これは次の節を読む助けになると思う。
チャットクライアントを作る
では本題に戻ろう。ExMessengerサーバはGenServerの
ExMessenger.Serverとそれを起動するExMessenger.Supervisorを持つ。ExMessenger.ServerはExMessenger.Supervisorによって起動され、監督される。それは:message_serverとしてグローバルに登録される。
ExMessengerClientは監督されないExMessengerClient.MessageHandlerを起動する。それもまたGenServerで:message_handlerとしてグローバルに登録される。
2つのアプリケーションの構造は大まかに言うと以下のようになる。
ExMessenger
- ExMessenger.Supervisor
+ ExMessenger.Server
ExMessengerClient
- ExMessengerClient.MessageHandler
それぞれは別個に起動される。最初はメッセージサーバから。
cd apps/ex_messenger
iex --sname server --cookie chocolate-chip -S mix run
この例ではローカルサブネット向けにserverという単純な名前でまず始めることにする。server@Hal9000uという名前が返る(ちなみにHal9000uは私のマシン名である。)
そしてクライアントアプリケーションを起動する。
cd apps/ex_messenger_client
server=server@Hal9000u nick=john elixir --sname client -S mix run
ここでは2種類の環境変数を設定している。これらの変数はプログラム内でSystem.get_env/1関数を使うことで取得できる。またローカルのノード名としてclientを設定している。異なるsnameとnickを設定することで別のターミナルから好きなだけクライアントノードを起動することもできる。そしてそれらのノードは全て同一のserver@Hal9000uメッセージサーバにひも付けられている。
ExMangaDownloadrでお見せしたようなコマンドラインベースのescriptを使う代わりにこのような形で起動したのは、--snameや--nameをプログラム内で設定する方法(--cookieに対応するNode.set_cookieにような)が見つからなかったからである。
「接続する」という代わりに「ひも付ける」と表現したのにお気づきだろうか。ExMessengerClientは以下のように始まる。
defmodule ExMessengerClient do
use Application
alias ExMessengerClient.CLI
alias ExMessengerClient.ServerProcotol
def start(_type, _args) do
get_env
|> connect
|> start_message_handler
|> join_chatroom
|> CLI.input_loop
end
...
end
プライベート関数のget_envはコマンドライン引数として渡した値を扱うためのただのラッパーである。
defp get_env do
server = System.get_env("server")
|> String.rstrip
|> String.to_atom
nick = System.get_env("nick")
|> String.rstrip
{server, nick}
end
次にリモートサーバに接続するための関数である。
defp connect({server, nick}) do
IO.puts "Connecting to #{server} from #{Node.self} ..."
Node.set_cookie(Node.self, :"chocolate-chip")
case Node.connect(server) do
true -> :ok
reason ->
IO.puts "Could not connect to server, reason: #{reason}"
System.halt(0)
end
{server, nick}
end
大事なポイントはクッキーをNode.set_cookie/1関数を使って設定している部分である。サーバインスタンスで行ったようにコマンドライン引数として渡していないことにお気づきだろうか。前節でご説明したように、クッキーを設定せずに次の行のNode.connect(server)を実行すると接続に失敗するだろう。
そして、GenServerであるExMessengerClient.MessageHandlerをメッセージサーバインスタンスとひも付ける。
defp start_message_handler({server, nick}) do
ExMessengerClient.MessageHandler.start_link(server)
IO.puts "Connected"
{server, nick}
end
メッセージハンドラのGenServerの作りはいたって単純で、サーバを登録し、そこから受け取ったメッセージをコンソール上に表示するだけのものである。
defmodule ExMessengerClient.MessageHandler do
use GenServer
def start_link(server) do
:gen_server.start_link({ :local, :message_handler }, __MODULE__, server, [])
end
def init(server) do
{ :ok, server }
end
def handle_cast({ :message, nick, message }, server) do
message = message |> String.rstrip
IO.puts "\n#{server}> #{nick}: #{message}"
IO.write "#{Node.self}> "
{:noreply, server}
end
end
ExMessengerClientモジュールに戻ろう。メッセージを受け取るための(監督されない)GenServerを起動した後は、サーバ上にある擬似チャットルームに参加する処理に進む。
defp join_chatroom({server, nick}) do
case ServerProcotol.connect({server, nick}) do
{:ok, users} ->
IO.puts "* Joined the chatroom *"
IO.puts "* Users in the room: #{users} *"
IO.puts "* Type /help for options *"
reason ->
IO.puts "Could not join chatroom, reason: #{reason}"
System.halt(0)
end
{server, nick}
end
ServerProcotolモジュールはGenServer.call/3やGenServer.cast/2を呼ぶ際に利便性を高めるためのラッパーで、それはリモートのGenServerである:message_serverにメッセージを送信するためのものである。
defmodule ExMessengerClient.ServerProcotol do
def connect({server, nick}) do
server |> call({:connect, nick})
end
def disconnect({server, nick}) do
server |> call({:disconnect, nick})
end
def list_users({server, nick}) do
server |> cast({:list_users, nick})
end
def private_message({server, nick}, to, message) do
server |> cast({:private_message, nick, to, message})
end
def say({server, nick}, message) do
server |> cast({:say, nick, message})
end
defp call(server, args) do
GenServer.call({:message_server, server}, args)
end
defp cast(server, args) do
GenServer.cast({:message_server, server}, args)
end
end
そのまんまだね。それからExMessengerClientはCLIモジュールにある再帰的なinput_loop/1を呼ぶ。それはただユーザの入力を受け取ってパターンマッチングで適切なコマンドを実行するだけである。
defmodule ExMessengerClient.CLI do
alias ExMessengerClient.ServerProcotol
def input_loop({server, nick}) do
IO.write "#{Node.self}> "
line = IO.read(:line)
|> String.rstrip
handle_command line, {server, nick}
input_loop {server, nick}
end
def handle_command("/help", _args) do
IO.puts """
Available commands:
/leave
/join
/users
/pm <to nick> <message>
or just type a message to send
"""
end
def handle_command("/leave", args) do
ServerProcotol.disconnect(args)
IO.puts "You have exited the chatroom, you can rejoin with /join or quit with /quit"
end
def handle_command("/quit", args) do
ServerProcotol.disconnect(args)
System.halt(0)
end
def handle_command("/join", args) do
ServerProcotol.connect(args)
IO.puts "Joined the chatroom"
end
def handle_command("/users", args) do
ServerProcotol.list_users(args)
end
def handle_command("", _args), do: :ok
def handle_command(nil, _args), do: :ok
def handle_command(message, args) do
if String.contains?(message, "/pm") do
{to, message} = parse_private_recipient(message)
ServerProcotol.private_message(args, to, message)
else
ServerProcotol.say(args, message)
end
end
defp parse_private_recipient(message) do
[to|message] = message
|> String.slice(4..-1)
|> String.split
message = message
|> List.foldl("", fn(x, acc) -> "#{acc} #{x}" end)
|> String.lstrip
{to, message}
end
end
クライアント側は以上である。
チャットサーバを作る
チャットクライアントはメッセージをリモートの{:message_server, server}にメッセージを送信する。サーバのsnameはserver@Hal9000uというアトムである。
:message_serverとはExMessenger.ServerというGenServerである。
defmodule ExMessenger.Server do
use GenServer
require Logger
def start_link([]) do
:gen_server.start_link({ :local, :message_server }, __MODULE__, [], [])
end
def init([]) do
{ :ok, HashDict.new }
end
...
end
ExMessenger.SupervisorがこのGenServerを起動する時、スーパーバイザーがこれをこのインスタンス内で:message_serverとしてグローバルに登録する。これがクライアントと呼ばれるExMessengerClientアプリケーションからメッセージを届けるための方法である。
ExMessengerClientがServerProtocol.connect/1を呼ぶと、{:connect, nick}というメッセージがサーバに送信される。サーバでは以下のように処理される。
def handle_call({ :connect, nick }, {from, _} , users) do
cond do
nick == :server or nick == "server" ->
{:reply, :nick_not_allowed, users}
HashDict.has_key?(users, nick) ->
{:reply, :nick_in_use, users}
true ->
new_users = users |> HashDict.put(nick, node(from))
user_list = log(new_users, nick, "has joined")
{:reply, { :ok, user_list }, new_users}
end
end
まず初めにnickの値がserverであるかどうかをチェックし、もしそうであれば許可しないようにする。二つ目に、渡されたニックネームが内部のHashDict(keyとvalueを持つ辞書型のデータ構造)に既に存在するかどうかをチェックし、もしあれば拒否する。三つ目にニックネームとclient@Hal9000uのようなノード名のペアをHashDictに保存し、プライベートなlog/3関数を通じてHashDictに保存されている他の全てのノードに通知する。
log/3関数は全てのニックネームを結合したログメッセージを生成し、ログ出力するだけの関数である。そしてHashDictに登録されている全てのクライアントが持つメッセージハンドラーへ向けて通知する。
defp log(users, nick, message) do
user_list = users |> HashDict.keys |> Enum.join(":")
Logger.debug("#{nick} #{message}, user_list: #{user_list}")
say(nick, message)
user_list
end
def say(nick, message) do
GenServer.cast(:message_server, { :say, nick, "* #{nick} #{message} *" })
end
def handle_cast({ :say, nick, message }, users) do
ears = HashDict.delete(users, nick)
Logger.debug("#{nick} said #{message}")
broadcast(ears, nick, message)
{:noreply, users}
end
これまでのところ、{:say, nick, message}といったタプル型のデータを自分自身に対して渡しているだけである。このデータは以下のようにGenServerのbroadcast/3関数によって処理される。
defp broadcast(users, nick, message) do
Enum.map(users, fn {_, node} ->
Task.async(fn ->
send_message_to_client(node, nick, message)
end)
end)
|> Enum.map(&Task.await/1)
end
defp send_message_to_client(client_node, nick, message) do
GenServer.cast({ :message_handler, client_node }, { :message, nick, message })
end
map関数によってusersの要素一つ一つに対して非同期のElixirのTask(これ自体もまた以前にExManga Downloaderの記事でご紹介したようにGenServerである)を実行する。全体に対して一斉送信するので、並列処理をするのが妥当であろう。
大事な点としてはsend_message_to_client/3は{ :message_handler, client_node }のようなタプルを処理するが、client_nodeにはクライアントを起動した時に--snameで設定したclient@Hal9000uのような値が設定されている。
これがクライアントがGenServerとメッセージを送受信する方法である。
これは伝統的なTCPクライアント/サーバの例ではない
ExMessenger.Serverをチャットサーバと呼び、ExMessengerClientをチャットクライアントと呼んでいるが、それらはいわゆるTCPサーバやTCPクライアントを意味しない。
ExMessenger.Serverはまさにサーバ(OTP GenServer)であるが、ExMessengerClient.MessageHandlerもまたサーバ(OTP GenServer)なのである。というのも両方ともノードとして振る舞うので、それらはクライアントとサーバの関係というよりはむしろpeer-to-peerの関係である。クライアントの振る舞い(サーバがクライアントへメッセージを送信)とサーバの振る舞い(サーバがクライアントからメッセージを受信)とがある。
この言語に標準で組み込まれている、成熟した、使いやすいpeer-to-peerのネットワーク分散モデルというコンセプトをしばらく掘り下げてみよう。単独のノードを選ばれし唯一のノードとして持つ必要はない。輪の中にあって強調し合うことで単一障害点を取り除くことができる、全てのノードを持つことができる。
これはejabberdやRabbitMQのようなErlangで作られてサービスの特徴だと思う。
ejabberdの場合、Mnesia(Erlang OTPに標準で付属する高機能な分散NoSQLデータベース)のテーブルにあるクラスターの状態を保持し、分散するノードを協調させるためにノードの機能を使う。
...
join(Node) ->
case {node(), net_adm:ping(Node)} of
{Node, _} ->
{error, {not_master, Node}};
{_, pong} ->
application:stop(ejabberd),
application:stop(mnesia),
mnesia:delete_schema([node()]),
application:start(mnesia),
mnesia:change_config(extra_db_nodes, [Node]),
mnesia:change_table_copy_type(schema, node(), disc_copies),
spawn(fun() ->
lists:foreach(fun(Table) ->
Type = call(Node, mnesia, table_info, [Table, storage_type]),
mnesia:add_table_copy(Table, node(), Type)
end, mnesia:system_info(tables)--[schema])
end),
application:start(ejabberd);
_ ->
{error, {no_ping, Node}}
end.
ところで、これはErlangのコードの断片である。既にElixirに関する十分な知識があると思われるので、Erlangの醜いシンタックスを取り去、パターンマッチングの{_, :pong}のタプルに注目してみよう。Nodeのping機能を使ってノード間の接続をチェックし、Mnesiaのテーブルをアップデートしたりしている。
また、RabbitMQのソースにも同様の記述がある。
become(BecomeNode) ->
error_logger:tty(false),
ok = net_kernel:stop(),
case net_adm:ping(BecomeNode) of
pong -> exit({node_running, BecomeNode});
pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]),
{ok, _} = rabbit_cli:start_distribution(BecomeNode),
io:format(" done~n", []),
Dir = mnesia:system_info(directory),
io:format(" * Mnesia directory : ~s~n", [Dir])
end.
再びMnesiaを使ったノード間の通信である。Erlangのシンタックスは多くの人にとって馴染みがない。例えば変数は大文字で始まる(一般的には定数と考えられている)、文がドット(.)で終わる、モジュールの関数呼び出しがドットではなくコロン(:)である、Elixirと違い丸括弧()が必須である、等々。Erlangの秘められた力を解き放つためにコードを読んでみよう。
ここまででOTPフレームワークの中でどのようにして内部プロセスが生成され、協調させ、リモートのノードとpeer-to-peerで通信するかがわかったと思う。もう一度言うが、これは全てElixirの標準機能である、他の言語が持ち合わせていない。