56
50

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

分散Erlang周りの性能測定メモ

Last updated at Posted at 2014-12-24

今日はErlangの内部実装周りの記事を書きたかったのですが、それは準備が間に合いませんでした。
代わりに分散Erlangに関連する機能の性能測定をいくつか行ったので、その結果メモを載せさせて貰います。
※ 測定方法は結構適当なので参考程度に

なお、分散Erlang自体に関しては(例えば)『すごいErlangゆかいに学ぼう! 』の29章(web版)が詳しいので、そちらを参照のこと。

目的等

分散Erlangはメッシュ型のクラスタを組むので、台数が増えると性能が頭打ちしやすいかも、という話を以前にどこかで目にした覚えがあったので、実際にどうなのかを試してみたかった、というのがもともとの動機。

今回の目的は、(最大で)数百ノード規模のErlangクラスタを構築する際に、どこが性能上のボトルネックになり得るかをおおまかに把握すること。

特定のシステムやアプリケーションの全体の性能測定ではなく、標準で提供されている分散Erlang関連機能単体の基礎性能の測定を行なう。
(分散Erlangの基盤的な部分で、スケーラビリティを阻害する要因がないかどうか)

なお、安定性周りの話(ex. ノード数が多い+ネットワークが不安定な際、の分散Erlangの性能や挙動等)は今回は考慮していない。

測定環境および測定方法

測定環境

EC2のc3.8xlargeインスタンスを使用した:

  • CPU: Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz (物理16コア、論理32コア)
  • メモリ: 60GiB
  • OS: CentOS-6.4
  • OTP: 17.4 (rpm)
  • インスタンス数: 10台

クラスタの構築方法とインスタンス構成

計測用の分散Erlangクラスタの構築にはslaveモジュールを使用した。

  • slaveモジュールを使うことで、Erlangシェル上から別のノードをスレーブとして起動することができる
    • 起動した側のノードはマスターノードとなる
  • スレーブノードの生存期間は、マスターノード(or link付きで呼び出した場合は呼び出し元プロセス)のそれと同期する
  • クリーンな環境のクラスタの構築および破棄が手軽に行えるので、ベンチマーク的な用途に向いた便利なモジュール
%% 使用例:
%%
%% スレーブノードを起動したいホスト名とノードの名前を指定して slave:start_link/2 を呼び出す
%% ※ ここで指定するホストにはrshコマンドでログインができる必要がある 
%%    (rshコマンドではなく、sshコマンドを使いたい場合はerlコマンドの実行時に'-rsh ssh'オプションを引数で指定する)
> slave:start_link(hoge_host, fuga).
{ok, fuga@hoge_host}  % 'fuga@hoge_host'というスレーブノードが起動した

> nodes().
[fuga@hoge_host]

> exit(shutdown). % 呼び出し元(link)プロセスがダウンしたら、スレーブノードは自動で終了させられる

> nodes().
[]

測定時に用いたEC2インスタンスの構成は、以下の通り:

  • マスターノード用に専用のインスタンスを一台割当
    • 計測用の各種操作を実行したり、ベンチマーククライアントが動作するのはこのノード
  • スレーブノード用には残りのインスタンス九台を割当
    • 測定用の負荷調整用インスタンス群
    • 各インスタンスでは最大で30個程度のEralngノードを同時に起動される
      • 相乗りしているのはインスタンス数の節約のため仕方なく (可能なら一インスタンス一ノードの方が望ましい)
    • 測定時には最大で256スレーブノード(+ 1マスターノード)を含むクラスタが作成される

スレーブノード群は、各測定関数実行前に起動され、関数実行後に破棄されるようにした。

計測方法 (共通部分)

末尾に記載のdist_bench.erlに定義されている関数群を使って測定を行った。

以下、全ての計測で使用した共通のユーティリティ関数群の紹介および使用例:

%% [1] dist_bench:setup_nodes/1
%% スレーブノード群の起動関数
> NodeSpecs = [{host1, 3}, {host2, 2}]. % スレーブノードを起動するホストと、そのホストでの起動数を指定する
> dist_bench:setup_nodes(NodeSpecs).
{5, 654112}.  % {起動したスレーブノードの数, 各ノードの起動に要した時間の平均(μs)}

> nodes().
[slave_0@host1, slave_1@host1, slave_2@host1, slave_0@host2, slave_1@host2]

> exit(shutdown).


%% [2] dist_bench:make_node_specs/2
%% 'NodeSpecs'の生成補助関数
%% 利用可能なホスト一覧と、スレーブノードの合計起動数を指定すると、各ホスト間で起動数が均等になるように調整してくれる
> dist_bench:make_node_specs([host1, host2], 5).
[{host1, 3}, {host2, 2}]


%% [3] dist_bench:do_bench/3
%% ベンチマーク(計測)実行関数
> ExecuteCount = 5.                          % 'BenchFun'の実行回数
> BenchFun = fun () -> timer:sleep(100) end. % 具体的な計測処理を実行する関数
> dist_bench:do_bench(NodeSpecs, ExecuteCount, BenchFun). 
# setup nodes ... done: count=5, start_avg_time=0.3468948 sec % 最初にスレーブノード群が起動させられる
# [1] do bench ... done: time=0.103409 sec % 'BenchFun'の一回の実行に掛かった時間(前処理や後処理の時間も含むので参考程度)
# [2] do bench ... done: time=0.100309 sec
# [3] do bench ... done: time=0.101726 sec
# [4] do bench ... done: time=0.1009 sec
# [5] do bench ... done: time=0.100786 sec

