概要
1台のWindows7上に2ノードから成るRabbitMQのクラスターを構築する方法を説明します。
また、簡単ですがRabbitMQの特徴やrabbitmqctlコマンドによるRabbitMQサーバーの操作方法なども説明します。
この記事の内容は下記のバージョンで動作確認を行いました。
- Windows7 (64bit)
- RabbitMQ 3.5.3
- Erlang/OTP 17.5
1. インストールと初期設定
本格的な説明はRabbitMQのInstalling on Windows (manual)ページにあります。
Erlnagのインストール
RabbitMQの実行にはErlangが必要になりますので事前にインストールを行います。
[ダウンロードページ] (http://www.erlang.org/download.html)より、インストーラーをダウンロードして実行します。
今回使用したファイルはotp_win64_17.5.exeです。
初期設定
環境変数のERLANG_HOMEにerlangのインストールディレクトリを設定します。
環境変数のpathに%ERLANG_HOME%\binを追加します。
インストール後の確認
念のためコマンドプロンプトからerl.exeを実行してshellが起動するか確認します。
RabbitMQのインストール
1台のPCにRabbitMQを2ノード起動しクラスターを構成します。
構成は下表の通りです。
| node name | port | port(ssl) | port(management) |
|---|---|---|---|
| rabbit_node1@localhost | 5672 | 5671 | 15672 |
| rabbit_node2@localhost | 5674 | 5673 | 15674 |
[ダウンロードページ] (http://www.rabbitmq.com/install-windows-manual.html)より、アーカイブファイルをダウンロードします。
今回ダウンロードしたファイルはrabbitmq-server-windows-3.5.3.zipです。
アーカイブファイルを適当な場所へ解凍します。
この記事では下記の場所へ準備しました。
node1
C:\rabbitmq-3.5.3\rabbitmq_server-3.5.3_node1
node2
C:\rabbitmq-3.5.3\rabbitmq_server-3.5.3_node2
初期設定
baseディレクトリを作成します。
node1用
C:\rabbitmq-3.5.3\node1
node2用
C:\rabbitmq-3.5.3\node2
db、logファイル用のディレクトリを作成します。
node1用
C:\rabbitmq-3.5.3\node1\db
C:\rabbitmq-3.5.3\node1\log
node2用
C:\rabbitmq-3.5.3\node2\db
C:\rabbitmq-3.5.3\node2\log
confファイルを用意します。
テンプレートファイルがありますのでこれを利用します。
C:\rabbitmq-3.5.3\rabbitmq_server-3.5.3_node1\etc\rabbitmq.config.exampleファイルをrabbitmq.configへリネームして下記の場所へコピーします。
node1用
C:\rabbitmq-3.5.3\node1\rabbitmq.config
node2用
C:\rabbitmq-3.5.3\node2\rabbitmq.config
次にrabbitmq.configを編集します。
下記の内容は変更点だけ抜粋したものです。(ポートやログファイルの出力先を記述します。)
node1用
{rabbit,
[
{tcp_listeners, [5672]},
{ssl_listeners, [5671]},
]},
{rabbitmq_management,
[
{http_log_dir, "C:\rabbitmq-3.5.3\node1\log\access.log"},
{listener, [{port, 15672},
{ip, "127.0.0.1"}]
},
{rates_mode, basic}
]},
node2用
{rabbit,
[
{tcp_listeners, [5674]},
{ssl_listeners, [5673]},
]},
{rabbitmq_management,
[
{http_log_dir, "C:\rabbitmq-3.5.3\node2\log\access.log"},
{listener, [{port, 15674},
{ip, "127.0.0.1"}]
},
{rates_mode, basic}
]},
次にbatファイルを修正します。
batファイルはインストールディレクトリのsbin下にあります。
node1用
rabbitmqctl.bat
setlocal enabledelayedexpansion
+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node1
+ set RABBITMQ_NODENAME=rabbit_node1@localhost
if "!RABBITMQ_BASE!"=="" (
rabbitmq-server.bat
setlocal enabledelayedexpansion
+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node1
+ set RABBITMQ_NODENAME=rabbit_node1@localhost
+ set RABBITMQ_NODE_PORT=5672
if "!RABBITMQ_USE_LONGNAME!"=="" (
rabbitmq-plugins.bat
setlocal enabledelayedexpansion
+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node1
+ set RABBITMQ_NODENAME=rabbit_node1@localhost
if "!RABBITMQ_SERVICENAME!"=="" (
node2用
rabbitmqctl.bat
setlocal enabledelayedexpansion
+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node2
+ set RABBITMQ_NODENAME=rabbit_node2@localhost
if "!RABBITMQ_BASE!"=="" (
rabbitmq-server.bat
setlocal enabledelayedexpansion
+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node2
+ set RABBITMQ_NODENAME=rabbit_node2@localhost
+ set RABBITMQ_NODE_PORT=5674
if "!RABBITMQ_USE_LONGNAME!"=="" (
rabbitmq-plugins.bat
setlocal enabledelayedexpansion
+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node2
+ set RABBITMQ_NODENAME=rabbit_node2@localhost
if "!RABBITMQ_SERVICENAME!"=="" (
インストール後の確認
node1、node2でRabbitMQサーバーを起動します。
> rabbitmq-server.bat -detached
起動ステータスを確認します。
> rabbitmqctl.bat status
また、RabbitMQサーバーの停止は下記のコマンドで行います。
> rabbitmqctl.bat stop
rabbitmq_managementプラグインの有効化
managementプラグインはRabbitMQサーバーの監視やエクスチェンジ、キュー、ユーザーなどを操作できるブラウザベースの管理機能です。
node1、node2でプラグインを有効化します。
node1
> rabbitmq-plugins.bat enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit_node1@localhost... started 6 plugins.
node2
> rabbitmq-plugins.bat enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit_node2@localhost... started 6 plugins.
managementプラグインにアクセスします。
ブラウザから下記のURLにアクセスしてログインページが表示されるか確認します。
デフォルトのユーザー/パスワードはguest/guestです。
node1
http://localhost:15672/
node2
http://localhost:15674/
managementプラグイン用の管理者ユーザーの作成
最初から用意されているguestユーザーを削除し、その代わりの管理者ユーザーを作成します。
ユーザーの作成はmanagementプラグインとrabbitmqctlコマンドから行うことができます。
この記事では下記の情報でユーザーを作成しました。
| name | password | tag | vhost |
|---|---|---|---|
| admin | admin | administrator | / |
タグでロールを設定します。
タグに使用できる名前は下表の通りです。
| name | description |
|---|---|
(none) |
managementプラグインにアクセスできません。 |
management |
|
policymaker |
management+ |
monitoring |
management+ |
administrator |
policymaker+, management+ |
rabbitmqctlコマンドを使用してユーザーを作成します
> rabbitmqctl.bat -n rabbit_node1@localhost add_user admin admin
Creating user "admin" ...
> rabbitmqctl.bat -n rabbit_node1@localhost set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
上記ではデフォルトのバーチャルホスト(/)にアクセス権限を付与していますが、バーチャルホストを増やした場合は、そのバーチャルホストに対してもアクセス権限を付与する必要があります。(権限を付与しないとmanagementプラグイン上で操作ができません。)
guestユーザーを削除します
> rabbitmqctl.bat -n rabbit_node1@localhost delete_user guest
Deleting user "guest" ...
参考
[Management Plugin] (https://www.rabbitmq.com/management.html)
クラスターの構築
クラスターの構築はnode1、node2を起動した状態で行います。
今回はnode2をnode1へ参加させます。(node1がマスター、node2がスレーブになります。)
構築前のクラスターの状態を確認します
> rabbitmqctl.bat -n rabbit_node1@localhost cluster_status
Cluster status of node rabbit_node1@localhost ...
[{nodes,[{disc,[rabbit_node1@localhost]}]},
{running_nodes,[rabbit_node1@localhost]},
{cluster_name,<<"rabbit_node1@localhost">>},
{partitions,[]}]
クラスターを構築します
> rabbitmqctl.bat -n rabbit_node2@localhost stop_app
Stopping node rabbit_node2@localhost ...
> rabbitmqctl.bat -n rabbit_node2@localhost join_cluster rabbit_node1@localhost
Clustering node rabbit_node2@localhost with rabbit_node1@localhost ...
> rabbitmqctl.bat -n rabbit_node2@localhost start_app
Starting node rabbit_node2@localhost ...
構築後のクラスターの状態を確認します
> rabbitmqctl.bat -n rabbit_node1@localhost cluster_status
Cluster status of node rabbit_node2@localhost ...
[{nodes,[{disc,[rabbit_node1@localhost,rabbit_node2@localhost]}]},
{running_nodes,[rabbit_node1@localhost,rabbit_node2@localhost]},
{cluster_name,<<"rabbit_node1@localhost">>},
{partitions,[]}]
クラスターの解除方法
一時的にクラスターから外す方法
forget_cluster_nodeを使用します。クラスターへ参加させるにはもう一度join_clusterを使用します。
> rabbitmqctl.bat -n rabbit_node2@localhost stop_app
Stopping node rabbit_node2@localhost ...
> rabbitmqctl.bat -n rabbit_node1@localhost forget_cluster_node rabbit_node2@localhost
Resetting node rabbit_node2@localhost from cluster ...
クラスターから外す方法
> rabbitmqctl.bat -n rabbit_node2@localhost stop_app
Stopping node rabbit_node2@localhost ...
> rabbitmqctl.bat -n rabbit_node2@localhost reset
Resetting node rabbit_node2@localhost ...
> rabbitmqctl.bat -n rabbit_node2@localhost start_app
Starting node rabbit_node2@localhost ...
参考
[Clustering Guide] (https://www.rabbitmq.com/clustering.html)
[Highly Available Queues] (https://www.rabbitmq.com/ha.html)
2. RabbitMQの構成
ここからは次のシナリオに基づいて登録を行いながら各要素を説明します。
- バーチャルホストを2つ(
v1、v2)登録します。 - それぞれのバーチャルホストへアクセスできるユーザー(
v1_user、v2_user)を登録します。
v1バーチャルホスト
-
v1に2つのエクスチェンジ(notice.direct、broadcast.fanout)を登録します。 -
v1に3つのキュー(userid:1000、userid:1001、userid:2002)を登録します。 -
notice.directからキューへのルーティングは固有のルーティングキーによって行います。 -
broadcast.fanoutからはすべてのキューへルーティングします。
v2バーチャルホスト
-
v2に1つのエクスチェンジ(notice.topic)を登録します。 -
v2に4つのキュー(userid:1000.info、userid:1000.warn、userid:1001.info、userid:1001.warn)を登録します。 -
notice.topicからキューへのルーティングはルーティングキーの部分マッチによって行います。
キューのポリシー
v1のキューuserid:1000、userid:1001に「user1x」ポリシーを適用します。
v1のキューuserid:2002に「user2x」ポリシーを適用します。
v2のキューuserid:1000.info、userid:1001.infoに「info」ポリシーを適用します。
v2のキューuserid:1000.warn、userid:1001.warnに「warn」ポリシーを適用します。
ポリシーの定義内容
| name | pattern | definition | description |
|---|---|---|---|
| user1x | ^userid:[1].* | max-length:50 | メッセージをキューに最大50件まで保持 |
| user2x | ^userid:[2].* | max-length:100 | メッセージをキューに最大100件まで保持 |
| info | ^userid:.*.info | message-ttl:1800000 | メッセージの生存期間を30分まで |
| warn | ^userid:.*.warn | message-ttl:3600000 | メッセージの生存期間を60分まで |
[vhost] [exchange] [routing key] [queue]
----------+----------------------+------------------+-------------------
v1 ---> notice.direct ---> '1000' ---> userid:1000
---> '1001' ---> userid:1001
---> '2002' ---> userid:2002
---> broadcast.fanout ----------------------> userid:1000
----------------------> userid:1001
----------------------> userid:2002
v2 ---> notice.topic ---> '*.info' ---> userid:1000.info
---> '*.info' ---> userid:1001.info
---> '*.warn' ---> userid:1000.warn
---> '*.warn' ---> userid:1001.warn
登録の説明はrabbitmqctlコマンドとmanagementプラグインの両方で行いますが、一部の登録はrabbitmqctlではできません。
それを下表にまとめました。
| 操作 | rabbitmqctlコマンド | managementプラグイン | desc |
|---|---|---|---|
| バーチャルホストの作成 | 可 (add_vhost) |
可 | |
| ユーザーの登録 | 可 (add_user) |
可 | |
| アクセスコントロール | 可 (set_permissions) |
可 | |
| エクスチェンジの作成 | 不可 | 可 | バーチャルホスト別 |
| キューの作成 | 不可 | 可 | バーチャルホスト別 |
| バインド | 不可 | 可 | |
| ポリシーの作成 | 可 (set_policy) |
可 | バーチャルホスト別 |
プロデューサー (producer) / コンシューマー (consumer)
- プロデューサーはメッセージを配信するアプリケーションで、メッセージの配信をブローカーへ依頼します。
- コンシューマーはメッセージを受信するアプリケーションで、メッセージをブローカーから受け取ります。
ブローカー (broker)
RabbitMQサーバーのことです。メッセージの中継を担当します。ブローカーはバーチャルホスト、エクスチェンジ、キューといったコンポーネントから構成されます。
アプリケーション間のメッセージ送受信をブローカーが中継することでアプリケーション同士を疎結合にすることができます。
バーチャルホスト (virtual host)
バーチャルホストは、エクスチェンジやキューを意味のあるグループに分割します。
デフォルトのバーチャルホスト(/)が事前に定義されていて、rabbitmqctlコマンドでバーチャルホストの指定を省略した場合、このデフォルトのバーチャルホストが使用されることがあります。
バーチャルホストの作成はrabbitmqctlコマンド、managementプラグインで行うことができます。
バーチャルホストの一覧を表示します
list_vhosts [vhostinfoitem ...]
> rabbitmqctl.bat -n rabbit_node1@localhost list_vhosts
Listing vhosts ...
/
バーチャルホストの作成
rabbitmqctを使用する場合
rabbitmqctlコマンドで作成するにはadd_vhostを使用します。
add_vhost {vhostpath}
v1バーチャルホストを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost add_vhost v2
Creating vhost "v2" ...
v2バーチャルホストを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost add_vhost v2
Creating vhost "v2" ...
managementプラグインを使用する場合
メニューバーの[Admin]→[Virtual Hosts]を選択します。
Nameにバーチャルホスト名を入力して登録します。
v1バーチャルホストを作成します。
| field | value |
|---|---|
| Name | v1 |
v2バーチャルホストを作成します。
| field | value |
|---|---|
| Name | v2 |
登録後はこうなります。
作成直後のバーチャルホストにはアクセスできるユーザーが存在しませんので、続いてユーザーの登録とそのユーザーにアクセスコントロールの設定を行う必要があります。
ユーザー (user)
バーチャルホストへアクセスするには適切な権限が付与されたユーザーが必要です。
ユーザーの一覧を表示します
list_users
> rabbitmqctl -n rabbit_node1@localhost list_users
Listing users ...
admin [administrator]
ユーザーの登録
RabbitMQのユーザーは、エクスチェンジへメッセージを送信するproducerアプリケーション、キューからメッセージを受信するconsumerアプリケーションでも使用します。
ユーザーの登録はrabbitmqctlコマンド、managementプラグインで行うことができます。
rabbitmqctを使用する場合
rabbitmqctlコマンドで作成するにはadd_userを使用します。
add_user {username} {password}
v1_userを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost add_user v1_user pass
Creating user "v1_user" ...
v2_userを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost add_user v2_user pass
Creating user "v2_user" ...
managementプラグインを使用する場合
メニューバーの[Admin]→[Users]を選択します。
Usernameにユーザー名、Passwordにパスワードを入力して登録します。Tagsは入力不要です。
v1_userを作成します。
| field | value |
|---|---|
| Username | v1_user |
| Password | pass |
| Tags | (nothing) |
v2_userを作成します。
| field | value |
|---|---|
| Username | v2_user |
| Password | pass |
| Tags | (nothing) |
登録後はこうなります。
続いてユーザーへアクセスコントロールの設定を行います。
アクセスコントロール
ユーザーがアクセスできるバーチャルホストと権限を設定します。
rabbitmqctを使用する場合
rabbitmqctlコマンドでアクセスコントロールを設定するにはset_permissionsを使用します。
set_permissions [-p vhostpath] {user} {conf} {write} {read}
| option | value |
|---|---|
-p |
ユーザーのアクセス権限を付与するバーチャルホストを指定します。デフォルトは/です |
{user} |
ユーザーを指定します。 |
{conf} |
設定権限を与えるリソース名を正規表現で指定します。 |
{write} |
書き込み権限を与えるリソース名を正規表現で指定します。 |
{read} |
読み込み権限を与えるリソース名を正規表現で指定します。 |
全権限を与える場合は".*"、権限を与えない場合は"^$"
v1_userにv1バーチャルホストに対する権限を作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v1 v1_user "^$" ".*" ".*"
Setting permissions for user "v1_user" in vhost "v1" ...
v2_userにv2バーチャルホストに対する権限を作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v2 v2_user "^$" ".*" ".*"
Setting permissions for user "v2_user" in vhost "v2" ...
managemanetプラグインの管理ユーザーにも権限を与えます。
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v1 admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "v1" ...
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v2 admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "v2" ...
ユーザーの付与されたパーミッションの一覧を表示します
list_user_permissions {username}
> rabbitmqctl.bat -n rabbit_node1@localhost list_user_permissions admin
Listing permissions for user "admin" ...
/ .* .* .*
v1 .* .* .*
v2 .* .* .*
> rabbitmqctl.bat -n rabbit_node1@localhost list_user_permissions v1_user
Listing permissions for user "v1_user" ...
v1 ^$ .* .*
> rabbitmqctl.bat -n rabbit_node1@localhost list_user_permissions v2_user
Listing permissions for user "v2_user" ...
v2 ^$ .* .*
managementプラグインを使用する場合
メニューバーの[Admin]→[Users]を選択しアクセスコントロールを設定したいユーザー名をクリックします。
アクセスを許可するVirtual Hostをリストから選択し、権限を与えるリソースを正規表現で指定します。
v1_userにv1バーチャルホストに対する権限を作成します。
| field | value |
|---|---|
| Virtual Host | v1 |
| Configure regexp | ^$ |
| Write regexp | .* |
| Read regexp | .* |
v2_userにv2バーチャルホストに対する権限を作成します。
| field | value |
|---|---|
| Virtual Host | v2 |
| Configure regexp | ^$ |
| Write regexp | .* |
| Read regexp | .* |
adminにもv1、v2バーチャルホストに対する権限を作成します。
登録後はこのようになります。
エクスチェンジ (exchange)
プロデューサーから受け取ったメッセージを、バインドの設定にしたがってキューへ配送します。
エクスチェンジの種類
fanout
エクスチェンジにバインドされているすべてのキューへメッセージを配送します。ルーティングキーは使用しません。
メッセージのブロードキャストに向いています。
direct
エクスチェンジにバインドされているキューのうち、メッセージのルーティングキーとバインドのルーティングキーが一致するキューへメッセージを配送します。
メッセージを特定のキューへ配送するのに向いています。
topic
directと似ていますが、メッセージのルーティングキーとバインドのルーティングキーパターンがマッチングするキューへメッセージを配送します。
headers
メッセージのヘッダーに基づいて配送先のキューを決定します。ルーティングキーは使用しません。
組み込みエクスチェンジ
事前に組み込まれているエクスチェンジがあります。これらのエクスチェンジはバーチャルホスト毎に用意されています。これらをそのまま使用することも、独自のエクスチェンジを作成することも可能です。
| name | type |
|---|---|
| (AMQP default) | direct |
amq.direct |
direct |
amq.fanout |
fanout |
amq.headers |
headers |
amq.match |
headers |
amq.rabbitmq.log |
topic |
amq.rabbitmq.trace |
topic |
amq.topic |
topic |
AMQP default : メッセージの配信先(エクスチェンジ)を明示的に指定しなかったときに使用されるデフォルトのエクスチェンジです。このエクスチェンジは暗黙的にすべてのキューにバインドされており、また削除することもできません。
エクスチェンジの作成
エクスチェンジの作成はrabbitmqctlコマンドではできませんので、managementプラグインかclientアプリケーションを使用します。
managementプラグインを使用する
メニューバーの[Exchanges]を選択します。
エクスチェンジはバーチャルホスト毎に作成する必要があります。作成対象のVirtual Hostを選び、Nameにエクスチェンジ名を入力します。ただし名前の先頭に"amp."を付けることはできません。
v1バーチャルホストにnotice.directエクスチェンジを作成します。
| field | value |
|---|---|
| Virtual Host | v1 |
| Name | notice.direct |
| Type | direct |
| Durability | Durable |
| Auto Delete | No |
| Internal | No |
| Arguments | (nothing) |
v1バーチャルホストにbroadcast.fanoutエクスチェンジを作成します。
| field | value |
|---|---|
| Virtual Host | v1 |
| Name | broadcast.fanout |
| Type | fanout |
| Durability | Durable |
| Auto Delete | No |
| Internal | No |
| Arguments | (nothing) |
v2バーチャルホストにnotice.topicエクスチェンジを作成します。
| field | value |
|---|---|
| Virtual Host | v2 |
| Name | notice.topic |
| Type | topic |
| Durability | Durable |
| Auto Delete | No |
| Internal | No |
| Arguments | (nothing) |
登録後はこのようになります。(*青線で囲った部分が登録したエクスチェンジです。その他は事前定義のエクスチェンジです。)
properties
エクスチェンジに設定できるプロパティは下記の通りです。
| property | value | description |
|---|---|---|
Durability |
|
Durable:エクスチェンジを永続化します、Transient:一時的なエクスチェンジとします。 |
Auto Delete |
|
YES:バインドするキューが無くなった場合にエクスチェンジを削除します。 |
Internal |
|
YES:エクスチェンジからしかメッセージを受け取りません。エクスチェンジ to エクスチェンジ用です。 |
arguments
| arguments | type | description |
|---|---|---|
alternate-exchange |
String | If messages to this exchange cannot otherwise be routed, send them to the alternate exchange named here. |
キュー (queue)
エクスチェンジから配送されてきたメッセージの保管場所です。
1つのキューは1つのノード上に作成します(RabbitMQサーバーをクラスター構成にしていても)が、ポリシーの設定によってキューをミラーリングすることができます。
キューの作成
キューの作成はrabbitmqctlコマンドではできませんので、managementプラグインかclientアプリケーションを使用します。
managementプラグインを使用する
メニューバーの[Queues]を選択します。
キューはバーチャルホスト毎に作成する必要があります。作成対象のVirtual Hostを選び、Nameにキュー名を入力します。ただし名前の先頭に"amp."を付けることはできません。
v1バーチャルホストにuserid:1000、userid:1001、userid:2002を作成します。
| field | value |
|---|---|
| Virtual Host | v1 |
| Name | userid:1000、userid:1001、userid:2002 |
| Durability | Durable |
| Node | rabbit_node1@localhost |
| Auto Delete | No |
| Arguments | x-max-priority = 9 |
v2バーチャルホストにuserid:1000.info、userid:1000.warn、userid:1001.info、userid:1001.warnを作成します。
| field | value |
|---|---|
| Virtual Host | v2 |
| Name | userid:1000.info、userid:1000.warn、userid:1001.info、userid:1001.warn |
| Durability | Durable |
| Node | rabbit_node1@localhost |
| Auto Delete | No |
| Arguments | x-max-priority = 9 |
登録後はこのようになります。
properties
キューに設定できるプロパティは下記の通りです。
| property | value | description |
|---|---|---|
Durability |
|
Durable:キューを永続化します、Transient:一時的なキューとします。 |
Node |
リストから選択します | キューを作成するノードを指定します |
Auto delete |
|
YES:consumerからの接続が無くなった場合にキューを削除します。 |
Durabilityとは、キューの登録情報をRabbitMQサーバーの停止後も保存するかを指定するプロパティです。Transientの場合、RabbitMQサーバーを停止させるとそのキューは無くなります。
arguments
| arguments | type | description |
|---|---|---|
x-message-ttl |
Number | メッセージの生存期間(ミリ秒)を指定します。 |
x-expires |
Number | キューの有効期間(ミリ秒)を指定します。ここで指定した時間使用されなかった場合キューを削除します。 |
x-max-length |
Number | キューに保存するメッセージの最大数(これを超えると古いメッセージから削除)を指定します。 |
x-max-length-bytes |
Number | キューに保存するメッセージの最大合計サイズ数 (これを超えると古いメッセージから削除)を指定します。 |
x-dead-letter-exchange |
String | dead-letterになったメッセージの転送先のエクスチェンジ名を指定します。 |
x-dead-letter-routing-key |
String | dead-letterになったメッセージに付けるルーティングキーを指定します。 |
x-max-priority |
Number | プライオリティの最大値を指定します。大きい値ほど優先度が高くなります。 |
x-match |
String |
all:すべての条件がマッチする場合に受信する。any:条件が1つでもマッチする場合に受信する。 |
バインド (binding)
エクスチェンジとキューを関連付けます。関連は「エクスチェンジからキュー」と「エクスチェンジからエクスチェンジ」の2通りが指定できます。
エクスチェンジが受け取ったメッセージの配送先は関連付けられたキューの中から選択されますが、エクスチェンジのタイプがdirectかtopicの場合は、ルーティングキーによって配送のキューを分岐させることができます。
headersの場合は、ヘッダーに指定した条件によって配送先のキューを分岐させることができます。
ルーティングキー (routing key)
エクスチェンジのタイプがdirectかtopicのとき、バインドにルーティングキーを設定します。
このルーティングキーとメッセージが持つルーティングキーが比較され配送するキューが決定されます。
direct
directの場合、メッセージのルーティングキーがバインドのルーティングキーと一致した場合に、そのバインド先のキューが選択されます。
+- [message]--------------+ +-[ direct exchange]------------+
| | | |
| routing key : ? |----->| +-[binding] A ----------+ | +-[ queue ]-----+
| | | | routing key : 1000 |-------->| |
| payload { | | | | | +---------------+
| ... | | +-----------------------+ |
| } | | +-[binding] B ----------+ | +-[ queue ]-----+
+-------------------------+ | | routing key : 1001 |-------->| |
| | | | +---------------+
| +-----------------------+ |
| +-[binding] C ----------+ | +-[ queue ]-----+
| | routing key : 2000 |-------->| |
| | | | +---------------+
| +-----------------------+ |
+-------------------------------+
上図のdirect_exchange例では
- メッセージのrouting keyが1000の場合、binding Aが使用され、その先のキューへメッセージが配送されます。
- メッセージのrouting keyが1001の場合、binding Bが使用され、その先のキューへメッセージが配送されます。
topic
topicの場合、メッセージのルーティングキーがバインドのルーティングキーパターンと一致する場合に、そのバインド先のキューが選択されます。一見すると正規表現のように見えますがそうではありませんので注意が必要です。
バインドに記述するルーティングキーは
- .(ドット)で区切られた単語から構成します。
- キーの最大文字長は255バイトです。
- 単語には特殊な意味を持つ
*、#を使用することができます。 -
*は1個の単語と置き換えられます。 -
#は0個または1個以上の単語と置き換えられます。
| pattern | マッチするパターン | マッチしないパターン |
|---|---|---|
A.* |
A.A、A.B、A.Cなど |
A、A.B.Cなど |
A.# |
A、A.A、A.A.A
|
AA、AA.Bなど |
A.#.Z |
A.Z、A.B.Z、A.B.C.Z
|
A、A.B、A.B.Cなど |
+- [message]--------------+ +-[ topic exchange]----------------+
| | | |
| routing key : ? |----->| +-[binding] A -------------+ | +-[ queue ]-----+
| | | | routing key : id.*.info |-------->| |
| payload { | | | | | +---------------+
| ... | | +--------------------------+ |
| } | | +-[binding] B -------------+ | +-[ queue ]-----+
+-------------------------+ | | routing key : id.*.warn |-------->| |
| | | | +---------------+
| +--------------------------+ |
| +-[binding] C -------------+ | +-[ queue ]-----+
| | routing key : broad.# |-------->| |
| | | | +---------------+
| +--------------------------+ |
+----------------------------------+
上図のtopics_exchange例では
- メッセージのrouting keyがid.1000.infoの場合、binding Aが使用され、その先のキューへメッセージが配送されます。
- メッセージのrouting keyがid.1001.warnの場合、binding Bが使用され、その先のキューへメッセージが配送されます。
- メッセージのrouting keyがbroad.update.2000や、broad.delete.2001の場合、binding Cが使用され、その先のキューへメッセージが配送されます。
引数 (arguments)
headers
エクスチェンジのタイプがheadersのときに、バインドの引数に任意の分岐条件を設定できます。
この分岐条件とメッセージのヘッダーが比較され配送先のキューが決定されます。
headers exchangeのバインドは引数にx-matchを設定する必要があります。設定できる値はanyかallです。
-
any設定は、バインドの分岐条件の何れかを満たした場合に使用されます。 -
all設定は、バインドの分岐条件のすべてを満たした場合に使用されます。
+- [message]--------------+ +-[ headers exchange]-----------+
| | | |
| arguments |----->| +-[binding] A --------+ | +-[ queue ]-----+
| arg1 = ? | | | x-match : any |---------->| |
| arg2 = ? | | | arg1 : 100 | | +---------------+
| arg3 = ? | | | arg2 : 200 | |
| arg4 = ? | | +---------------------+ |
| | | +-[binding] B --------+ | +-[ queue ]-----+
| payload { | | | x-match : any |---------->| |
| ... | | | arg3 : 300 | | +---------------+
| } | | | arg4 : 400 | |
+-------------------------+ | +---------------------+ |
| +-[binding] C --------+ | +-[ queue ]-----+
| | x-match : all |---------->| |
| | arg1 : 100 | | +---------------+
| | arg3 : 300 | |
| +---------------------+ |
+-------------------------------+
上図のheaders_exchange例では
- メッセージのarg1引数が100、またはarg2引数が200の場合、binding Aが使用され、その先のキューへメッセージが配送されます。
- メッセージのarg3引数が300、またはarg4引数が400の場合、binding Bが使用され、その先のキューへメッセージが配送されます。
- メッセージのarg1引数が100、且つarg3引数が300の場合、binding A,B,Cが使用され、それぞれのキューへメッセージが配送されます。
エクスチェンジとキューのバインド
バインドはrabbitmqctlコマンドではできませんので、managementプラグインかclientアプリケーションを使用します。
managementプラグインを使用する
メニューバーの[Exchanges]を選択します。
エクスチェンジの一覧が表示されますので、その中からバインドの設定を行いたいエクスチェンジの名前をクリックします。
Bindings欄にある「Add binding from this exchange」というフォームからバインドの設定内容を登録します。
v1バーチャルホストのnotice.directにuserid:1000、userid:1001、userid:2002へのバインドを作成します。
| field | value |
|---|---|
| To queue | userid:1000 |
| Routing key | 1000 |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:1001 |
| Routing key | 1001 |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:2002 |
| Routing key | 2002 |
| Arguments | (nothing) |
登録後はこうなります。
v1バーチャルホストのbroadcast.fanoutにuserid:1000、userid:1001、userid:2002へのバインドを作成します。
| field | value |
|---|---|
| To queue | userid:1000 |
| Routing key | (nothing) |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:1001 |
| Routing key | (nothing) |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:2002 |
| Routing key | (nothing) |
| Arguments | (nothing) |
登録後はこうなります。
v2バーチャルホストのnotice.topicにuserid:1000.info、userid:1000.warn、userid:1001.info、userid:1001.warnへのバインドを作成します。
| field | value |
|---|---|
| To queue | userid:1000.info |
| Routing key | *.info |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:1000.warn |
| Routing key | *.warn |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:1001.info |
| Routing key | *.info |
| Arguments | (nothing) |
| field | value |
|---|---|
| To queue | userid:1001.warn |
| Routing key | *.warn |
| Arguments | (nothing) |
登録後はこうなります。
ポリシー (policy)
エクスチェンジ、キューの動作をポリシーによって制御できます。ポリシーはバーチャルホスト毎に作成します。
ポリシーはrabbitmqctlコマンドかmanagementプラグインで作成することができます。
ポリシーの一覧を表示します
list_policies [-p vhostpath]
> rabbitmqctl.bat -n rabbit_node1@localhost list_policies -p /
Listing policies ...
> rabbitmqctl.bat -n rabbit_node1@localhost list_policies -p v1
Listing policies ...
> rabbitmqctl.bat -n rabbit_node1@localhost list_policies -p v2
Listing policies ...
ポリシーの作成
rabbitmqctlコマンドを使用する場合
rabbitmqctlコマンドでポリシーを作成するにはset_policyを使用します。
set_policy [-p vhostpath] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
| option | value |
|---|---|
-p |
ポリシーを適用するバーチャルホストを指定します。 |
--priority |
ポリシーの優先度を指定します。 |
--apply-to |
all,exchanges,queuesから選びます。 |
{name} |
ポリシーの名前を指定します。 |
{pattern} |
ポリシーを適用する対象を正規表現で指定します。 |
{definition} |
ポリシーの内容を指定します。 |
v1バーチャルホストにuser1xポリシーを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost -p v1 set_policy --priority 0 --apply-to queues "user1x" "^userid:[1].*" "{""max-length"":50}"
Setting policy "user1x" for pattern "^userid:[1].*" to "{\"max-length\":50}" with priority "0" ...
v1バーチャルホストにuser2xポリシーを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost -p v1 set_policy --priority 0 --apply-to queues "user2x" "^userid:[2].*" "{""max-length"":100}"
Setting policy "user2x" for pattern "^userid:[2].*" to "{\"max-length\":100}" with priority "0" ...
v2バーチャルホストにinfoポリシーを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 set_policy --priority 0 --apply-to queues "info" "^userid:.*\.info" "{""message-ttl"":1800000}"
Setting policy "info" for pattern "^userid:.*\\.info" to "{\"message-ttl\":1800000}" with priority "0" ...
v2バーチャルホストにwarnポリシーを作成します。
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 set_policy --priority 0 --apply-to queues "warn" "^userid:.*\.warn" "{""message-ttl"":3600000}"
Setting policy "warn" for pattern "^userid:.*\\.warn" to "{\"message-ttl\":3600000}" with priority "0" ...
managementプラグインを使用する場合
メニューバーの[Admin]→[Policies]を選択します。
ポリシーはバーチャルホスト毎に作成する必要があります。作成対象のVirtual Hostを選び、Nameにポリシー名を入力します。
v1バーチャルホストにuser1xポリシーを作成します。
| field | value |
|---|---|
| Virtual Host | v1 |
| Name | user1x |
| Pattern | ^userid:[1].* |
| Apply to | queues |
| Priority | 0 |
| Definition | max-length = 50 |
v1バーチャルホストにuser2xポリシーを作成します。
| field | value |
|---|---|
| Virtual Host | v1 |
| Name | user2x |
| Pattern | ^userid:[2].* |
| Apply to | queues |
| Priority | 0 |
| Definition | max-length = 100 |
v2バーチャルホストにinfoポリシーを作成します。
| field | value |
|---|---|
| Virtual Host | v2 |
| Name | info |
| Pattern | ^userid:.*.info |
| Apply to | queues |
| Priority | 0 |
| Definition | message-ttl = 1800000 |
v2バーチャルホストにwarnポリシーを作成します。
| field | value |
|---|---|
| Virtual Host | v2 |
| Name | warn |
| Pattern | ^userid:.*.warn |
| Apply to | queues |
| Priority | 0 |
| Definition | message-ttl = 3600000 |
登録後はこのようになります。
また、キューの一覧でも適用されるポリシーが確認できます。
properties
ポリシーに設定できるプロパティは下記の通りです。
Pattern
ポリシーを適用するリソース名を正規表現で指定します。
".*"とすると全てのリソースに適用されます。
Apply to
適用するリソースをリストから選択します。
- Exchanges and queues
- Exchanges
- Queues
Priority
ポリシーの優先度を設定します。
パターンにマッチするポリシーが複数あった場合、優先度の高いポリシーが適用されます。大きい値がより優先度が高いと判断されます。
Definition
ポリシーの内容を設定します。
HA
キューのミラーリングに関するポリシーです。
| arguments | type | description |
|---|---|---|
ha-mode |
String | ミラーリングのモードを指定します。 |
ha-params |
String | ミラーリングのモードに対するパラメータを指定します。 |
ha-sync-mode |
String | メッセージの同期モードを指定します。 |
Federation
| arguments | type | description |
|---|---|---|
federation-upstream-set |
String | A string; only if the federation plugin is enabled. Chooses the name of a set of upstreams to use with federation, or "all" to use all upstreams. Incompatible with federation-upstream. |
federation-upstream |
String | A string; only if the federation plugin is enabled. Chooses a specific upstream set to use for federation. Incompatible with federation-upstream-set. |
Queues
キューに関するポリシーです。
| arguments | type | description |
|---|---|---|
message-ttl |
Number | メッセージの生存期間(ミリ秒)を指定します。 |
expires |
Number | キューの有効期間(ミリ秒)を指定します。ここで指定した時間使用されなかった場合キューを削除します。 |
max-length |
Number | キューに保存するメッセージの最大数(これを超えると古いメッセージから削除)を指定します。 |
max-length-bytes |
Number | キューに保存するメッセージの最大合計サイズ数 (これを超えると古いメッセージから削除)を指定します。 |
dead-letter-exchange |
String | dead-letterになったメッセージの転送先のエクスチェンジ名を指定します。 |
dead-letter-routing-key |
String | dead-letterになったメッセージに付けるルーティングキーを指定します。 |
Exchanges
エクスチェンジに関するポリシーです。
| arguments | type | description |
|---|---|---|
alternate-exchange |
String | 代替エクスチェンジの名前を指定します。 |
Highly Available Queues
クラスター構成のRabbitMQではキューをミラーリングすることができます。逆にクラスター構成にしても初期状態ではキューはミラーリングされません。
キューのミラーリングはポリシーの設定によって行い、ポリシーの設定以降に受信したメッセージからミラーリングが行われます。
ミラーリング用のポリシーの作成
rabbitmqctを使用する場合
set_policy [-p vhostpath] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
definitionにjsonフォーマットでミラーリングの方法を記述します。
ha-modeにキューのミラーリングの方法を下記の3種類から指定します。
| ha-mode | description |
|---|---|
all |
クラスター内のすべてのノードでミラーリングを行います。 |
exactly |
クラスター内の指定数のノードでミラーリングを行います。 |
nodes |
クラスター内の指名したノードでミラーリングを行います。 |
ha-sync-modeに同期モードを下記の2種類から指定します。
| ha-sync-mode | description |
|---|---|
manual |
デフォルトです。非同期のメッセージが発生した場合、手動で同期する必要があります。 |
automatic |
非同期メッセージが発生した場合、自動で同期を行います。 |
手動で同期するにはrabbitmqctlコマンドかmanagementプラグインを使用します。
rabbitmqctlコマンドを使用する場合はsync_queueを使用します。
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 sync_queue userid:2002
Synchronising queue 'userid:2002' in vhost 'v2' ...
[RabbitMQ: Why is the default ha-sync-mode manual?] (http://serverfault.com/questions/543160/rabbitmq-why-is-the-default-ha-sync-mode-manual)
allは、すべてのノードでミラーします。
{"ha-mode":"all", "ha-sync-mode": "automatic"}
exactlyは、ha-paramsで指定した数のノードでミラーします。
{"ha-mode":"exactly", "ha-params":2, "ha-sync-mode":"automatic"}
nodesは、ha-paramsで指名したノードでミラーします。
{"ha-mode":"nodes", "ha-params":["rabbit_node1@localhost","rabbit_node2@localhost"]}
この例では、v2バーチャルホストのuserid:1000.info、userid:1001.infoキューをミラーリングキューに設定します。
ただし、すでにポリシーが設定されているのでプライオリティーを高くして優先されるようにします。
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 set_policy --priority 1 --apply-to queues "ha_info" "^userid:.*\.info" "{""ha-mode"":""all"", ""ha-sync-mode"":""automatic"", ""message-ttl"":1800000}"
Setting policy "ha_info" for pattern "^userid:.*\\.info" to "{\"ha-mode\":\"all\", \"ha-sync-mode\":\"automatic\", \"message-ttl\":1800000}" with priority "1" ...
managementプラグインを使用する場合
ミラーリング用のポリシーも通常のポリシーと同様の方法で登録します。
同じパターンのポリシーが2つ作成されますが、Priorityの高い方(ha_info)が適用されます。
キューの一覧で新しいポリシーが適用されていることを確認することができます。
この例ではノード名の右側にある"+1"がミラーしているノードの数を表しています。
メッセージ (message)
通常はclientアプリケーションより送信されるデータです。
メッセージを送信する
managementプラグインを使用する
managementプラグインからも検証目的のためにメッセージを送信することが可能です。
ここで、これまでに登録したエクスチェンジを使ってメッセージの配信テストを行ってみます。
メニューバーの[Exchanges]→リスト中から「notice.direct」をクリックします。
「Publish message」欄から下記の内容を入力して「Publish message」ボタンをクリックします。
緑色のダイアログに"Message published."と表示されればエクスチェンジへのメッセージ送信は成功です。
| field | value |
|---|---|
| Routing key | 1000 |
| Delivery mode | 2 - Persistent |
| Headers | (nothing) |
| Properties | (nothing) |
| payload | userid:1000へテスト配信 |
次に、キューを確認します。
メニューバーの[Queues]→リスト中から「userid:1000」をクリックします。
「Details」欄の"Messages"が下図のようになっていれば、キューまでメッセージが配送されていることになります。
また「Get messages」欄で受信したメッセージの内容を確認することができます。
Requeueを「No」に変えて「Get Message(s)」ボタンをクリックします。
properties
メッセージ送信時に設定できるプロパティは下記の通りです。
Delivery mode
配信モードを指定します。
- 1 Non-persistent : メッセージをdiskに保存しません。RabbitMQサーバーを停止したりクラッシュした場合、メッセージは失われます。
- 2 Persistent : キューが
Durabilityのときにメッセージをdiskに保存します。
Headers
メッセージにヘッダーを指定します。
ヘッダーには任意の名前と値を指定することができます。エクスチェンジのheadersはこの値を見て配送先のキューを選択します。
Properties
You can set other message properties here (delivery mode and headers are pulled out as the most common cases).
Invalid properties will be ignored. Valid properties are:
設定できるプロパティの一覧です。
- content_type
- content_encoding
- priority
- correlation_id
- reply_to
- expiration
- message_id
- timestamp
- type
- user_id
- app_id
- cluster_id
Payload
メッセージの本体です。
Priority Queue Support
プライオリティキューが使用できます。
RabbitMQ has priority queue implementation in the core as of version 3.5.0.
プライオリティキューはx-max-priorityに最大優先度を指定して登録します。
プライオリティを付けたメッセージをプライオリティーキューへ配送するとコンシューマーアプリケーションはプライオリティーの高いメッセージから順に受信します。
プライオリティは数値の大きい方がより優先度が高いと判断されます。指定できる値の範囲は0~9です。
なおメッセージのプライオリティが最大優先度を超えて設定されていても拒否されることは無く最大値とみなされます。(たとえば、プライオリティーを100にしても最大値の9として扱われます。)
下表は、最大優先度を5に設定したプライオリティーキューへ、プライオリティーを1~5の範囲で設定したメッセージの送信順と受信順の関係です。
| 送信順 | プライオリティ | 受信順 |
|---|---|---|
| 1 | 5 | 1 |
| 2 | 4 | 5 |
| 3 | 3 | 7 |
| 4 | 5 | 2 |
| 5 | 2 | 8 |
| 6 | 1 | 10 |
| 7 | 5 | 3 |
| 8 | 6 | 4 |
| 9 | 2 | 9 |
| 10 | 4 | 6 |
[Priority Queue Support] (https://www.rabbitmq.com/priority.html)
3. rabbitmqctlコマンドのマニュアル
rabbitmqctl [-n node] [-q] {command} [command options...]
参考
[rabbitmqctl(1) manual page] (https://www.rabbitmq.com/man/rabbitmqctl.1.man.html)
4. Tip`s
ちょっと便利な小技を紹介します。
erl.exeプロセスの実行引数の確認
RabbitMQサーバーを実行する際の引数を確認する方法です。
> wmic.exe process where "name=\"erl.exe\"" get name,commandline
File descriptors欄の表示を有効にする
managementプラグインでFile descriptors欄の表示を有効にするには別途MicrosoftよりHandle.exeを取得する必要があります。
取得したHandle.exeはPathの通った場所であればどこに置いても構いません。






















