Elixir
cowboy

ElixirでPhoenixを使わずにWebsocketサーバの実装

ElixirでWebsocketをやろうと思うとまずとりあえずPhoenixが候補として浮かんでくるくらいにはPhoenix Channelは便利だ。
優れた抽象化、明快なインターフェースで直感的にWebsocketアプリケーションを実装出来る。またPhoenix.PubSubで簡単にノードをまたがったメッセージのbroadcastなども実現出来る。

だが本当に必要なものがシンプルなWebsocketだけで、トピックの分割も必要なく接続してきたクライアントに対して返答したいだけなら実はCowboyのみでも実装出来る。実際Phoenix ChannelはCowboyのビルトインのWebsocketサーバをうまく抽象化したものだ。

よく誤解されがちだがPhoenix自体コードベースはかなりマイクロで正直Websocketのためだけに使ってもよいぐらいなのだけど、今回はより低レイヤーでWebsocketを扱いたいのと、極力依存を減らしたいのでCowboyのみで実装してみる。

Cowboy

  • Erlang製のHTTPサーバ
  • Cowboy 2.0よりHTTP2などに対応 (Phoenixは1.4でCowboy 2.0に対応)
  • 1.0とインターフェースが大きく変わっているので1系に関するドキュメントが役に立たない場合もあるので注意
  • なお今回は2系で実装する

Installation

  • プロジェクト作成
$ mix new my_app
$ cd my_app
  • 依存追加
mix.exs
{:cowboy, "~> 2.0"}
$ mix deps.get

Implementation

Dispatch

Cowboyサーバを起動するための処理を記述する

server.ex
defmodule MyApp.Server do
  use Application
  require Logger

  def start(_type, _args) do
    dispatch =
      :cowboy_router.compile([
        {:_,
         [
           {'/websocket', MyApp.WebSocketHandler, []}
         ]}
      ])

    Logger.info("Started listening on port 5984...")

    :cowboy.start_clear(:my_http_listener, [{:port, 5984}], %{env: %{dispatch: dispatch}})
  end

  def stop(_state) do
    :ok
  end
end

サーバをSuperviseするApplicationを定義

application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(MyApp.Server, []),
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

mix.exsのapplication/0でmodキーワードを指定しserverを起動

mix.exs
  def application do
    [
      mod: {MyApp.Application, {}},
      extra_applications: [:logger]
    ]
  end

Cowboy websocket behaviour

DispatchされたWebsocketリクエストを実際に処理するハンドラを定義する

websocket_handler.ex
defmodule MyApp.WebSocketHandler do
  @behaviour :cowboy_websocket_handler

  require Logger

  def init(req, state) do
    opts = %{idle_timeout: 60000}

    {:cowboy_websocket, req, state, opts}
  end

  def websocket_init(state) do
    Logger.info("started connection.")
    {:ok, state}
  end

  def websocket_handle({:text, message}, state) do
    {:reply, {:text, message}, state}
  end

  def websocket_handle(_data, state) do
    {:ok, state}
  end

  def websocket_info(:foo, state) do
    {:ok, state}
  end

  def terminate(_reason, _req, _state) do
    Logger.info("connection terminated")
    :ok
  end
end

解説

  • init/2 はcowboyのハンドラ全てでリクエストを受け取ったときに呼び出されるcallbackで、ここでタプルで最初の要素に:cowboy_websocketを返すとWebsocketへUpgradeされる
  • websocket_init/1はWebsocketへUpgrade後に最初に呼び出されるcallback。optionなので実装しないでもいいがWebsocket接続確立後に何か初期化したい場合はここに実装する
  • websocket_handle/2 callbackはwebsocket frameが到着すると呼び出される。frameの種類は:text, :binary, :ping, :pongがある。{:reply, {:text, message}, state}{:text, message}に当たる部分がクライアントへ返されるフレーム
  • websocket_info/2はErlangメッセージがコネクションプロセスに到達し、該当するcallbackがある場合に呼び出される。例えば、上記の場合:fooというメッセージでコネクションのプロセスにsendすると websocket_info(:foo, state)が呼び出される。
def websocket_init(state) do
  Logger.info("started connection.")
  # websocket_init/1内でのself()がコネクションのpidになる
  send(self(), :foo)
  {:ok, state}
end
  • terminate/3は接続がなんらかの理由で切断された場合呼び出されるcallback。reason に切断された理由が入り :remoteはクライアント起因で接続を閉じた場合。:timeout は一定期間クライアントから何もデータが到達しない場合。タイムアウトを変えたい場合はinit/2のoptsで:idle_timeoutを指定する。デフォルトは60秒でタイムアウトなのでクライアントからのハートビートは60秒以内にしよう。

基本的な実装は以上になる。

動作確認するにはブラウザに既にWebsocketクライアントが組み込まれているのでブラウザのコンソールから接続するのが手っ取り早い。