[ok,ok,ok,ok,ok]
% ↑ 'BenchFun'関数の返り値を集めたもの.
% ここでは'ok'が返されているが、実際の対象処理の計時結果が返されることを期待している.
% 以降では、このリスト中の中央値が、最終的な計時結果として採用されている。

スケールアウト関連の性能測定

クラスタ内のノード数が増えた場合のスケーラビリティの測定。

対象

標準で提供されており、かつ、自分が比較的良く利用するモジュールや関数群を中心に測定を行った。

対象外とした分散Erlang関連モジュール等 (すぐ思いついた物のみ記載):

  • mnesiaモジュール
    • 標準で配布されている分散DMBS
    • 個人的には上手い使い所が見つかっていないのと、ここで扱うのは巨大する気がするので今回は対象外
  • pgモジュール
    • pg2の前身(?)で現在は非推奨扱い (OTP18で削除予定、となっていた)
  • global_groupモジュール
    • クラスタ内のノード群をグループ分けするためのモジュール
    • クラスタ全体のサイズが大きい場合でも、グループ分けを適切に行なうことでglobalモジュールの性能劣化を防げるとのこと
    • ノード群のグループ分けの機能自体には若干惹かれるが、いろいろと制限がきついので今後も使うことはなさそう
  • 分散OTPアプリケーション
    • 今のところ使う予定なし

ノード管理系

ノードの追加

測定内容

slave:start_link/3関数を使って、ノードを追加(起動)するのに要する時間が、クラスタサイズに応じて変化するかを測定した。

%% 測定方法:
%% - dist_bench:setup_nodes/1関数を使って、スレーブノードの追加に要した時間の平均値を測定する

> Hosts = [host1, host2, host3, host4, host5, host6, host7, host8, host9]. % スレーブ用のEC2インスタンスリスト
> NodeCount = 1. % 起動するスレーブノードの数。1から256の範囲で変化させて、それぞれの値での追加に要した時間を計測する

> dist_bench:setup_nodes(dist_bench:make_node_specs(Hosts, NodeCount)).
{1, 647250.0}  % 返り値の第二要素が、各ノードの追加(起動)に要した時間の平均値(μs)

> exit(shutdown). % 'NodeCount'を変更して次の想定を行なう前に、起動したスレーブノード群を停止する

測定結果

スレーブノード数 ノードの追加に要した時間の平均
1 0.637 s
4 0.673 s
16 0.691 s
64 0.705 s
256 0.690 s

少なくともslaveモジュール経由で起動した場合は、クラスタ内のノード数が多くなっても、新規ノードの追加に掛かるコストが顕著に増える、といったようなことはなさそう。

ノードの死活監視

※ これに関しては'測定'という程のことは行っていないので、軽く概要だけ書いておく

分散Erlangでの接続しているノードの死活監視はnet_kernelモジュールが行っている。

死活チェック処理のおおよその流れは以下の通り:

  • net_kernelプロセスは、接続ノードとの間に死活監視用のTCP接続を貼っている
  • ticktime / 4秒毎に、その全てのTCP接続に対して、(監視用の)書き込みを行なう
    • ticktimeはnet_kernel:set_net_ticktime/1で指定された秒数で、デフォルトは60秒
  • TCP接続が閉じていたり、ticktime秒間に(相手側からの)書き込みがなかったノードは、ダウンしていると判断する

当然、接続してるノード数に比例して、(ticktime / 4秒毎の)処理量は増えることになるが、特に重い処理でもないので、数百ノード程度であれば、負荷的に特に問題となることもなさそう。

実際に、特に何も処理を走らせていない状態でCPUの使用率を確認してみても、接続ノードがない場合と接続ノード数が200の場合との間で、特に優位な程の差は見られなかったように思う。

ピアツーピア通信系

ノードを跨いだ個々のプロセス同士が直接やりとりする種類の操作の性能測定。

ノードが異なるプロセス間でのメッセージ送受信や、プロセス死活監視、ノード間RPC等。

これらの操作は対象となる特定のプロセス/ノードとさえ通信できれば十分で、クラスタ全体の状態を把握する必要もないので、クラスタ内のノード数が増加しても性能が劣化しない(= 結果としてシステム全体は良好にスケールアウトする)ことが期待される。

具体的な測定対象は以下の通り:

  • erlang:send/2関数:
    • プロセスに対するメッセージ送信関数
    • 実際の測定コードでは!演算子を使用している
    • 送信メッセージの宛先指定方式には、以下の二つを試した:
      • PID(プロセスID)指定:
        • ErlangのPIDには、そのプロセスが存在するノードの情報も埋め込まれているので、PID単体で宛先を一意に決定可能
        • ex: <123.456.789>
      • プロセス名指定:
        • erlang:register/2を使って登録されたプロセス名を送信先に指定する
        • {Name::atom(), Node::node()}形式
  • rpc:async_call/4:
    • rpc:call/4を使うとリモートノード上での関数実行が手軽に行える
    • 今回はスループットを(手軽に)上げるために非同期版のrpc:async_call/4を使用した
  • erlang:monitor/2関数:
    • プロセスの死活管理(単方向監視)
    • モニタしているプロセスがダウンした場合は{'DOWN', MonitorRef, Type, Pid, Reason}という形式のメッセージが呼び出し元プロセスに送られる
    • 双方向監視(?)が行えるerlang:link/2関数もあるが、(性能傾向は変わらないであろうと予想のもと)こちらの計測は割愛

