Help us understand the problem. What is going on with this article?

Elixir/Phoenixで並行処理を使って大量のメッセージに対応するBOTサーバーを構築する方法を調査してみました。

More than 3 years have passed since last update.

Facebook Messenger PlatformやLINE BOTが話題になっていますが、下記の記事でも言及されているように、BOTサーバーとして大量メッセージに対応するには「並行処理」がキモになってきます。

大量メッセージが来ても安心なLINE BOTサーバのアーキテクチャ

そしてElixirといえばやっぱり「並行処理」なわけです。ということで「BOTサーバーを効率よく開発するにはElixir/Phoenixってとても良い選択なのでは?」という仮定のもと、色々と検証してみました。

並行処理のコード

Elixirでプロセスを起動・管理する方法はいくつも用意されていますが、BOTサーバーの要件的に「状態」を管理する必要はありませんし、プロセスから「戻り値」を返してもらう必要もありません。要するにプロセスは「使い捨て」というか、実行が終わったら勝手に終了してくれればそれでオッケーなわけです。

この要件にピッタリなのはTaskTask.Supervisorだと思いますので、これを使ってみることにします。

また、LINE BOTの場合、1つのリクエストに最大で100メッセージが含まれるとのことなので、カンマ区切りで複数のメッセージが渡されることを前提としたアクションを定義して、そこから並行処理のコードを呼び出すようにしてみます。

そんな感じで下記のようなコードを定義してみました。

web/router.ex
defmodule BotSample.Router do
  ...
  scope "/", BotSample do
    ...
    post "/callback", PageController, :callback
  end
end
web/controllers/page_controller.ex
defmodule BotSample.PageController do
  ...
  def callback(conn, %{"messages" => messages} = _params) do
    Bot.process_messages(messages)
    json conn, %{ok: "ok"}
  end
end
lib/bot_sample.ex
defmodule BotSample do
  use Application
  def start(_type, _args) do
    ...
    children = [
      ...
      supervisor(Task.Supervisor, [[name: BotSample.BotTaskSupervisor]]),
    ]
  end
end
lib/bot_sample/bot.ex
defmodule BotSample.Bot do
  alias BotSample.Repo

  def process_messages(messages) when is_binary(messages) do
    Task.Supervisor.start_child(BotSample.BotTaskSupervisor, fn -> do_process_messages(messages) end)
  end

  defp do_process_messages(messages) when is_binary(messages) do
    Enum.each String.split(messages, ","), fn(message) ->
      Task.Supervisor.start_child(BotSample.BotTaskSupervisor, fn -> process_message(message) end)
    end
  end

  defp process_message(message) when is_binary(message) do
    IO.puts "message = #{inspect message}"
  end
end
config/dev.exs
config :logger, :console, format: "[$level] $message\n", level: :error

あくまで検証用なのでごくごく簡単なコードになっています。ElixirのLoggerの:sync_thresholdの仕様によるパフォーマンスへの影響を排除するために、loggerの出力レベルは:infoではなく:errorに設定しています。(詳しくはこちらをご参照ください)

ベンチマーク用のツール

ベンチマークというか負荷検証用のツールとしては、PhoenixとRailsのパフォーマンス比較記事でも紹介されていたwrkというツールを使ってみました。

ベンチマークツール"wrk"で自由にリクエストを送る

POSTメソッドでJSON形式のリクエストを送信したいので、下記のような設定ファイルを用意します。

-- wrk.lua
wrk.method = "POST"
wrk.body = '{"messages":"1,2,3,4,5,6,7,8,9,10"}'
wrk.headers["Content-Type"] = "application/json"
wrk.headers["Accept"] = "application/json"

モニタリング用のツール

検証スクリプト実行中のCPUやプロセス数等の状況を確認するために、下記の記事で紹介されているex_topというツールを使用しました。

Elixirでプロセス20万くらい作ってみた

検証用のサーバー

並行処理に関する動作を検証することが目的なので、CPUのコア数が多めのサーバーを使用する必要があります。また、wrkによるリクエストを送信するサーバーと、Elixir/Phoenixでリクエストを処理するサーバーは別々に用意します。

この検証作業においては、下記のインスタンスタイプのEC2を2台使用しました。

インスタンスタイプ vCPU メモリ
c4.2xlarge  8  15GiB 

検証1

まず、検証用のサーバーでPhoenixを起動します。ベンチマークだけなら普通にmix phoenix.serverで起動しても良いのですが、ex_topを使用する場合はノード名を明示的に指定する必要があるので下記のようなコマンドで起動します。

iex --sname bar@localhost -S mix phoenix.server

続いてex_topを起動します。検証用サーバーのex_topをダウンロードしたディレクトリで下記のようなコマンドを実行します。

./ex_top bar@localhost

ex_topを実行すると、環境にもよりますが8コアのサーバーだと下記のような画面が表示されると思います。

スクリーンショット 2016-04-26 14.40.59.png