Websocketサーバをiex経由で起動し

 iex -S mix
Erlang/OTP 21 [erts-10.1] [source] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:1] [hipe]

06:12:05.459 [info]  Started listening on port 5984...
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>

ChromeのDevToolを開き、Consoleからサーバへ接続する

var ws = new WebSocket("ws://localhost:5984/websocket")
ws.send("foo")

image.png

DevToolのWSタブを開くと実際にFrameが送受信されているのが分かる

image.png

あとはメッセージ受信用のハンドラを登録してJSでよしなにしよう

ws.onmessage = function(event){
  console.log(event.data)
}

broadcast

基本的なサーバ-クライアントでの1対1のリクエスト-レスポンスモデルの処理だけなら上記だけで十分だが(実際はクライアントでハートビートなどしてコネクションを維持したりする必要はあるが)、Websocketを実用的に使うとなるとサーバに接続している全てのクライアントにbroadcastしたり、broadcast範囲の分割(トピック)、サーバがスケールして複数台になった時に全てのサーバに同報するなどの処理が必要になってくる。

複数台のサーバに同報するようなレベルのアプリケーションはその時点でかなり高度なアプリケーションであることが予想されるので素直にPhoenix.PubSubなどのPubSubフレームワークを使った方がいいが、今回はそこまで必要ないよ単体のサーバで十分だよという方向けに単体のサーバのみでbroadcastを実現してみる。(トピックの分割もなし)

まず必要になるのは接続しているクライアントのプロセスの管理。
基本的にCowboyでは1コネクションごとにプロセスが割り当てられるので、そのプロセスに対してメッセージを送信することでbroadcastを実現する。

まずプロセスの登録にはgprocやRegistry, GenServer, Agentなどが使えてどれでも実装出来るといえば出来るが、今回はElixirビルトインのRegistryで実装してみる。

Registryを使うためにSupervisor監視下にRegistryを加える

application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      supervisor(MyApp.Server, []),
      # 追加
      supervisor(Registry, [[keys: :duplicate, name: :client_registry]])
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

websocket_init/1でクライアントをRegistryに登録する

websocket_handler.ex
  def websocket_init(state) do
    Logger.info("Socket starting")
    # register/3 すると対象のkeyに {self(), 第三引数} が登録される
    Registry.register(:client_registry, :clients, [])
    {:ok, state}
  end

あとは任意のタイミングでRegistryをlookupして登録してあるプロセスに対してErlangメッセージをsendすることによって現在接続している全てのクライアントに同時にメッセージを送ることが出来る。

# messageをbroadcast
def broadcast(message) do
  Registry.dispatch(:client_registry, :clients, fn entries ->
    for {pid, _} <- entries do
      send(pid, {:broadcast, message})
    end
  end)
end

def websocket_info({:broadcast, message}, state) do
  {:reply, {:text, message}, state}
end

またwebsocket_init/1ではwebsocketプロセスの登録以外にもタイマーを設定したりなどにも使える。

タイマー実装例

  def websocket_init(state) do
    Logger.info("Socket starting")
    Registry.register(:client_registry, :clients, [])
    # 1秒後に自分に対してsend
    Process.send_after(self(), {:tick, self()}, 1000)
    {:ok, state}
  end

  def websocket_info({:tick, pid}, state) do
    # 再度1秒後にsend. 以降1秒毎に`websocket_info/2`がcallされ1秒ごとにtickメッセージがクライアントに届く
    Process.send_after(pid, {:tick, pid}, 1000)
    {:reply, {:text, "tick"}, state}
  end

image.png

1秒ごとにメッセージが届いているのが分かる

この後はトピックへのSubscribeなどでbroadcast範囲の分割をすることで必要ないメッセージをブロードキャストしないでいいようにするなどの最適化をすることなども考えられるが、これも同様にRegistryを使って特定のkey以下にプロセスを登録しそのkeyのentryにbroadcastすることで実現出来る。

SSE (Server-Sent Events)

基本的には上記のようなプロセスの管理によってサーバ契機のイベントも同様に処理出来る。

まとめ

CowboyでWebsocketサーバを実装した。
実際のコード量はこのページにあるもので分かる通りかなり短いコード量で実装出来ることが分かる。
認証や特定のトピックのSubscribe、スケールを考慮しなくてよく、ただリアルタイムなメッセージのやりとりのためだけという点だけであればかなりシンプルにWebsocketサーバを実装出来ることが分かる。

ただこれで実用的なレベルのWebsocketアプリケーションをいきなり作るのは低レイヤーすぎて正直しんどいと思うので素直にPhoenixを使った方がいいと思った。改めてPhoenixの抽象化はよく出来てるなと確認。

用途としてはライブラリのコンポーネントとして一からWebsocketを使いたい場合などはいいんじゃないだろうか。

参照資料