Facebook Messenger PlatformやLINE BOTが話題になっていますが、下記の記事でも言及されているように、BOTサーバーとして大量メッセージに対応するには「並行処理」がキモになってきます。
大量メッセージが来ても安心なLINE BOTサーバのアーキテクチャ
そしてElixirといえばやっぱり「並行処理」なわけです。ということで「BOTサーバーを効率よく開発するにはElixir/Phoenixってとても良い選択なのでは?」という仮定のもと、色々と検証してみました。
並行処理のコード
Elixirでプロセスを起動・管理する方法はいくつも用意されていますが、BOTサーバーの要件的に「状態」を管理する必要はありませんし、プロセスから「戻り値」を返してもらう必要もありません。要するにプロセスは「使い捨て」というか、実行が終わったら勝手に終了してくれればそれでオッケーなわけです。
この要件にピッタリなのはTaskとTask.Supervisorだと思いますので、これを使ってみることにします。
また、LINE BOTの場合、1つのリクエストに最大で100メッセージが含まれるとのことなので、カンマ区切りで複数のメッセージが渡されることを前提としたアクションを定義して、そこから並行処理のコードを呼び出すようにしてみます。
そんな感じで下記のようなコードを定義してみました。
defmodule BotSample.Router do
...
scope "/", BotSample do
...
post "/callback", PageController, :callback
end
end
defmodule BotSample.PageController do
...
def callback(conn, %{"messages" => messages} = _params) do
Bot.process_messages(messages)
json conn, %{ok: "ok"}
end
end
defmodule BotSample do
use Application
def start(_type, _args) do
...
children = [
...
supervisor(Task.Supervisor, [[name: BotSample.BotTaskSupervisor]]),
]
end
end
defmodule BotSample.Bot do
alias BotSample.Repo
def process_messages(messages) when is_binary(messages) do
Task.Supervisor.start_child(BotSample.BotTaskSupervisor, fn -> do_process_messages(messages) end)
end
defp do_process_messages(messages) when is_binary(messages) do
Enum.each String.split(messages, ","), fn(message) ->
Task.Supervisor.start_child(BotSample.BotTaskSupervisor, fn -> process_message(message) end)
end
end
defp process_message(message) when is_binary(message) do
IO.puts "message = #{inspect message}"
end
end
config :logger, :console, format: "[$level] $message\n", level: :error
あくまで検証用なのでごくごく簡単なコードになっています。ElixirのLoggerの:sync_threshold
の仕様によるパフォーマンスへの影響を排除するために、logger
の出力レベルは:info
ではなく:error
に設定しています。(詳しくはこちらをご参照ください)
ベンチマーク用のツール
ベンチマークというか負荷検証用のツールとしては、PhoenixとRailsのパフォーマンス比較記事でも紹介されていたwrkというツールを使ってみました。
POSTメソッドでJSON形式のリクエストを送信したいので、下記のような設定ファイルを用意します。
-- wrk.lua
wrk.method = "POST"
wrk.body = '{"messages":"1,2,3,4,5,6,7,8,9,10"}'
wrk.headers["Content-Type"] = "application/json"
wrk.headers["Accept"] = "application/json"
モニタリング用のツール
検証スクリプト実行中のCPUやプロセス数等の状況を確認するために、下記の記事で紹介されているex_topというツールを使用しました。
検証用のサーバー
並行処理に関する動作を検証することが目的なので、CPUのコア数が多めのサーバーを使用する必要があります。また、wrkによるリクエストを送信するサーバーと、Elixir/Phoenixでリクエストを処理するサーバーは別々に用意します。
この検証作業においては、下記のインスタンスタイプのEC2を2台使用しました。
インスタンスタイプ | vCPU | メモリ |
---|---|---|
c4.2xlarge | 8 | 15GiB |
検証1
まず、検証用のサーバーでPhoenixを起動します。ベンチマークだけなら普通にmix phoenix.server
で起動しても良いのですが、ex_topを使用する場合はノード名を明示的に指定する必要があるので下記のようなコマンドで起動します。
iex --sname bar@localhost -S mix phoenix.server
続いてex_topを起動します。検証用サーバーのex_topをダウンロードしたディレクトリで下記のようなコマンドを実行します。
./ex_top bar@localhost
ex_topを実行すると、環境にもよりますが8コアのサーバーだと下記のような画面が表示されると思います。
左上の領域でCPUの各コアの使用状況、右上の「Statistics」の領域でプロセス数等をモニタリングすることが出来ます。
次に、リクエストを送信する側のサーバーで下記のようなwrkコマンドを実行します。(ホストの部分はPhoenixが実行されているサーバーのIPを指定します)
wrk -c 8 -t 8 -d 30 -s ./wrk.lua http://ホスト:4000/callback
1リクエストに10メッセージが含まれる状態で上記のコードを実行すると、私の構築した環境だと4800req/sec
という結果になりました。要するにこのサーバースペックだと、ごく短い処理であれば(1リクエストに10メッセージが含まれているので)秒間48000メッセージに対応出来るということになります。
検証2
さて、上記のBotSample.Bot.process_message/1
の処理内では、実際にはプラットフォーム側のAPIを呼び出すことになります。
外部APIの実行に関しては、さすがに戻り値(ステータスコードやレスポンスボディ)をチェックしないわけにはいかないと思いますので、レスポンスが返されるまで待つことになりますが、この戻り時間を少々厳し目に「平均2秒〜3秒」と仮定して、BotSample.Bot.process_message/1
関数に下記のようなsleep処理を追加してみます。
defp process_message(message) when is_binary(message) do
:timer.sleep(Enum.random(2000..3000))
IO.puts "message = #{inspect message}"
end
この状態で、先ほどと同じwrkコマンドを実行してみると、パフォーマンスは750req/sec
あたりまで悪化しました。
さらに各CPUコアの使用率も100%になり、プロセス数も20000程度になります。(sleep処理を入れる前のテストだと300程度)
要するに外部API呼び出しに平均2〜3秒必要になると想定すると、このサーバー1台で処理出来る秒間メッセージ数は7000程度、しかもCPUの使用率から考えると、長期間安定して処理を行える秒間メッセージ数はこれよりもかなり少なくなる、ということになりそうです。
検証3
次に、データストアへのアクセス処理を追加して検証を行ってみたいと思いますが、上記の外部API呼び出しに加えて、BOTサーバーで並行処理を行う際にもう一つ大きなボトルネックになると考えられるのが、データストア用のコネクションプールのプールサイズです。
例えばデータストアにPostgresqlやMySQLを使用する場合、Phoenix/Ectoのデフォルトでは、プールサイズは「10」に設定されていますが、このプールサイズで並行処理を行おうとするとtimeout errorが多発してしまうので使いものになりません。
ということで、大きめのプールサイズ(数千程度)を設定して検証を行えるように、メモリサイズが大きめなデータベースサーバーを用意します。
この検証においては、下記のようなスペックのRDS(MySQL)を使用しました。
インスタンスタイプ | vCPU | メモリ |
---|---|---|
db.r3.2xlarge | 8 | 61GiB |
RDSのMySQLのmax_connectionsは、こちらの記事で解説されているように「DBInstanceClassMemory/12582880」に設定されますので、上記スペックのRDSだとmax_connectionsは5100程度に設定されます。
Phoenix側では、上記データベースへの接続設定のpool_sizeを下記のように設定してみます。
config :bot_sample, BotSample.Repo,
...
pool_size: 5000
BotSample.Bot.process_message/1
関数を下記のように変更します。
defp process_message(message) when is_binary(message) do
:timer.sleep(Enum.random(2000..3000))
{:ok, %{rows: rows}} = Ecto.Adapters.SQL.query(Repo, "SELECT NOW()", [])
IO.puts "now = #{inspect rows}"
end
この状態でwrkコマンドを実行してみると、パフォーマンスは530req/sec
ほどになりました。
また、プール1つごとにプロセスが起動するようで、平常時でもプロセス数が5000超になってしまいますが、timeout errorは発生せず、全メッセージを正常に処理出来ました。
大きな遅延も発生しませんでしたので、データストアのプールサイズおよび最大同時接続数を十分に確保しておけば、並行処理に対して大きな影響を与えずにメッセージを処理出来ることが分かりました。
まとめ
Elixir/PhoenixでBOTサーバーを開発する場合、並行処理に関しては上記のようにTask
とTask.Supervisor
を使えば非常に簡単に実装出来ることが分かりました。(他にも色々とやり方はあるかもしれませんが)
1台のサーバーで処理出来るメッセージ数/seqに関しては、上記のようにボトルネックになる処理(外部APIコールやデータストアのプールサイズ)に関する適当なダミーコードを追加していって、wrk等のベンチマークツールで負荷測定を行えば、あるスペックのサーバーで処理できるメッセージ数/seqというのはおおよそ算出出来ると思いますので、その数値および想定するメッセージ数/seqを元にして必要なサーバー台数を決める、という感じで良さそうかなと。
追記
並行処理に関してはあまり経験がないため、色々おかしな点があるかもしれません。コメント欄でご意見やご指摘等頂けますと大変ありがたいです。よろしくお願い申し上げますm(__)m