測定内容

  • erlang:send/2関数:
    • リモートノード上にechoサーバプロセスを用意して 一回のメッセージ送受信に要した平均時間 を計測
      • メッセージの送信元はマスターノード上の単一プロセス
      • echoサーバプロセスはリモートのスレーブノード上で起動する (各ノードに付き一プロセスが存在)
    • 測定関数: dist_bench:pid_msg_echo_bench/2 (実際に指定したパラメータ等は下の実行例を参照)
  • rpc:async_call/4:
    • 一回のRPC(リクエスト送信からレスポンス受信まで)に要した平均時間 を計測
      • マスターノード上の(単一)プロセスから、リモートのスレーブノードに対するRPC
    • 測定関数: dist_bench:rpc_call_bench/2 (実際に指定したパラメータ等は下の実行例を参照)
  • erlang:monitor/2関数:
    • スレーブノード上のプロセスに対する 監視の登録からダウン通知の受信までに要した平均時間 を計測
      • プロセスの監視元はマスターノード上の単一プロセス
      • スレーブノード上のプロセスを対象としてerlang:monitor/2を呼び出した直後に、そのプロセスを終了(erlang:exit/2)させて、ダウン通知を待機する
    • 測定関数: dist_bench:monitor_bench/2 (実際に指定したパラメータ等は下の実行例を参照)

測定の際のスレーブノードの数は1から256の間で変化させ、クラスタサイズの大小により各操作のコストが変わるかどうかの確認を行った。
また参考として、ノード間通信が一切不要な、マスターノードのみ(= スレーブノード数が0)の構成での測定も行った。

%% 実行例:

%% スレーブノード数=4、でメッセージ送信ベンチマークを実行h
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 4), 5,
                      fun () -> dist_bench:pid_msg_echo_bench(500000, nodes()) end).

%% スレーブノード数=4, でRPCベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 4), 5,
                      fun () -> dist_bench:rpc_call_bench(100000, nodes()) end).

%% スレーブノード数=4, でプロセス監視ベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 4), 5,
                      fun () -> dist_bench:monitor_bench(100000, nodes()) end).

測定結果

スレーブノード数 erlang:send/2 (PID) erlang:send/2 (名前) rpc:async_call/4 erlang:monitor/2
0 0.758 μs 0.905 μs 3.269 μs 19.450 μs
1 8.020 μs 8.055 μs 40.105 μs 9.074 μs
4 2.021 μs 2.116 μs 11.518 μs 8.598 μs
16 1.429 μs 1.672 μs 7.160 μs 4.985 μs
64 2.345 μs 2.340 μs 11.667 μs 4.839 μs
256 2.634 μs 2.767 μs 11.320 μs 5.053 μs

測定結果を見ると、全体的に、クラスタのサイズが大きくなっても、個々の処理の性能はほとんど劣化してないことが見て取れる。

むしろ並列度が上がるためか、スレーブノード数が16までの場合は、単体の処理に要する時間も(ノード数が増えるに従い)短くなっている。

スレーブノード数が16を超えると若干性能が劣化し始める傾向があるが、これは一つのEC2インスタンス上で複数のスレーブノードを動かしていることが影響を与えているのではないかと思っている。
(スレーブノード数が9を超えた段階で、一つのインスタンス上でのノードの同居が始まるのでインスタンスの負荷は高くなる)

グローバルな情報管理系

クラスタ全体に影響するような操作を行なうモジュール群の性能測定。

グローバルなプロセス名管理やプロセスグループ管理等。

クラスタ内のノード同士が何らかの手段で情報を共有する必要が出てくるので、単なるメッセージ送信のような効率的な実装は難しいと予想される。

具体的な測定対象は以下の通り:

  • globalモジュール:
    • グローバルスコープなプロセス名管理が行える (erlang:register/2のグローバル版)
    • グローバルな相互排他ロック機能も提供している
    • globalモジュールは更新系の操作時に整合性を保証するようにしている
      • プロセス名の登録操作の場合:
        1. 最初に登録する名前をキーにして、クラスタ全体をスコープとしてロックを獲得
        2. ロック獲得に成功したら、登録する名前とPIDを、全ノードに周知して登録を行わせる
        3. ロック獲得に失敗したら、一定時間スリープして再試行
      • 整合性は担保しやすいが、スケールは難しい
    • 登録された名前の情報は、各ノードのローカルのETSに複製・保持されているので、参照系の操作は(他のノードとの通信を行なうことなく)高速に行える
  • pg2モジュール:
    • グローバルスコープなプロセスグループ管理が行える
      • グループの作成、グループへのメンバー(PID)追加、メンバーの取得、etc
      • 特定のノード上で作成されたグループは、クラスタ内の全てのノードから参照可能
    • グループの作成やメンバの追加/削除時には、globalモジュールのトランザクション機能を使って整合性が維持されるようにしている
      • 「グループ名をキーとしてグローバルなロックを獲得」=>「全ノードに情報を共有」=>「ロック解除」
    • 全てグループ情報は、各ノードのローカルのETSに複製・保持されているので、参照系の操作は(他のノードとの通信を行なうことなく)高速に行える