左上の領域でCPUの各コアの使用状況、右上の「Statistics」の領域でプロセス数等をモニタリングすることが出来ます。

次に、リクエストを送信する側のサーバーで下記のようなwrkコマンドを実行します。(ホストの部分はPhoenixが実行されているサーバーのIPを指定します)

wrk -c 8 -t 8 -d 30 -s ./wrk.lua http://ホスト:4000/callback

1リクエストに10メッセージが含まれる状態で上記のコードを実行すると、私の構築した環境だと4800req/secという結果になりました。要するにこのサーバースペックだと、ごく短い処理であれば(1リクエストに10メッセージが含まれているので)秒間48000メッセージに対応出来るということになります。

検証2

さて、上記のBotSample.Bot.process_message/1の処理内では、実際にはプラットフォーム側のAPIを呼び出すことになります。

外部APIの実行に関しては、さすがに戻り値(ステータスコードやレスポンスボディ)をチェックしないわけにはいかないと思いますので、レスポンスが返されるまで待つことになりますが、この戻り時間を少々厳し目に「平均2秒〜3秒」と仮定して、BotSample.Bot.process_message/1関数に下記のようなsleep処理を追加してみます。

defp process_message(message) when is_binary(message) do
  :timer.sleep(Enum.random(2000..3000))
  IO.puts "message = #{inspect message}"
end

この状態で、先ほどと同じwrkコマンドを実行してみると、パフォーマンスは750req/secあたりまで悪化しました。

さらに各CPUコアの使用率も100%になり、プロセス数も20000程度になります。(sleep処理を入れる前のテストだと300程度)

要するに外部API呼び出しに平均2〜3秒必要になると想定すると、このサーバー1台で処理出来る秒間メッセージ数は7000程度、しかもCPUの使用率から考えると、長期間安定して処理を行える秒間メッセージ数はこれよりもかなり少なくなる、ということになりそうです。

検証3

次に、データストアへのアクセス処理を追加して検証を行ってみたいと思いますが、上記の外部API呼び出しに加えて、BOTサーバーで並行処理を行う際にもう一つ大きなボトルネックになると考えられるのが、データストア用のコネクションプールのプールサイズです。

例えばデータストアにPostgresqlやMySQLを使用する場合、Phoenix/Ectoのデフォルトでは、プールサイズは「10」に設定されていますが、このプールサイズで並行処理を行おうとするとtimeout errorが多発してしまうので使いものになりません。

ということで、大きめのプールサイズ(数千程度)を設定して検証を行えるように、メモリサイズが大きめなデータベースサーバーを用意します。

この検証においては、下記のようなスペックのRDS(MySQL)を使用しました。

インスタンスタイプ vCPU メモリ
db.r3.2xlarge  8  61GiB 

RDSのMySQLのmax_connectionsは、こちらの記事で解説されているように「DBInstanceClassMemory/12582880」に設定されますので、上記スペックのRDSだとmax_connectionsは5100程度に設定されます。

Phoenix側では、上記データベースへの接続設定のpool_sizeを下記のように設定してみます。

config/dev.exs
config :bot_sample, BotSample.Repo,
  ...
  pool_size: 5000

BotSample.Bot.process_message/1関数を下記のように変更します。

defp process_message(message) when is_binary(message) do
  :timer.sleep(Enum.random(2000..3000))
  {:ok, %{rows: rows}} = Ecto.Adapters.SQL.query(Repo, "SELECT NOW()", [])
  IO.puts "now = #{inspect rows}"
end

この状態でwrkコマンドを実行してみると、パフォーマンスは530req/secほどになりました。

また、プール1つごとにプロセスが起動するようで、平常時でもプロセス数が5000超になってしまいますが、timeout errorは発生せず、全メッセージを正常に処理出来ました。

大きな遅延も発生しませんでしたので、データストアのプールサイズおよび最大同時接続数を十分に確保しておけば、並行処理に対して大きな影響を与えずにメッセージを処理出来ることが分かりました。

まとめ

Elixir/PhoenixでBOTサーバーを開発する場合、並行処理に関しては上記のようにTaskTask.Supervisorを使えば非常に簡単に実装出来ることが分かりました。(他にも色々とやり方はあるかもしれませんが)

1台のサーバーで処理出来るメッセージ数/seqに関しては、上記のようにボトルネックになる処理(外部APIコールやデータストアのプールサイズ)に関する適当なダミーコードを追加していって、wrk等のベンチマークツールで負荷測定を行えば、あるスペックのサーバーで処理できるメッセージ数/seqというのはおおよそ算出出来ると思いますので、その数値および想定するメッセージ数/seqを元にして必要なサーバー台数を決める、という感じで良さそうかなと。

追記

並行処理に関してはあまり経験がないため、色々おかしな点があるかもしれません。コメント欄でご意見やご指摘等頂けますと大変ありがたいです。よろしくお願い申し上げますm(__)m

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away