ElixirでWebsocketをやろうと思うとまずとりあえずPhoenixが候補として浮かんでくるくらいにはPhoenix Channelは便利だ。
優れた抽象化、明快なインターフェースで直感的にWebsocketアプリケーションを実装出来る。またPhoenix.PubSubで簡単にノードをまたがったメッセージのbroadcastなども実現出来る。
だが本当に必要なものがシンプルなWebsocketだけで、トピックの分割も必要なく接続してきたクライアントに対して返答したいだけなら実はCowboyのみでも実装出来る。実際Phoenix ChannelはCowboyのビルトインのWebsocketサーバをうまく抽象化したものだ。
よく誤解されがちだがPhoenix自体コードベースはかなりマイクロで正直Websocketのためだけに使ってもよいぐらいなのだけど、今回はより低レイヤーでWebsocketを扱いたいのと、極力依存を減らしたいのでCowboyのみで実装してみる。
Cowboy
- Erlang製のHTTPサーバ
- Cowboy 2.0よりHTTP2などに対応 (Phoenixは1.4でCowboy 2.0に対応)
- 1.0とインターフェースが大きく変わっているので1系に関するドキュメントが役に立たない場合もあるので注意
- なお今回は2系で実装する
Installation
- プロジェクト作成
$ mix new my_app
$ cd my_app
- 依存追加
{:cowboy, "~> 2.0"}
$ mix deps.get
Implementation
Dispatch
Cowboyサーバを起動するための処理を記述する
defmodule MyApp.Server do
use Application
require Logger
def start(_type, _args) do
dispatch =
:cowboy_router.compile([
{:_,
[
{'/websocket', MyApp.WebSocketHandler, []}
]}
])
Logger.info("Started listening on port 5984...")
:cowboy.start_clear(:my_http_listener, [{:port, 5984}], %{env: %{dispatch: dispatch}})
end
def stop(_state) do
:ok
end
end
サーバをSuperviseするApplicationを定義
defmodule MyApp.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
supervisor(MyApp.Server, []),
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
mix.exsのapplication/0
でmodキーワードを指定しserverを起動
def application do
[
mod: {MyApp.Application, {}},
extra_applications: [:logger]
]
end
Cowboy websocket behaviour
DispatchされたWebsocketリクエストを実際に処理するハンドラを定義する
defmodule MyApp.WebSocketHandler do
@behaviour :cowboy_websocket_handler
require Logger
def init(req, state) do
opts = %{idle_timeout: 60000}
{:cowboy_websocket, req, state, opts}
end
def websocket_init(state) do
Logger.info("started connection.")
{:ok, state}
end
def websocket_handle({:text, message}, state) do
{:reply, {:text, message}, state}
end
def websocket_handle(_data, state) do
{:ok, state}
end
def websocket_info(:foo, state) do
{:ok, state}
end
def terminate(_reason, _req, _state) do
Logger.info("connection terminated")
:ok
end
end
解説
-
init/2
はcowboyのハンドラ全てでリクエストを受け取ったときに呼び出されるcallbackで、ここでタプルで最初の要素に:cowboy_websocket
を返すとWebsocketへUpgradeされる -
websocket_init/1
はWebsocketへUpgrade後に最初に呼び出されるcallback。optionなので実装しないでもいいがWebsocket接続確立後に何か初期化したい場合はここに実装する -
websocket_handle/2
callbackはwebsocket frameが到着すると呼び出される。frameの種類は:text
,:binary
,:ping
,:pong
がある。{:reply, {:text, message}, state}
で{:text, message}
に当たる部分がクライアントへ返されるフレーム -
websocket_info/2
はErlangメッセージがコネクションプロセスに到達し、該当するcallbackがある場合に呼び出される。例えば、上記の場合:foo
というメッセージでコネクションのプロセスにsendするとwebsocket_info(:foo, state)
が呼び出される。
def websocket_init(state) do
Logger.info("started connection.")
# websocket_init/1内でのself()がコネクションのpidになる
send(self(), :foo)
{:ok, state}
end
-
terminate/3
は接続がなんらかの理由で切断された場合呼び出されるcallback。reason
に切断された理由が入り:remote
はクライアント起因で接続を閉じた場合。:timeout
は一定期間クライアントから何もデータが到達しない場合。タイムアウトを変えたい場合はinit/2
のoptsで:idle_timeout
を指定する。デフォルトは60秒でタイムアウトなのでクライアントからのハートビートは60秒以内にしよう。
基本的な実装は以上になる。
動作確認するにはブラウザに既にWebsocketクライアントが組み込まれているのでブラウザのコンソールから接続するのが手っ取り早い。
Websocketサーバをiex経由で起動し
➜ iex -S mix
Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [hipe]
06:12:05.459 [info] Started listening on port 5984...
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
ChromeのDevToolを開き、Consoleからサーバへ接続する
var ws = new WebSocket("ws://localhost:5984/websocket")
ws.send("foo")
DevToolのWSタブを開くと実際にFrameが送受信されているのが分かる
あとはメッセージ受信用のハンドラを登録してJSでよしなにしよう
ws.onmessage = function(event){
console.log(event.data)
}
broadcast
基本的なサーバ-クライアントでの1対1のリクエスト-レスポンスモデルの処理だけなら上記だけで十分だが(実際はクライアントでハートビートなどしてコネクションを維持したりする必要はあるが)、Websocketを実用的に使うとなるとサーバに接続している全てのクライアントにbroadcastしたり、broadcast範囲の分割(トピック)、サーバがスケールして複数台になった時に全てのサーバに同報するなどの処理が必要になってくる。
複数台のサーバに同報するようなレベルのアプリケーションはその時点でかなり高度なアプリケーションであることが予想されるので素直にPhoenix.PubSubなどのPubSubフレームワークを使った方がいいが、今回はそこまで必要ないよ単体のサーバで十分だよという方向けに単体のサーバのみでbroadcastを実現してみる。(トピックの分割もなし)
まず必要になるのは接続しているクライアントのプロセスの管理。
基本的にCowboyでは1コネクションごとにプロセスが割り当てられるので、そのプロセスに対してメッセージを送信することでbroadcastを実現する。
まずプロセスの登録にはgprocやRegistry, GenServer, Agentなどが使えてどれでも実装出来るといえば出来るが、今回はElixirビルトインのRegistryで実装してみる。
Registryを使うためにSupervisor監視下にRegistryを加える
defmodule MyApp.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
supervisor(MyApp.Server, []),
# 追加
supervisor(Registry, [[keys: :duplicate, name: :client_registry]])
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
websocket_init/1
でクライアントをRegistryに登録する
def websocket_init(state) do
Logger.info("Socket starting")
# register/3 すると対象のkeyに {self(), 第三引数} が登録される
Registry.register(:client_registry, :clients, [])
{:ok, state}
end
あとは任意のタイミングでRegistryをlookupして登録してあるプロセスに対してErlangメッセージをsendすることによって現在接続している全てのクライアントに同時にメッセージを送ることが出来る。
# messageをbroadcast
def broadcast(message) do
Registry.dispatch(:client_registry, :clients, fn entries ->
for {pid, _} <- entries do
send(pid, {:broadcast, message})
end
end)
end
def websocket_info({:broadcast, message}, state) do
{:reply, {:text, message}, state}
end
またwebsocket_init/1
ではwebsocketプロセスの登録以外にもタイマーを設定したりなどにも使える。
タイマー実装例
def websocket_init(state) do
Logger.info("Socket starting")
Registry.register(:client_registry, :clients, [])
# 1秒後に自分に対してsend
Process.send_after(self(), {:tick, self()}, 1000)
{:ok, state}
end
def websocket_info({:tick, pid}, state) do
# 再度1秒後にsend. 以降1秒毎に`websocket_info/2`がcallされ1秒ごとにtickメッセージがクライアントに届く
Process.send_after(pid, {:tick, pid}, 1000)
{:reply, {:text, "tick"}, state}
end
1秒ごとにメッセージが届いているのが分かる
この後はトピックへのSubscribeなどでbroadcast範囲の分割をすることで必要ないメッセージをブロードキャストしないでいいようにするなどの最適化をすることなども考えられるが、これも同様にRegistryを使って特定のkey以下にプロセスを登録しそのkeyのentryにbroadcastすることで実現出来る。
SSE (Server-Sent Events)
基本的には上記のようなプロセスの管理によってサーバ契機のイベントも同様に処理出来る。
まとめ
CowboyでWebsocketサーバを実装した。
実際のコード量はこのページにあるもので分かる通りかなり短いコード量で実装出来ることが分かる。
認証や特定のトピックのSubscribe、スケールを考慮しなくてよく、ただリアルタイムなメッセージのやりとりのためだけという点だけであればかなりシンプルにWebsocketサーバを実装出来ることが分かる。
ただこれで実用的なレベルのWebsocketアプリケーションをいきなり作るのは低レイヤーすぎて正直しんどいと思うので素直にPhoenixを使った方がいいと思った。改めてPhoenixの抽象化はよく出来てるなと確認。
用途としてはライブラリのコンポーネントとして一からWebsocketを使いたい場合などはいいんじゃないだろうか。
参照資料
- [Nine Nines: cowboy_websocket(3)] (https://ninenines.eu/docs/en/cowboy/2.0/manual/cowboy_websocket/) (Cowboy公式ドキュメント)
- [RFC 6455 - The WebSocket Protocol] (https://tools.ietf.org/html/rfc6455) (WebsocketのRFC)
- Three alternatives to using GenEvent in Elixir