測定内容

  • globalモジュール:
    • 以下の各操作をそれぞれ百回実行し、一回辺りの平均所要時間を測定
      • reg: プロセス名の登録. global:register_name/2
      • unreg: 名前の登録解除. global:unregister_name/1
      • where: 名前に対応するPIDの検索. global:whereis_name/1
  • pg2モジュール:
    • 以下の各操作をそれぞれ百回実行し、一回辺りの平均所要時間を測定
      • create: プロセスグループの作成. pg2:create/1
      • join: プロセスグループへのメンバ追加. pg2:join/2
      • select: プロセスグループからのプロセス選択. pg2:get_closest_pid/1
%% 実行例

%% スレーブノード数=16 で、globalモジュールのベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 16), 5, fun () -> dist_bench:global_bench(100) end).
[[{register,2349.97},{unregister,3246.77},{whereis,0.47}], % 登録/解除/検索、処理の一回辺りの平均所要時間(μs)
 [{register,2509.63},{unregister,2608.14},{whereis,0.49}],
 [{register,2420.48},{unregister,3952.45},{whereis,0.59}],
 [{register,2449.36},{unregister,2469.99},{whereis,0.41}],
 [{register,2647.81},{unregister,2469.52},{whereis,0.44}]]

%% スレーブノード数=16 で、pg2モジュールのベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 16), 5, fun () -> dist_bench:pg2_bench(100, 100) end).
[[{create,1787.34},{join,1735.18},{select,78.39}], % 作成/メンバ追加/選択、処理の一回辺りの平均所要時間(μs)
 [{create,1713.19},{join,1704.63},{select,79.92}],
 [{create,1725.01},{join,2014.12},{select,77.63}],
 [{create,1764.74},{join,1721.5},{select,78.14}],
 [{create,1719.31},{join,1704.02},{select,76.69}]]

測定結果

スレーブノード数 global (reg) global (unreg) gloabl (where) pg2 (create) pg2 (join) pg2 (select)
0 55.63 μs 51.83 μs 0.37 μs 33.15 μs 36.53 μs 75.43 μs
1 54.05 μs 51.69 μs 0.38 μs 749.85 μs 750.02 μs 77.39 μs
4 1,704.57 μs 1,593.71 μs 0.44 μs 990.30 μs 1,140.77 μs 78.66 μs
16 2,449.36 μs 2,608.14 μs 0.47 μs 1,725.01 μs 1,721.50 μs 78.39 μs
64 7,348.40 μs 6,665.51 μs 0.57 μs 5,260.28 μs 5,295.05 μs 79.42 μs
256 25,970.31 μs 27,585.72 μs 0.81 μs 21,897.22 μs 21,938.38 μs 79.31 μs

globalモジュールもpg2モジュールも更新系の操作は、クラスタのサイズに比例して処理時間が長くなっている。
(両者の実装的に、検索系は、ローカルのETSテーブルを参照するだけで済むのでクラスタサイズに影響は受けない)

クラスタサイズが比較的小さい場合でも、更新系の操作の性能はそれほど良い訳でもないので、これらのモジュールの多用は避けた方が無難かもしれない。
(システムの起動時に一回だけ、特定にプロセスにグローバルな名前を付与したい、といった用途ならglobalモジュールは十分有用)

もし、数百代台規模以上のクラスタを組みたく、かつ、これらのモジュールが提供してるような機能を使いたい場合は、別途同等の機能を提供している(よりスケールしやすい)外部ライブラリを探すか、自前で実装した方が良さそう。

スケールアップ関連の性能測定

一つのノードが利用可能なCPUコア数が増えた場合のスケーラビリティの測定。

rpcモジュールは手軽なノード間通信の手段として便利だけど、あまりスケールアップはしないので注意が必要かもしれない、と言いたいだけ。

スケールアップしない理由(の予想):

  • 全てのrpcリクエストは、一旦対象ノード上のrexという名前のプロセスを経由するようになっている
  • そのため、多数のノードから並列してリクエストが発行された場合でも、そのプロセス(のキュー)でリクエスト群が直列化されてしまう
    • rexプロセス上で行われる処理自体はかなり軽い
    • ただ、利用可能なCPUコアが増えるに従い、その僅かな直列化点が全体のスケーラビリティに与える影響度は大きくなる

本当にスケールアップしないのかどうかを確認するのがここでの目的

測定内容

利用可能なCPUのコア数に応じたRPCの処理性能の変化を測るために以下のような条件で測定を行った:

  • マスターノードをRPCサーバとし、複数のスレーブノードから大量のRPCリクエストを発行
  • マスターノード(ErlangVM)が利用可能なCPUコア数を変化させて、それに伴う処理性能の変化を測定
    • 正確にはCPUコア数ではなく、ErlangVM上で稼働するスケジューラの数を変化させた
    • スケジューラの数はerlang:system_flag(schedulers_online, 個数)関数で動的に設定可能(のはず)
  • 一回のRPCに要した平均時間 を計測
  • 測定関数: dist_bench:rpc_call_scaleup_bench/3 (実際に指定したパラメータ等は下の実行例を参照)

