9
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQ 3.5.3 - Installing on Windows 7

Posted at

概要

1台のWindows7上に2ノードから成るRabbitMQのクラスターを構築する方法を説明します。
また、簡単ですがRabbitMQの特徴やrabbitmqctlコマンドによるRabbitMQサーバーの操作方法なども説明します。

この記事の内容は下記のバージョンで動作確認を行いました。

  • Windows7 (64bit)
  • RabbitMQ 3.5.3
  • Erlang/OTP 17.5

1. インストールと初期設定

本格的な説明はRabbitMQのInstalling on Windows (manual)ページにあります。

Erlnagのインストール

RabbitMQの実行にはErlangが必要になりますので事前にインストールを行います。

[ダウンロードページ] (http://www.erlang.org/download.html)より、インストーラーをダウンロードして実行します。
今回使用したファイルはotp_win64_17.5.exeです。

初期設定

環境変数のERLANG_HOMEにerlangのインストールディレクトリを設定します。
環境変数のpath%ERLANG_HOME%\binを追加します。

インストール後の確認

念のためコマンドプロンプトからerl.exeを実行してshellが起動するか確認します。

RabbitMQのインストール

1台のPCにRabbitMQを2ノード起動しクラスターを構成します。
構成は下表の通りです。

node name port port(ssl) port(management)
rabbit_node1@localhost 5672 5671 15672
rabbit_node2@localhost 5674 5673 15674

[ダウンロードページ] (http://www.rabbitmq.com/install-windows-manual.html)より、アーカイブファイルをダウンロードします。
今回ダウンロードしたファイルはrabbitmq-server-windows-3.5.3.zipです。

アーカイブファイルを適当な場所へ解凍します。
この記事では下記の場所へ準備しました。
node1
C:\rabbitmq-3.5.3\rabbitmq_server-3.5.3_node1
node2
C:\rabbitmq-3.5.3\rabbitmq_server-3.5.3_node2

初期設定

baseディレクトリを作成します。
node1用
C:\rabbitmq-3.5.3\node1
node2用
C:\rabbitmq-3.5.3\node2

db、logファイル用のディレクトリを作成します。
node1用
C:\rabbitmq-3.5.3\node1\db
C:\rabbitmq-3.5.3\node1\log
node2用
C:\rabbitmq-3.5.3\node2\db
C:\rabbitmq-3.5.3\node2\log

confファイルを用意します。
テンプレートファイルがありますのでこれを利用します。

C:\rabbitmq-3.5.3\rabbitmq_server-3.5.3_node1\etc\rabbitmq.config.exampleファイルをrabbitmq.configへリネームして下記の場所へコピーします。

node1用
C:\rabbitmq-3.5.3\node1\rabbitmq.config
node2用
C:\rabbitmq-3.5.3\node2\rabbitmq.config

次にrabbitmq.configを編集します。
下記の内容は変更点だけ抜粋したものです。(ポートやログファイルの出力先を記述します。)

node1用

rabbitmq.config
{rabbit,
[
  {tcp_listeners, [5672]},
  {ssl_listeners, [5671]},
]},

{rabbitmq_management,
[
  {http_log_dir, "C:\rabbitmq-3.5.3\node1\log\access.log"},
  {listener, [{port, 15672},
              {ip, "127.0.0.1"}]
  },
  {rates_mode, basic}
]},

node2用

rabbitmq.config
{rabbit,
[
  {tcp_listeners, [5674]},
  {ssl_listeners, [5673]},
]},

{rabbitmq_management,
[
  {http_log_dir, "C:\rabbitmq-3.5.3\node2\log\access.log"},
  {listener, [{port, 15674},
              {ip, "127.0.0.1"}]
  },
  {rates_mode, basic}
]},

次にbatファイルを修正します。
batファイルはインストールディレクトリのsbin下にあります。

node1用
rabbitmqctl.bat

rabbitmqctl.bat
setlocal enabledelayedexpansion

+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node1
+ set RABBITMQ_NODENAME=rabbit_node1@localhost

if "!RABBITMQ_BASE!"=="" (

rabbitmq-server.bat

rabbitmq-server.bat
setlocal enabledelayedexpansion

+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node1
+ set RABBITMQ_NODENAME=rabbit_node1@localhost
+ set RABBITMQ_NODE_PORT=5672

if "!RABBITMQ_USE_LONGNAME!"=="" (

rabbitmq-plugins.bat

rabbitmq-plugins.bat
setlocal enabledelayedexpansion

+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node1
+ set RABBITMQ_NODENAME=rabbit_node1@localhost

if "!RABBITMQ_SERVICENAME!"=="" (

node2用
rabbitmqctl.bat

rabbitmqctl.bat
setlocal enabledelayedexpansion

+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node2
+ set RABBITMQ_NODENAME=rabbit_node2@localhost

if "!RABBITMQ_BASE!"=="" (

rabbitmq-server.bat

rabbitmq-server.bat
setlocal enabledelayedexpansion

+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node2
+ set RABBITMQ_NODENAME=rabbit_node2@localhost
+ set RABBITMQ_NODE_PORT=5674

if "!RABBITMQ_USE_LONGNAME!"=="" (

rabbitmq-plugins.bat

rabbitmq-plugins.bat
setlocal enabledelayedexpansion

+ set RABBITMQ_BASE=C:/rabbitmq-3.5.3/node2
+ set RABBITMQ_NODENAME=rabbit_node2@localhost

if "!RABBITMQ_SERVICENAME!"=="" (

インストール後の確認

node1、node2でRabbitMQサーバーを起動します。

> rabbitmq-server.bat -detached

起動ステータスを確認します。

> rabbitmqctl.bat status

また、RabbitMQサーバーの停止は下記のコマンドで行います。

> rabbitmqctl.bat stop

rabbitmq_managementプラグインの有効化

managementプラグインはRabbitMQサーバーの監視やエクスチェンジ、キュー、ユーザーなどを操作できるブラウザベースの管理機能です。

node1、node2でプラグインを有効化します。

node1

> rabbitmq-plugins.bat enable rabbitmq_management 
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit_node1@localhost... started 6 plugins.

node2

> rabbitmq-plugins.bat enable rabbitmq_management 
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit_node2@localhost... started 6 plugins.

managementプラグインにアクセスします。
ブラウザから下記のURLにアクセスしてログインページが表示されるか確認します。
デフォルトのユーザー/パスワードはguest/guestです。

node1
http://localhost:15672/

node2
http://localhost:15674/

managementプラグイン用の管理者ユーザーの作成

最初から用意されているguestユーザーを削除し、その代わりの管理者ユーザーを作成します。
ユーザーの作成はmanagementプラグインとrabbitmqctlコマンドから行うことができます。

この記事では下記の情報でユーザーを作成しました。

name password tag vhost
admin admin administrator /

タグでロールを設定します。
タグに使用できる名前は下表の通りです。

name description
(none) managementプラグインにアクセスできません。
management
policymaker management+
monitoring management+
administrator policymaker+, management+

rabbitmqctlコマンドを使用してユーザーを作成します

> rabbitmqctl.bat -n rabbit_node1@localhost add_user admin admin
Creating user "admin" ...

> rabbitmqctl.bat -n rabbit_node1@localhost set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...

上記ではデフォルトのバーチャルホスト(/)にアクセス権限を付与していますが、バーチャルホストを増やした場合は、そのバーチャルホストに対してもアクセス権限を付与する必要があります。(権限を付与しないとmanagementプラグイン上で操作ができません。)

guestユーザーを削除します

example
> rabbitmqctl.bat -n rabbit_node1@localhost delete_user guest
Deleting user "guest" ...

参考

[Management Plugin] (https://www.rabbitmq.com/management.html)

クラスターの構築

クラスターの構築はnode1、node2を起動した状態で行います。
今回はnode2をnode1へ参加させます。(node1がマスター、node2がスレーブになります。)

構築前のクラスターの状態を確認します

> rabbitmqctl.bat -n rabbit_node1@localhost cluster_status
Cluster status of node rabbit_node1@localhost ...
[{nodes,[{disc,[rabbit_node1@localhost]}]},
 {running_nodes,[rabbit_node1@localhost]},
 {cluster_name,<<"rabbit_node1@localhost">>},
 {partitions,[]}]

クラスターを構築します

> rabbitmqctl.bat -n rabbit_node2@localhost stop_app
Stopping node rabbit_node2@localhost ...

> rabbitmqctl.bat -n rabbit_node2@localhost join_cluster rabbit_node1@localhost
Clustering node rabbit_node2@localhost with rabbit_node1@localhost ...

> rabbitmqctl.bat -n rabbit_node2@localhost start_app
Starting node rabbit_node2@localhost ...

構築後のクラスターの状態を確認します

> rabbitmqctl.bat -n rabbit_node1@localhost cluster_status
Cluster status of node rabbit_node2@localhost ...
[{nodes,[{disc,[rabbit_node1@localhost,rabbit_node2@localhost]}]},
 {running_nodes,[rabbit_node1@localhost,rabbit_node2@localhost]},
 {cluster_name,<<"rabbit_node1@localhost">>},
 {partitions,[]}]

クラスターの解除方法

一時的にクラスターから外す方法

forget_cluster_nodeを使用します。クラスターへ参加させるにはもう一度join_clusterを使用します。

> rabbitmqctl.bat -n rabbit_node2@localhost stop_app
Stopping node rabbit_node2@localhost ...

> rabbitmqctl.bat -n rabbit_node1@localhost forget_cluster_node rabbit_node2@localhost
Resetting node rabbit_node2@localhost from cluster ...

クラスターから外す方法

example
> rabbitmqctl.bat -n rabbit_node2@localhost stop_app
Stopping node rabbit_node2@localhost ...

> rabbitmqctl.bat -n rabbit_node2@localhost reset
Resetting node rabbit_node2@localhost ...

> rabbitmqctl.bat -n rabbit_node2@localhost start_app
Starting node rabbit_node2@localhost ...

参考

[Clustering Guide] (https://www.rabbitmq.com/clustering.html)
[Highly Available Queues] (https://www.rabbitmq.com/ha.html)

2. RabbitMQの構成

ここからは次のシナリオに基づいて登録を行いながら各要素を説明します。

  • バーチャルホストを2つ(v1v2)登録します。
  • それぞれのバーチャルホストへアクセスできるユーザー(v1_userv2_user)を登録します。

v1バーチャルホスト

  • v1に2つのエクスチェンジ(notice.directbroadcast.fanout)を登録します。
  • v1に3つのキュー(userid:1000userid:1001userid:2002)を登録します。
  • notice.directからキューへのルーティングは固有のルーティングキーによって行います。
  • broadcast.fanoutからはすべてのキューへルーティングします。

v2バーチャルホスト

  • v2に1つのエクスチェンジ(notice.topic)を登録します。
  • v2に4つのキュー(userid:1000.infouserid:1000.warnuserid:1001.infouserid:1001.warn)を登録します。
  • notice.topicからキューへのルーティングはルーティングキーの部分マッチによって行います。

キューのポリシー

v1のキューuserid:1000userid:1001に「user1x」ポリシーを適用します。
v1のキューuserid:2002に「user2x」ポリシーを適用します。
v2のキューuserid:1000.infouserid:1001.infoに「info」ポリシーを適用します。
v2のキューuserid:1000.warnuserid:1001.warnに「warn」ポリシーを適用します。

ポリシーの定義内容

name pattern definition description
user1x ^userid:[1].* max-length:50 メッセージをキューに最大50件まで保持
user2x ^userid:[2].* max-length:100 メッセージをキューに最大100件まで保持
info ^userid:.*.info message-ttl:1800000 メッセージの生存期間を30分まで
warn ^userid:.*.warn message-ttl:3600000 メッセージの生存期間を60分まで
 [vhost]      [exchange]            [routing key]       [queue]
----------+----------------------+------------------+-------------------
   v1    ---> notice.direct     --->  '1000'       ---> userid:1000
                                --->  '1001'       ---> userid:1001
                                --->  '2002'       ---> userid:2002

         ---> broadcast.fanout  ----------------------> userid:1000
                                ----------------------> userid:1001
                                ----------------------> userid:2002

   v2    ---> notice.topic      --->   '*.info'    ---> userid:1000.info
                                --->   '*.info'    ---> userid:1001.info
                                --->   '*.warn'    ---> userid:1000.warn
                                --->   '*.warn'    ---> userid:1001.warn

登録の説明はrabbitmqctlコマンドとmanagementプラグインの両方で行いますが、一部の登録はrabbitmqctlではできません。
それを下表にまとめました。

操作 rabbitmqctlコマンド managementプラグイン desc
バーチャルホストの作成 可 (add_vhost)
ユーザーの登録 可 (add_user)
アクセスコントロール 可 (set_permissions)
エクスチェンジの作成 不可 バーチャルホスト別
キューの作成 不可 バーチャルホスト別
バインド 不可
ポリシーの作成 可 (set_policy) バーチャルホスト別

プロデューサー (producer) / コンシューマー (consumer)

  • プロデューサーはメッセージを配信するアプリケーションで、メッセージの配信をブローカーへ依頼します。
  • コンシューマーはメッセージを受信するアプリケーションで、メッセージをブローカーから受け取ります。

ブローカー (broker)

RabbitMQサーバーのことです。メッセージの中継を担当します。ブローカーはバーチャルホスト、エクスチェンジ、キューといったコンポーネントから構成されます。
アプリケーション間のメッセージ送受信をブローカーが中継することでアプリケーション同士を疎結合にすることができます。

バーチャルホスト (virtual host)

バーチャルホストは、エクスチェンジやキューを意味のあるグループに分割します。
デフォルトのバーチャルホスト(/)が事前に定義されていて、rabbitmqctlコマンドでバーチャルホストの指定を省略した場合、このデフォルトのバーチャルホストが使用されることがあります。

バーチャルホストの作成はrabbitmqctlコマンド、managementプラグインで行うことができます。

バーチャルホストの一覧を表示します

Synopsis
list_vhosts [vhostinfoitem ...]
list_vhosts
> rabbitmqctl.bat -n rabbit_node1@localhost list_vhosts
Listing vhosts ...
/

バーチャルホストの作成

rabbitmqctを使用する場合

rabbitmqctlコマンドで作成するにはadd_vhostを使用します。

Synopsis
add_vhost {vhostpath}

v1バーチャルホストを作成します。

add_vhost
> rabbitmqctl.bat -n rabbit_node1@localhost add_vhost v2
Creating vhost "v2" ...

v2バーチャルホストを作成します。

add_vhost
> rabbitmqctl.bat -n rabbit_node1@localhost add_vhost v2
Creating vhost "v2" ...

managementプラグインを使用する場合

メニューバーの[Admin]→[Virtual Hosts]を選択します。
Nameにバーチャルホスト名を入力して登録します。

add_new_virtual_host.png

v1バーチャルホストを作成します。

field value
Name v1

v2バーチャルホストを作成します。

field value
Name v2

登録後はこうなります。

all_virtual_hosts.png

作成直後のバーチャルホストにはアクセスできるユーザーが存在しませんので、続いてユーザーの登録とそのユーザーにアクセスコントロールの設定を行う必要があります。

ユーザー (user)

バーチャルホストへアクセスするには適切な権限が付与されたユーザーが必要です。

ユーザーの一覧を表示します

Synopsis
list_users
list_users
> rabbitmqctl -n rabbit_node1@localhost list_users
Listing users ...
admin   [administrator]

ユーザーの登録

RabbitMQのユーザーは、エクスチェンジへメッセージを送信するproducerアプリケーション、キューからメッセージを受信するconsumerアプリケーションでも使用します。

ユーザーの登録はrabbitmqctlコマンド、managementプラグインで行うことができます。

rabbitmqctを使用する場合

rabbitmqctlコマンドで作成するにはadd_userを使用します。

Synopsis
add_user {username} {password}

v1_userを作成します。

add_user
> rabbitmqctl.bat -n rabbit_node1@localhost add_user v1_user pass
Creating user "v1_user" ...

v2_userを作成します。

add_user
> rabbitmqctl.bat -n rabbit_node1@localhost add_user v2_user pass
Creating user "v2_user" ...

managementプラグインを使用する場合

メニューバーの[Admin]→[Users]を選択します。
Usernameにユーザー名、Passwordにパスワードを入力して登録します。Tagsは入力不要です。

add_new_user.png

v1_userを作成します。

field value
Username v1_user
Password pass
Tags (nothing)

v2_userを作成します。

field value
Username v2_user
Password pass
Tags (nothing)

登録後はこうなります。

add_users.png

続いてユーザーへアクセスコントロールの設定を行います。

アクセスコントロール

ユーザーがアクセスできるバーチャルホストと権限を設定します。

rabbitmqctを使用する場合

rabbitmqctlコマンドでアクセスコントロールを設定するにはset_permissionsを使用します。

Synopsis
set_permissions [-p vhostpath] {user} {conf} {write} {read}
option value
-p ユーザーのアクセス権限を付与するバーチャルホストを指定します。デフォルトは/です
{user} ユーザーを指定します。
{conf} 設定権限を与えるリソース名を正規表現で指定します。
{write} 書き込み権限を与えるリソース名を正規表現で指定します。
{read} 読み込み権限を与えるリソース名を正規表現で指定します。

全権限を与える場合は".*"、権限を与えない場合は"^$"

v1_userにv1バーチャルホストに対する権限を作成します。

set_permissions
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v1 v1_user "^$" ".*" ".*"
Setting permissions for user "v1_user" in vhost "v1" ...

v2_userにv2バーチャルホストに対する権限を作成します。

set_permissions
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v2 v2_user "^$" ".*" ".*"
Setting permissions for user "v2_user" in vhost "v2" ...

managemanetプラグインの管理ユーザーにも権限を与えます。

set_permissions
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v1 admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "v1" ...
> rabbitmqctl.bat -n rabbit_node1@localhost set_permissions -p v2 admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "v2" ...

ユーザーの付与されたパーミッションの一覧を表示します

Synopsis
list_user_permissions {username}
list_user_permissions
> rabbitmqctl.bat -n rabbit_node1@localhost list_user_permissions admin
Listing permissions for user "admin" ...
/       .*      .*      .*
v1      .*      .*      .*
v2      .*      .*      .*
> rabbitmqctl.bat -n rabbit_node1@localhost list_user_permissions v1_user
Listing permissions for user "v1_user" ...
v1      ^$      .*      .*
> rabbitmqctl.bat -n rabbit_node1@localhost list_user_permissions v2_user
Listing permissions for user "v2_user" ...
v2      ^$      .*      .*

managementプラグインを使用する場合

メニューバーの[Admin]→[Users]を選択しアクセスコントロールを設定したいユーザー名をクリックします。
アクセスを許可するVirtual Hostをリストから選択し、権限を与えるリソースを正規表現で指定します。

set_permissions.png

v1_userにv1バーチャルホストに対する権限を作成します。

field value
Virtual Host v1
Configure regexp ^$
Write regexp .*
Read regexp .*

v2_userにv2バーチャルホストに対する権限を作成します。

field value
Virtual Host v2
Configure regexp ^$
Write regexp .*
Read regexp .*

adminにもv1、v2バーチャルホストに対する権限を作成します。

登録後はこのようになります。

all_users.png

エクスチェンジ (exchange)

プロデューサーから受け取ったメッセージを、バインドの設定にしたがってキューへ配送します。

エクスチェンジの種類

fanout

エクスチェンジにバインドされているすべてのキューへメッセージを配送します。ルーティングキーは使用しません。
メッセージのブロードキャストに向いています。

direct

エクスチェンジにバインドされているキューのうち、メッセージのルーティングキーとバインドのルーティングキーが一致するキューへメッセージを配送します。
メッセージを特定のキューへ配送するのに向いています。

topic

directと似ていますが、メッセージのルーティングキーとバインドのルーティングキーパターンがマッチングするキューへメッセージを配送します。

headers

メッセージのヘッダーに基づいて配送先のキューを決定します。ルーティングキーは使用しません。

組み込みエクスチェンジ

事前に組み込まれているエクスチェンジがあります。これらのエクスチェンジはバーチャルホスト毎に用意されています。これらをそのまま使用することも、独自のエクスチェンジを作成することも可能です。

name type
(AMQP default) direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic

AMQP default : メッセージの配信先(エクスチェンジ)を明示的に指定しなかったときに使用されるデフォルトのエクスチェンジです。このエクスチェンジは暗黙的にすべてのキューにバインドされており、また削除することもできません。

エクスチェンジの作成

エクスチェンジの作成はrabbitmqctlコマンドではできませんので、managementプラグインかclientアプリケーションを使用します。

managementプラグインを使用する

メニューバーの[Exchanges]を選択します。
エクスチェンジはバーチャルホスト毎に作成する必要があります。作成対象のVirtual Hostを選び、Nameにエクスチェンジ名を入力します。ただし名前の先頭に"amp."を付けることはできません。

add_new_exchange.png

v1バーチャルホストにnotice.directエクスチェンジを作成します。

field value
Virtual Host v1
Name notice.direct
Type direct
Durability Durable
Auto Delete No
Internal No
Arguments (nothing)

v1バーチャルホストにbroadcast.fanoutエクスチェンジを作成します。

field value
Virtual Host v1
Name broadcast.fanout
Type fanout
Durability Durable
Auto Delete No
Internal No
Arguments (nothing)

v2バーチャルホストにnotice.topicエクスチェンジを作成します。

field value
Virtual Host v2
Name notice.topic
Type topic
Durability Durable
Auto Delete No
Internal No
Arguments (nothing)

登録後はこのようになります。(*青線で囲った部分が登録したエクスチェンジです。その他は事前定義のエクスチェンジです。)

all_exchanges.png

properties

エクスチェンジに設定できるプロパティは下記の通りです。

property value description
Durability
  • Durable
  • Transient
Durable:エクスチェンジを永続化します、Transient:一時的なエクスチェンジとします。
Auto Delete
  • YES
  • NO
YES:バインドするキューが無くなった場合にエクスチェンジを削除します。
Internal
  • YES
  • NO
YES:エクスチェンジからしかメッセージを受け取りません。エクスチェンジ to エクスチェンジ用です。
arguments
arguments type description
alternate-exchange String If messages to this exchange cannot otherwise be routed, send them to the alternate exchange named here.

キュー (queue)

エクスチェンジから配送されてきたメッセージの保管場所です。
1つのキューは1つのノード上に作成します(RabbitMQサーバーをクラスター構成にしていても)が、ポリシーの設定によってキューをミラーリングすることができます。

キューの作成

キューの作成はrabbitmqctlコマンドではできませんので、managementプラグインかclientアプリケーションを使用します。

managementプラグインを使用する

メニューバーの[Queues]を選択します。
キューはバーチャルホスト毎に作成する必要があります。作成対象のVirtual Hostを選び、Nameにキュー名を入力します。ただし名前の先頭に"amp."を付けることはできません。

add_new_queue.png

v1バーチャルホストにuserid:1000userid:1001userid:2002を作成します。

field value
Virtual Host v1
Name userid:1000、userid:1001、userid:2002
Durability Durable
Node rabbit_node1@localhost
Auto Delete No
Arguments x-max-priority = 9

v2バーチャルホストにuserid:1000.infouserid:1000.warnuserid:1001.infouserid:1001.warnを作成します。

field value
Virtual Host v2
Name userid:1000.info、userid:1000.warn、userid:1001.info、userid:1001.warn
Durability Durable
Node rabbit_node1@localhost
Auto Delete No
Arguments x-max-priority = 9

登録後はこのようになります。

all_queues.png

properties

キューに設定できるプロパティは下記の通りです。

property value description
Durability
  • Durable
  • Transient
Durable:キューを永続化します、Transient:一時的なキューとします。
Node リストから選択します キューを作成するノードを指定します
Auto delete
  • YES
  • NO
YES:consumerからの接続が無くなった場合にキューを削除します。

Durabilityとは、キューの登録情報をRabbitMQサーバーの停止後も保存するかを指定するプロパティです。Transientの場合、RabbitMQサーバーを停止させるとそのキューは無くなります。

arguments
arguments type description
x-message-ttl Number メッセージの生存期間(ミリ秒)を指定します。
x-expires Number キューの有効期間(ミリ秒)を指定します。ここで指定した時間使用されなかった場合キューを削除します。
x-max-length Number キューに保存するメッセージの最大数(これを超えると古いメッセージから削除)を指定します。
x-max-length-bytes Number キューに保存するメッセージの最大合計サイズ数 (これを超えると古いメッセージから削除)を指定します。
x-dead-letter-exchange String dead-letterになったメッセージの転送先のエクスチェンジ名を指定します。
x-dead-letter-routing-key String dead-letterになったメッセージに付けるルーティングキーを指定します。
x-max-priority Number プライオリティの最大値を指定します。大きい値ほど優先度が高くなります。
x-match String all:すべての条件がマッチする場合に受信する。
any:条件が1つでもマッチする場合に受信する。

バインド (binding)

エクスチェンジとキューを関連付けます。関連は「エクスチェンジからキュー」と「エクスチェンジからエクスチェンジ」の2通りが指定できます。
エクスチェンジが受け取ったメッセージの配送先は関連付けられたキューの中から選択されますが、エクスチェンジのタイプがdirecttopicの場合は、ルーティングキーによって配送のキューを分岐させることができます。
headersの場合は、ヘッダーに指定した条件によって配送先のキューを分岐させることができます。

ルーティングキー (routing key)

エクスチェンジのタイプがdirecttopicのとき、バインドにルーティングキーを設定します。
このルーティングキーとメッセージが持つルーティングキーが比較され配送するキューが決定されます。

direct

directの場合、メッセージのルーティングキーがバインドのルーティングキーと一致した場合に、そのバインド先のキューが選択されます。

direct_exchange例
 +- [message]--------------+      +-[ direct exchange]------------+
 |                         |      |                               |
 |  routing key : ?        |----->|   +-[binding] A ----------+   |     +-[ queue ]-----+
 |                         |      |   | routing key : 1000    |-------->|               |
 |  payload {              |      |   |                       |   |     +---------------+
 |     ...                 |      |   +-----------------------+   |
 |  }                      |      |   +-[binding] B ----------+   |     +-[ queue ]-----+
 +-------------------------+      |   | routing key : 1001    |-------->|               |
                                  |   |                       |   |     +---------------+
                                  |   +-----------------------+   |
                                  |   +-[binding] C ----------+   |     +-[ queue ]-----+
                                  |   | routing key : 2000    |-------->|               |
                                  |   |                       |   |     +---------------+
                                  |   +-----------------------+   |
                                  +-------------------------------+

上図のdirect_exchange例では

  • メッセージのrouting keyが1000の場合、binding Aが使用され、その先のキューへメッセージが配送されます。
  • メッセージのrouting keyが1001の場合、binding Bが使用され、その先のキューへメッセージが配送されます。

topic

topicの場合、メッセージのルーティングキーがバインドのルーティングキーパターンと一致する場合に、そのバインド先のキューが選択されます。一見すると正規表現のように見えますがそうではありませんので注意が必要です。
バインドに記述するルーティングキーは

  • .(ドット)で区切られた単語から構成します。
  • キーの最大文字長は255バイトです。
  • 単語には特殊な意味を持つ*#を使用することができます。
  • *は1個の単語と置き換えられます。
  • #は0個または1個以上の単語と置き換えられます。
pattern マッチするパターン マッチしないパターン
A.* A.AA.BA.Cなど AA.B.Cなど
A.# AA.AA.A.A AAAA.Bなど
A.#.Z A.ZA.B.ZA.B.C.Z AA.BA.B.Cなど
topics_exchange例
 +- [message]--------------+      +-[ topic exchange]----------------+
 |                         |      |                                  |
 |  routing key : ?        |----->|   +-[binding] A -------------+   |     +-[ queue ]-----+
 |                         |      |   | routing key : id.*.info  |-------->|               |
 |  payload {              |      |   |                          |   |     +---------------+
 |     ...                 |      |   +--------------------------+   |
 |  }                      |      |   +-[binding] B -------------+   |     +-[ queue ]-----+
 +-------------------------+      |   | routing key : id.*.warn  |-------->|               |
                                  |   |                          |   |     +---------------+
                                  |   +--------------------------+   |
                                  |   +-[binding] C -------------+   |     +-[ queue ]-----+
                                  |   | routing key : broad.#    |-------->|               |
                                  |   |                          |   |     +---------------+
                                  |   +--------------------------+   |
                                  +----------------------------------+

上図のtopics_exchange例では

  • メッセージのrouting keyがid.1000.infoの場合、binding Aが使用され、その先のキューへメッセージが配送されます。
  • メッセージのrouting keyがid.1001.warnの場合、binding Bが使用され、その先のキューへメッセージが配送されます。
  • メッセージのrouting keyがbroad.update.2000や、broad.delete.2001の場合、binding Cが使用され、その先のキューへメッセージが配送されます。

引数 (arguments)

headers

エクスチェンジのタイプがheadersのときに、バインドの引数に任意の分岐条件を設定できます。
この分岐条件とメッセージのヘッダーが比較され配送先のキューが決定されます。

headers exchangeのバインドは引数にx-matchを設定する必要があります。設定できる値はanyallです。

  • any設定は、バインドの分岐条件の何れかを満たした場合に使用されます。
  • all設定は、バインドの分岐条件のすべてを満たした場合に使用されます。
headers_exchangeの例
 +- [message]--------------+      +-[ headers exchange]-----------+
 |                         |      |                               |
 |  arguments              |----->|   +-[binding] A --------+     |     +-[ queue ]-----+
 |   arg1 = ?              |      |   | x-match : any       |---------->|               |
 |   arg2 = ?              |      |   | arg1 : 100          |     |     +---------------+
 |   arg3 = ?              |      |   | arg2 : 200          |     |
 |   arg4 = ?              |      |   +---------------------+     |
 |                         |      |   +-[binding] B --------+     |     +-[ queue ]-----+
 | payload {               |      |   | x-match : any       |---------->|               |
 |    ...                  |      |   | arg3 : 300          |     |     +---------------+
 | }                       |      |   | arg4 : 400          |     |
 +-------------------------+      |   +---------------------+     |
                                  |   +-[binding] C --------+     |     +-[ queue ]-----+
                                  |   | x-match : all       |---------->|               |
                                  |   | arg1 : 100          |     |     +---------------+
                                  |   | arg3 : 300          |     |
                                  |   +---------------------+     |
                                  +-------------------------------+

上図のheaders_exchange例では

  • メッセージのarg1引数が100、またはarg2引数が200の場合、binding Aが使用され、その先のキューへメッセージが配送されます。
  • メッセージのarg3引数が300、またはarg4引数が400の場合、binding Bが使用され、その先のキューへメッセージが配送されます。
  • メッセージのarg1引数が100、且つarg3引数が300の場合、binding A,B,Cが使用され、それぞれのキューへメッセージが配送されます。

エクスチェンジとキューのバインド

バインドはrabbitmqctlコマンドではできませんので、managementプラグインかclientアプリケーションを使用します。

managementプラグインを使用する

メニューバーの[Exchanges]を選択します。
エクスチェンジの一覧が表示されますので、その中からバインドの設定を行いたいエクスチェンジの名前をクリックします。
Bindings欄にある「Add binding from this exchange」というフォームからバインドの設定内容を登録します。

bindings.png

v1バーチャルホストのnotice.directにuserid:1000、userid:1001、userid:2002へのバインドを作成します。

field value
To queue userid:1000
Routing key 1000
Arguments (nothing)
field value
To queue userid:1001
Routing key 1001
Arguments (nothing)
field value
To queue userid:2002
Routing key 2002
Arguments (nothing)

登録後はこうなります。

notice_direct_binding.png

v1バーチャルホストのbroadcast.fanoutにuserid:1000、userid:1001、userid:2002へのバインドを作成します。

field value
To queue userid:1000
Routing key (nothing)
Arguments (nothing)
field value
To queue userid:1001
Routing key (nothing)
Arguments (nothing)
field value
To queue userid:2002
Routing key (nothing)
Arguments (nothing)

登録後はこうなります。

broadcast_fanout_binding.png

v2バーチャルホストのnotice.topicにuserid:1000.info、userid:1000.warn、userid:1001.info、userid:1001.warnへのバインドを作成します。

field value
To queue userid:1000.info
Routing key *.info
Arguments (nothing)
field value
To queue userid:1000.warn
Routing key *.warn
Arguments (nothing)
field value
To queue userid:1001.info
Routing key *.info
Arguments (nothing)
field value
To queue userid:1001.warn
Routing key *.warn
Arguments (nothing)

登録後はこうなります。

notice_topic_binding.png

ポリシー (policy)

エクスチェンジ、キューの動作をポリシーによって制御できます。ポリシーはバーチャルホスト毎に作成します。
ポリシーはrabbitmqctlコマンドかmanagementプラグインで作成することができます。

ポリシーの一覧を表示します

Synopsis
list_policies [-p vhostpath]
list_policies
> rabbitmqctl.bat -n rabbit_node1@localhost list_policies -p /
Listing policies ...
> rabbitmqctl.bat -n rabbit_node1@localhost list_policies -p v1
Listing policies ...
> rabbitmqctl.bat -n rabbit_node1@localhost list_policies -p v2
Listing policies ...

ポリシーの作成

rabbitmqctlコマンドを使用する場合

rabbitmqctlコマンドでポリシーを作成するにはset_policyを使用します。

Synopsis
set_policy [-p vhostpath] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
option value
-p ポリシーを適用するバーチャルホストを指定します。
--priority ポリシーの優先度を指定します。
--apply-to all,exchanges,queuesから選びます。
{name} ポリシーの名前を指定します。
{pattern} ポリシーを適用する対象を正規表現で指定します。
{definition} ポリシーの内容を指定します。

v1バーチャルホストにuser1xポリシーを作成します。

set_policy
> rabbitmqctl.bat -n rabbit_node1@localhost -p v1 set_policy --priority 0 --apply-to queues "user1x" "^userid:[1].*" "{""max-length"":50}"
Setting policy "user1x" for pattern "^userid:[1].*" to "{\"max-length\":50}" with priority "0" ...

v1バーチャルホストにuser2xポリシーを作成します。

set_policy
> rabbitmqctl.bat -n rabbit_node1@localhost -p v1 set_policy --priority 0 --apply-to queues "user2x" "^userid:[2].*" "{""max-length"":100}"
Setting policy "user2x" for pattern "^userid:[2].*" to "{\"max-length\":100}" with priority "0" ...

v2バーチャルホストにinfoポリシーを作成します。

set_policy
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 set_policy --priority 0 --apply-to queues "info" "^userid:.*\.info" "{""message-ttl"":1800000}"
Setting policy "info" for pattern "^userid:.*\\.info" to "{\"message-ttl\":1800000}" with priority "0" ...

v2バーチャルホストにwarnポリシーを作成します。

set_policy
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 set_policy --priority 0 --apply-to queues "warn" "^userid:.*\.warn" "{""message-ttl"":3600000}"
Setting policy "warn" for pattern "^userid:.*\\.warn" to "{\"message-ttl\":3600000}" with priority "0" ...

managementプラグインを使用する場合

メニューバーの[Admin]→[Policies]を選択します。
ポリシーはバーチャルホスト毎に作成する必要があります。作成対象のVirtual Hostを選び、Nameにポリシー名を入力します。

add_policy.png

v1バーチャルホストにuser1xポリシーを作成します。

field value
Virtual Host v1
Name user1x
Pattern ^userid:[1].*
Apply to queues
Priority 0
Definition max-length = 50

v1バーチャルホストにuser2xポリシーを作成します。

field value
Virtual Host v1
Name user2x
Pattern ^userid:[2].*
Apply to queues
Priority 0
Definition max-length = 100

v2バーチャルホストにinfoポリシーを作成します。

field value
Virtual Host v2
Name info
Pattern ^userid:.*.info
Apply to queues
Priority 0
Definition message-ttl = 1800000

v2バーチャルホストにwarnポリシーを作成します。

field value
Virtual Host v2
Name warn
Pattern ^userid:.*.warn
Apply to queues
Priority 0
Definition message-ttl = 3600000

登録後はこのようになります。

all_policies.png

また、キューの一覧でも適用されるポリシーが確認できます。

all_queues_policies.png

properties

ポリシーに設定できるプロパティは下記の通りです。

Pattern

ポリシーを適用するリソース名を正規表現で指定します。
".*"とすると全てのリソースに適用されます。

Apply to

適用するリソースをリストから選択します。

  • Exchanges and queues
  • Exchanges
  • Queues

Priority

ポリシーの優先度を設定します。
パターンにマッチするポリシーが複数あった場合、優先度の高いポリシーが適用されます。大きい値がより優先度が高いと判断されます。

Definition

ポリシーの内容を設定します。

HA
キューのミラーリングに関するポリシーです。

arguments type description
ha-mode String ミラーリングのモードを指定します。
ha-params String ミラーリングのモードに対するパラメータを指定します。
ha-sync-mode String メッセージの同期モードを指定します。

Federation

arguments type description
federation-upstream-set String A string; only if the federation plugin is enabled. Chooses the name of a set of upstreams to use with federation, or "all" to use all upstreams. Incompatible with federation-upstream.
federation-upstream String A string; only if the federation plugin is enabled. Chooses a specific upstream set to use for federation. Incompatible with federation-upstream-set.

Queues
キューに関するポリシーです。

arguments type description
message-ttl Number メッセージの生存期間(ミリ秒)を指定します。
expires Number キューの有効期間(ミリ秒)を指定します。ここで指定した時間使用されなかった場合キューを削除します。
max-length Number キューに保存するメッセージの最大数(これを超えると古いメッセージから削除)を指定します。
max-length-bytes Number キューに保存するメッセージの最大合計サイズ数 (これを超えると古いメッセージから削除)を指定します。
dead-letter-exchange String dead-letterになったメッセージの転送先のエクスチェンジ名を指定します。
dead-letter-routing-key String dead-letterになったメッセージに付けるルーティングキーを指定します。

Exchanges
エクスチェンジに関するポリシーです。

arguments type description
alternate-exchange String 代替エクスチェンジの名前を指定します。

Highly Available Queues

クラスター構成のRabbitMQではキューをミラーリングすることができます。逆にクラスター構成にしても初期状態ではキューはミラーリングされません。
キューのミラーリングはポリシーの設定によって行い、ポリシーの設定以降に受信したメッセージからミラーリングが行われます。

ミラーリング用のポリシーの作成

rabbitmqctを使用する場合

Synopsis
set_policy [-p vhostpath] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}

definitionにjsonフォーマットでミラーリングの方法を記述します。
ha-modeにキューのミラーリングの方法を下記の3種類から指定します。

ha-mode description
all クラスター内のすべてのノードでミラーリングを行います。
exactly クラスター内の指定数のノードでミラーリングを行います。
nodes クラスター内の指名したノードでミラーリングを行います。

ha-sync-modeに同期モードを下記の2種類から指定します。

ha-sync-mode description
manual デフォルトです。非同期のメッセージが発生した場合、手動で同期する必要があります。
automatic 非同期メッセージが発生した場合、自動で同期を行います。

手動で同期するにはrabbitmqctlコマンドかmanagementプラグインを使用します。

rabbitmqctlコマンドを使用する場合はsync_queueを使用します。

sync_queue
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 sync_queue userid:2002
Synchronising queue 'userid:2002' in vhost 'v2' ...

[RabbitMQ: Why is the default ha-sync-mode manual?] (http://serverfault.com/questions/543160/rabbitmq-why-is-the-default-ha-sync-mode-manual)

allは、すべてのノードでミラーします。

all
{"ha-mode":"all", "ha-sync-mode": "automatic"}

exactlyは、ha-paramsで指定した数のノードでミラーします。

exactly
{"ha-mode":"exactly", "ha-params":2, "ha-sync-mode":"automatic"}

nodesは、ha-paramsで指名したノードでミラーします。

nodes
{"ha-mode":"nodes", "ha-params":["rabbit_node1@localhost","rabbit_node2@localhost"]}

この例では、v2バーチャルホストのuserid:1000.info、userid:1001.infoキューをミラーリングキューに設定します。
ただし、すでにポリシーが設定されているのでプライオリティーを高くして優先されるようにします。

set_policy
> rabbitmqctl.bat -n rabbit_node1@localhost -p v2 set_policy --priority 1 --apply-to queues "ha_info" "^userid:.*\.info" "{""ha-mode"":""all"", ""ha-sync-mode"":""automatic"", ""message-ttl"":1800000}"
Setting policy "ha_info" for pattern "^userid:.*\\.info" to "{\"ha-mode\":\"all\", \"ha-sync-mode\":\"automatic\", \"message-ttl\":1800000}" with priority "1" ...

managementプラグインを使用する場合

ミラーリング用のポリシーも通常のポリシーと同様の方法で登録します。

add_ha_mirror_policy.png

同じパターンのポリシーが2つ作成されますが、Priorityの高い方(ha_info)が適用されます。

all_policies_ha.png

キューの一覧で新しいポリシーが適用されていることを確認することができます。
この例ではノード名の右側にある"+1"がミラーしているノードの数を表しています。

ha_queues.png

メッセージ (message)

通常はclientアプリケーションより送信されるデータです。

メッセージを送信する

managementプラグインを使用する

managementプラグインからも検証目的のためにメッセージを送信することが可能です。

publish_message.png

ここで、これまでに登録したエクスチェンジを使ってメッセージの配信テストを行ってみます。
メニューバーの[Exchanges]→リスト中から「notice.direct」をクリックします。

「Publish message」欄から下記の内容を入力して「Publish message」ボタンをクリックします。
緑色のダイアログに"Message published."と表示されればエクスチェンジへのメッセージ送信は成功です。

field value
Routing key 1000
Delivery mode 2 - Persistent
Headers (nothing)
Properties (nothing)
payload userid:1000へテスト配信

次に、キューを確認します。
メニューバーの[Queues]→リスト中から「userid:1000」をクリックします。
「Details」欄の"Messages"が下図のようになっていれば、キューまでメッセージが配送されていることになります。

queue_details.png

また「Get messages」欄で受信したメッセージの内容を確認することができます。
Requeueを「No」に変えて「Get Message(s)」ボタンをクリックします。

get_message.png

properties

メッセージ送信時に設定できるプロパティは下記の通りです。

Delivery mode

配信モードを指定します。

  • 1 Non-persistent : メッセージをdiskに保存しません。RabbitMQサーバーを停止したりクラッシュした場合、メッセージは失われます。
  • 2 Persistent : キューがDurabilityのときにメッセージをdiskに保存します。

Headers

メッセージにヘッダーを指定します。

ヘッダーには任意の名前と値を指定することができます。エクスチェンジのheadersはこの値を見て配送先のキューを選択します。

Properties

You can set other message properties here (delivery mode and headers are pulled out as the most common cases).
Invalid properties will be ignored. Valid properties are:

設定できるプロパティの一覧です。

  • content_type
  • content_encoding
  • priority
  • correlation_id
  • reply_to
  • expiration
  • message_id
  • timestamp
  • type
  • user_id
  • app_id
  • cluster_id

Payload

メッセージの本体です。

Priority Queue Support

プライオリティキューが使用できます。

RabbitMQ has priority queue implementation in the core as of version 3.5.0.

プライオリティキューはx-max-priorityに最大優先度を指定して登録します。
プライオリティを付けたメッセージをプライオリティーキューへ配送するとコンシューマーアプリケーションはプライオリティーの高いメッセージから順に受信します。
プライオリティは数値の大きい方がより優先度が高いと判断されます。指定できる値の範囲は0~9です。
なおメッセージのプライオリティが最大優先度を超えて設定されていても拒否されることは無く最大値とみなされます。(たとえば、プライオリティーを100にしても最大値の9として扱われます。)

下表は、最大優先度を5に設定したプライオリティーキューへ、プライオリティーを1~5の範囲で設定したメッセージの送信順と受信順の関係です。

送信順 プライオリティ 受信順
1 5 1
2 4 5
3 3 7
4 5 2
5 2 8
6 1 10
7 5 3
8 6 4
9 2 9
10 4 6

[Priority Queue Support] (https://www.rabbitmq.com/priority.html)

3. rabbitmqctlコマンドのマニュアル

Synopsis
rabbitmqctl [-n node] [-q] {command} [command options...]

参考

[rabbitmqctl(1) manual page] (https://www.rabbitmq.com/man/rabbitmqctl.1.man.html)

4. Tip`s

ちょっと便利な小技を紹介します。

erl.exeプロセスの実行引数の確認

RabbitMQサーバーを実行する際の引数を確認する方法です。

> wmic.exe process where "name=\"erl.exe\"" get name,commandline

File descriptors欄の表示を有効にする

managementプラグインでFile descriptors欄の表示を有効にするには別途MicrosoftよりHandle.exeを取得する必要があります。
取得したHandle.exeはPathの通った場所であればどこに置いても構いません。

9
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?