また比較のために、RPCではなく、echoサーバプロセスに対するメッセージの送受信の場合での測定も行った:

  • マスターノード上でechoサーバプロセスを複数起動した上で、スレーブノード群から対象のメッセージ送信を行なう
    • 起動するechoサーバプロセスの数は 利用可能なコア数*4 とした
    • リクエストを処理するプロセスが分散するため、コア数に応じて良好にスケールすることを期待
  • 一回のメッセージ送受信に要した平均時間 を計測
  • 測定関数: dist_bench:pid_msg_echo_scaleup_bench/3 (実際に指定したパラメータ等は下の実行例を参照)
%% 実行例:

%% 使用CPUコア数=8 で、RPCのスケールアップ度測定関数を実行する
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 9), 5,
                      fun () -> dist_bench:rpc_call_scaleup_bench(400000, 8, nodes()) end)

%% 使用CPUコア数=8 で、メッセージ送受信(+ echoサーバプロセス複製)のスケールアップ度測定関数を実行する
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 9), 5,
                      fun () -> dist_bench:pid_msg_echo_scaleup_bench(400000, 8, nodes()) end).

測定結果

CPUコア数 複数echoサーバ RPC
1 10.147 μs 39.781 μs
2 3.440 μs 21.818 μs
4 1.864 μs 21.186 μs
8 1.663 μs 21.388 μs
16 1.915 μs 20.740 μs
32 1.993 μs 22.576 μs

複数echoサーバプロセスに対するメッセージ送受信では、コア数の増加に伴い、最大で六倍程度まで性能が向上しているが、RPCを使った場合は性能向上は二倍程度に留まっている。
(前者のスケール度合いが六倍に留まっているのは、おそらくベンチマークプログラムの作りが悪いため。ちゃんと書けば、物理コアの数(= 16)の分まではスケールしてくれることを期待したい)

rpcモジュールのスケールアップ度合いが小さいといっても、今回の測定に従えば、一つのノードで(最大で)秒間に五万リクエスト程度は捌ける計算となるので、適材適所で使う分には問題となることは少なさそう。

感想

今回の(かなり簡易的な)測定の結果を見る限りでは、ノード管理周りやノード間のメッセージ送受信、プロセスの死活監視、といった分散Erlangの本当に基礎的な部分(機能)に関していえば、数百ノード規模でも特にスケーラビリティには問題なさそうな印象だった。
(期待通りの性能が出ており、変なボトルネック等はなさそうだった)

逆に、標準で配布されている分散系のライブラリ(ex. globalモジュール)は、クラスタサイズがそこまで大きくはならないことを想定した作りになっているものが多そうに見える(私見)ので、使用する場合は注意する必要がありそう。
(おそらくは別のライブラリを選択した方が懸命。昨日紹介されていたriak_coreはその候補?)

個人的には、メッセージ送受信等といった下回りの機能がちゃんと提供されているだけでもだいぶ助かるので、後は(今回は考慮から完全に外したが)安定性等といった性能以外の面で問題がないかどうかを確認してみて、もし大丈夫そうなら分散Erlangは、そこそこ大きな規模のクラスタを扱うための基板として結構重宝しそうだと思う。
(そもそも性能周りも、もっとちゃんとした測定を行う必要がある、という話は置いておいて...)

測定用ソースコード

dist_bench.erl
-module(dist_bench).

%%% External API
-export([
         %% utility
         make_node_specs/2,
         setup_nodes/1,
         do_bench/3,

         %% scale out benchmarks
         pid_msg_echo_bench/2,
         name_msg_echo_bench/2,
         rpc_call_bench/2,
         monitor_bench/2,
         global_bench/1,
         pg2_bench/2,

         %% scale up benchmarks
         pid_msg_echo_scaleup_bench/3,
         rpc_call_scaleup_bench/3
        ]).


%%% Internal API
-export([
         echo_server_start/1,
         do_echo/2,
         do_rpc_call/2,
         make_echo_msg/1
        ]).


%%% Types
-export_type([
              node_spec/0,
              node_specs/0,
              microseconds/0
             ]).

-type node_spec() :: {HostName::atom(), NodeCount::non_neg_integer()}. % `HostName'上で何個のノードを起動するか
-type node_specs() :: [node_spec()].

-type microseconds() :: non_neg_integer().


%%% External Functions: Utility

%% @doc `TotalNodeCount'で指定された数のノード群を生成するための`node_specs()'を生成して返す
%%
%% `HostNames'で指定された各マシン上で起動されるノード数は、極力等しい数になるように調整される
%%
%% ```
%% > make_node_specs([a,b,c], 10).
%% [{a,4},{b,3},{c,3}]
%% '''
-spec make_node_specs([HostName::atom()], TotalNodeCount::non_neg_integer()) -> node_specs().
make_node_specs(HostNames, TotalNodeCount) ->
    PerHostNodeCount = TotalNodeCount div length(HostNames),
    RemainderCount = TotalNodeCount rem length(HostNames),
    {Specs, _} =
        lists:mapfoldl(
          fun (Host, Rem) ->
                  {{Host, PerHostNodeCount + min(1, Rem)}, max(0, Rem - 1)}
          end,
          RemainderCount,
          HostNames),
    Specs.

%% @doc スレーブノード群を起動する
%%
%% 呼び出し元プロセスが終了した場合、起動したノード群も自動で終了する(リンクが貼られている)
-spec setup_nodes(node_specs()) -> {SlaveNodeCount, StartAverateTime} when
      SlaveNodeCount   :: non_neg_integer(), % 起動したスレーブノードの数
      StartAverateTime :: microseconds().    % スレーブノードの起動に要した時間の平均値
setup_nodes(NodeSpecs) ->
    [] = nodes(),
    Tooks =
        lists:append(
          [[begin
                NodeName = list_to_atom(lists:flatten(io_lib:format("slave_~p", [I]))), % スレーブノードの名前は'slave_通し番号'形式
                {Took, {ok, _}} = timer:tc(fun () -> slave:start_link(HostName, NodeName) end),
                Took
            end || I <- lists:seq(0, NodeCount - 1)]
           || {HostName, NodeCount} <- NodeSpecs]),

    {_, Bin, File} = code:get_object_code(?MODULE),
    rpc:multicall(nodes(), code, load_binary, [?MODULE, File, Bin]),

    TookAverage = lists:sum(Tooks) / max(1, length(Tooks)),
    {length(Tooks), TookAverage}.

%% @doc スレーブノード群を準備した後に、`BenchFun'で指定されたベンチマークを実行する
-spec do_bench(node_specs(), BenchCount, BenchFun) -> [Result] when
      BenchCount :: non_neg_integer(),  % `BenchFun'を何回実行するか
      BenchFun   :: fun (() -> Result), % 個々のベンチマークを実行する関数
      Result     :: term().
do_bench(NodeSpecs, BenchCount, BenchFun) ->
    {Pid, Monitor} =
        spawn_monitor(
          fun () -> % ベンチマーク終了時にスレーブノード群を確実に終了させたいので、別プロセスで実行する
                  io:format("# setup nodes ... "),
                  {NodeCount, StartAverateTime} = setup_nodes(NodeSpecs),
                  io:format("done: count=~p, start_avg_time=~p sec\n", [NodeCount, StartAverateTime / 1000000]),

                  Results =
                      [begin
                           io:format("# [~p] do bench ... ", [I]),
                           lists:foreach(fun erlang:garbage_collect/1, erlang:processes()),
                           {Took, Result} = timer:tc(BenchFun),
                           io:format("done: time=~p sec\n", [Took / 1000000]),
                           Result
                       end || I <- lists:seq(1, BenchCount)],
                  io:format("\n"),
                  exit({bench_finished, Results})
          end),
    receive
        {'DOWN', Monitor, _, _, Reason} ->
            case Reason of
                {bench_finished, Results} -> Results;
                _ -> error({bench_process_down, Pid, Reason})
            end
    end.


%%% External Functions: Scale Out Benchmarks

%% @doc PID指定によるメッセージ送受信のスケールアウト性能を測定するためのベンチマーク関数
-spec pid_msg_echo_bench(non_neg_integer(), [node()]) -> AverageEchoTime::microseconds().
pid_msg_echo_bench(EchoCount, Nodes) ->
    %% 1) リモートノードでechoサーバプロセスを起動する
    RemoteProcs = [spawn_link(Node, ?MODULE, echo_server_start, [undefined]) || Node <- Nodes],

    %% 2) `EchoCount'回だけリモートノード上のプロセスとメッセージをやりとり(echo)し、その所要時間を測定する
    {MicroSeconds, _} =timer:tc(fun () -> do_echo(EchoCount, RemoteProcs) end),

    %% 3) 後始末
    lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, RemoteProcs),
    MicroSeconds / EchoCount.  % 一回のやりとりに要した平均時間を返す

%% @doc プロセス名指定によるメッセージ送信(+ PID指定による受信)のスケールアウト性能を測定するためのベンチマーク関数
-spec name_msg_echo_bench(non_neg_integer(), [node()]) -> AverageEchoTime::microseconds().
name_msg_echo_bench(EchoCount, Nodes) ->
    %% 1) リモートノードでechoサーバプロセスを起動する
    RemoteProcs = [spawn_link(Node, ?MODULE, echo_server_start, [echo_server]) || Node <- Nodes],
    RemoteNames = [{echo_server, node(Pid)} || Pid <- RemoteProcs],
    timer:sleep(100), % NOTE: リモートノード上での名前登録が完了するのを適当な時間だけ待つ (sleepを使っているのは手抜き)

    %% 2) `EchoCount'回だけリモートノード上のプロセスとメッセージをやりとり(echo)し、その所要時間を測定する
    {MicroSeconds, _} = timer:tc(fun () -> do_echo(EchoCount, RemoteNames) end),

    %% 3) 後始末
    lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, RemoteProcs),
    MicroSeconds / EchoCount.  % 一回のやりとりに要した平均時間を返す

%% @doc rpcモジュールを用いた関数呼び出しのスケールアウト性能を測定するためのベンチマーク関数
-spec rpc_call_bench(non_neg_integer(), [node()]) -> AverageCallTime::microseconds().
rpc_call_bench(CallCount, Nodes) ->
    %% 1) `CallCount'回だけリモートノードに対してRPC呼び出しを行い、その所要時間を測定する
    {MicroSeconds, _} = timer:tc(fun () -> do_rpc_call(CallCount, Nodes) end),
    MicroSeconds / CallCount.

%% @doc `erlang:monitor/2'を用いたプロセス死活監視のスケールアウト性能を測定するためのベンチマーク関数
-spec monitor_bench(non_neg_integer(), [node()]) -> AverateTime::microseconds().
monitor_bench(ProcessCount, Nodes) ->
    %% 1) リモートノード上で`ProcessCount'個のプロセスを生成する
    RemoteProcs = [spawn(Node, timer, sleep, [infinity]) ||
                      Node <- Nodes,
                      _    <- lists:seq(1, ProcessCount, length(Nodes))], % NOTE: 剰余分が捨てられてしまう
    RealProcessCount = length(RemoteProcs),

    %% 2) リモートプロセス群に対する監視登録およびダウン通知の受信に掛かった時間を測定する
    {MicroSeconds, _} =
	timer:tc(
	  fun () ->
                  lists:foreach(fun (Pid) -> monitor(process, Pid) end, RemoteProcs),
		  lists:foreach(fun (Pid) -> exit(Pid, kill) end, RemoteProcs),
		  wait_downs(RealProcessCount) % 全ての監視プロセスの'DOWN'メッセージを受け取るまで待機する
	  end),
    MicroSeconds / RealProcessCount.

%% @doc globalモジュールの各種関数のスケールアウト性能を測定するためのベンチマーク関数
-spec global_bench(NameCount) -> [ResultEntry] when
      NameCount   :: non_neg_integer(), % グローバルなプロセス名の登録/解除/検索を何回行なうか
      ResultEntry :: {register, microseconds()}   % `global:register_name/2'呼び出しの平均所要時間
                   | {unregister, microseconds()} % `global:unregister_name/2'呼び出しの平均所要時間
                   | {whereis, microseconds()}.   % `global:whereis_name/2'呼び出しの平均所要時間
global_bench(NameCount) ->
    %% 1) ベンチマークに用いる名前およびプロセスの準備
    Names = [{?MODULE, I} || I <- lists:seq(1, NameCount)],
    Procs = [spawn_link(timer, sleep, [infinity]) || _ <- lists:seq(1, NameCount)],
    Pairs = lists:zip(Names, Procs),

    %% 2) 各種関数の実行時間を測定
    {RegisterMicros, _} = % 登録
        timer:tc(
          fun () -> lists:foreach(fun ({Name, Proc}) -> yes = global:register_name(Name, Proc) end, Pairs) end),
    {WhereisMicros, _} =  % 検索
        timer:tc(
          fun () -> lists:foreach(fun ({Name, Proc}) -> Proc = global:whereis_name(Name) end, Pairs) end),
    {UnregisterMicros, _} = % 登録解除
        timer:tc(
          fun () -> lists:foreach(fun (Name) -> global:unregister_name(Name) end, Names) end),

    %% 3) 後始末
    lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, Procs),
    [
     {register, RegisterMicros / NameCount},
     {unregister, UnregisterMicros / NameCount},
     {whereis, WhereisMicros / NameCount}
    ].

%% @doc pg2モジュールの各種関数のスケールアウト性能を測定するためのベンチマーク関数
-spec pg2_bench(GroupCount, MemberCount) -> [ResultEntry] when
      GroupCount :: non_neg_integer(),  % `pg2:create/1'を使って作成するグループの数
      MemberCount :: non_neg_integer(), % `pg2:join/2'を使ってグループに参加させるプロセスの数
      ResultEntry :: {create, microseconds()} % `pg2:create/1'呼び出しの平均所要時間
                   | {join, microseconds()}   % `pg2:join/2'呼び出しの平均所要時間
                   | {select, microseconds()}.   % `pg2:get_closest_pid/1'呼び出しの平均所要時間
pg2_bench(GroupCount, MemberCount) ->
    %% 1) ベンチマークに用いるグループ名およびプロセスの準備
    [] = pg2:which_groups(),
    Groups = [{group, I} || I <- lists:seq(1, GroupCount)],
    Members = [spawn_link(timer, sleep, [infinity]) || _ <- lists:seq(1, MemberCount)],

    %% 2) 各種関数の実行時間を測地
    {CreateMicros, _} =
        timer:tc(
          fun () -> lists:foreach(fun pg2:create/1, Groups) end),
    {JoinMicros, _} =
        timer:tc(
          fun () -> lists:foreach(fun (Pid) -> ok = pg2:join(hd(Groups), Pid) end, Members) end),
    {SelectMicros, _} =
        timer:tc(
          fun () -> lists:foreach(fun (_) -> true = is_pid(pg2:get_closest_pid(hd(Groups))) end, Members) end),

    %% 3) 後始末
    lists:foreach(fun pg2:delete/1, Groups),
    lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, Members),
    [
     {create, CreateMicros / GroupCount},
     {join, JoinMicros / MemberCount},
     {select, SelectMicros / MemberCount}
    ].


%%% External Functions: Scale Up Benchmarks

%% @doc PID指定によるメッセージ送受信のスケールアップ性能を測定するためのベンチマーク関数
-spec pid_msg_echo_scaleup_bench(non_neg_integer(), non_neg_integer(), [node()]) -> AverageEchoTime::microseconds().
pid_msg_echo_scaleup_bench(EchoCount, SchedulersOnline, Nodes) ->
    %% 1) 利用可能なスケジューラ数(≒  CPUコア数)を制限する
    OldSchedulersOnline = erlang:system_flag(schedulers_online, SchedulersOnline),

    %% 2) 自ノードにechoサーバ群を起動する
    EchoServerCount = SchedulersOnline * 4, % 利用可能なスケジューラ数の四倍のプロセスを起動しておく
    EchoServers = [spawn_link(?MODULE, echo_server_start, [undefined]) || _ <- lists:seq(1, EchoServerCount)],

    %% 3) `Nodes'上で、echoクライアント群を実行し、全ての処理が完了するまでの所要時間を計測する
    PerNodeEchoCount = EchoCount div length(Nodes), % NOTE: 剰余分が捨てられてしまう
    RealEchoCount = PerNodeEchoCount * length(Nodes),
    {MicroSeconds, _} =
        timer:tc(
          fun () ->
                  EchoClients = [monitor(process, spawn(Node, ?MODULE, do_echo, [PerNodeEchoCount, EchoServers])) || Node <- Nodes],
                  wait_downs(length(EchoClients)) % 全てのechoクライアントが終了するの待つ
          end),

    %% 4) 後始末
    lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, EchoServers),
    SchedulersOnline = erlang:system_flag(schedulers_online, OldSchedulersOnline),
    MicroSeconds / RealEchoCount.

%% @doc PID指定によるメッセージ送受信のスケールアップ性能を測定するためのベンチマーク関数
-spec rpc_call_scaleup_bench(non_neg_integer(), non_neg_integer(), [node()]) -> AverageCallTime::microseconds().
rpc_call_scaleup_bench(CallCount, SchedulersOnline, Nodes) ->
    %% 1) 利用可能なスケジューラ数(≒  CPUコア数)を制限する
    OldSchedulersOnline = erlang:system_flag(schedulers_online, SchedulersOnline),

    %% 3) `Nodes'上で、echoクライアント群を実行し、全ての処理が完了するまでの所要時間を計測する
    PerNodeCallCount = CallCount div length(Nodes), % NOTE: 剰余分が捨てられてしまう
    RealCallCount = PerNodeCallCount * length(Nodes),
    {MicroSeconds, _} =
        timer:tc(
          fun () ->
                  CallClients = [monitor(process, spawn(Node, ?MODULE, do_rpc_call, [PerNodeCallCount, [node()]])) || Node <- Nodes],
                  wait_downs(length(CallClients)) % 全てのクライアントが終了するの待つ
          end),

    %% 4) 後始末
    SchedulersOnline = erlang:system_flag(schedulers_online, OldSchedulersOnline),
    MicroSeconds / RealCallCount.


%%% Internal Functions

%% @doc `Servers'で指定されたプロセス群との間で、合計で`EchoCount'個のメッセージをやりとりする
-spec do_echo(non_neg_integer(), [Server]) -> ok when Server :: pid() | {atom(), node()}.
do_echo(EchoCount, Servers) ->
    send_messages(EchoCount, Servers, Servers), % echoメッセージを送信する
    wait_messages(EchoCount). % echobackメッセージを待つ (NOTE: 現状はメッセージの中身には考慮していない)

-spec send_messages(non_neg_integer(), [Server], [Server]) -> ok when Server :: pid() | {atom(), node()}.
send_messages(0, _, _)                -> ok;
send_messages(N, [], Servers)         -> send_messages(N, Servers, Servers); % 一巡したので最初に戻る
send_messages(N, [S | Rest], Servers) ->
    S ! {self(), N},
    send_messages(N - 1, Rest, Servers).

%% @doc `Count'個メッセージを受信するまで待機する (メッセージの内容には関知しない)
-spec wait_messages(Count::non_neg_integer()) -> ok.
wait_messages(0) -> ok;
wait_messages(N) -> receive _ -> wait_messages(N - 1) end.

-spec wait_downs(Count::non_neg_integer()) -> ok.
wait_downs(0) -> ok;
wait_downs(N) ->  receive {'DOWN', _, _, _, _} -> wait_downs(N - 1) end.

%% @doc `Nodes'で指定されたノード群に対して、合計で`CallCount'回のRPC呼び出しを行なう
-spec do_rpc_call(non_neg_integer(), [node()]) -> ok.
do_rpc_call(CallCount, Nodes) ->
    rpc_async_calls(CallCount, Nodes, Nodes),
    wait_messages(CallCount). % RPCのレスポンスを待つ (NOTE: 現状はメッセージの中身には考慮していない)

-spec rpc_async_calls(non_neg_integer(), [node()], [node()]) -> ok.
rpc_async_calls(0, _, _)                -> ok;
rpc_async_calls(N, [], Nodes)           -> rpc_async_calls(N, Nodes, Nodes);% 一巡したので最初に戻る
rpc_async_calls(N, [Dst | Rest], Nodes) ->
    rpc:async_call(Dst, ?MODULE, make_echo_msg, [N]),
    rpc_async_calls(N - 1, Rest, Nodes).

%% @doc エコーサーバを起動する
-spec echo_server_start(Name) -> no_return() when Name :: atom() | undefined.
echo_server_start(undefined) ->
    echo_server_loop();
echo_server_start(Name) ->
    true = register(Name, self()), % `Name =/= undefined'なら名前付きプロセスとして起動する
    echo_server_loop().

-spec echo_server_loop() -> no_return().
echo_server_loop() ->
    receive
        {From, Msg} ->
            From ! make_echo_msg(Msg),
            echo_server_loop()
    end.

-spec make_echo_msg(term()) -> term().
make_echo_msg(Msg) ->
    {self(), Msg}.
56
50
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
56
50

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?