LoginSignup
11
10

More than 5 years have passed since last update.

rabbitmqctlコマンドの使い方

Last updated at Posted at 2018-02-16

1 rabbitmqctlとは?

RabbitMQサーバーに対して、vhost、ユーザ等の作成、削除を行うためのコマンドです。

2 構成

VMware Workstation 12 Player上のゲストマシンを使っています。

[root@server ~]# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)

[root@server ~]# uname -r
3.10.0-514.el7.x86_64

3 事前準備

3.1 rabbitmqのインストール

erlangはepelリポジトリにあるので、まず、epelリポジトリをインストールする。
[root@server ~]# yum install epel-release

erlangをインストールする。
[root@server ~]# yum install erlang
[root@server ~]# erl +V
Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version 5.10.4

rabbitmqをインストールする。
[root@server ~]# rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
[root@server ~]# yum install rabbitmq-server

3.2 pikaのインストール

pikaとは、Pythonのライブラリです。
テスト用で作成するConsumer,Producerで使用するので、事前にインストールしておきます。

[root@server ~]# yum install python-setuptools
[root@server ~]# easy_install pip
[root@server ~]# pip --version
pip 9.0.1 from /usr/lib/python2.7/site-packages/pip-9.0.1-py2.7.egg (python 2.7)

pikaをインストールする。
[root@server ~]# pip install pika
Collecting pika
  Downloading pika-0.11.2-py2.py3-none-any.whl (107kB)
    100% |????????????????????????????????| 112kB 585kB/s
Installing collected packages: pika
Successfully installed pika-0.11.2

4 管理画面からRabbitMQサーバにアクセスする方法

4.1 rabbitmqの起動

[root@server ~]# systemctl start rabbitmq-server
[root@server ~]# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
   Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
   Active: active (running) since 月 2018-01-22 20:57:57 JST; 50s ago
 Main PID: 2194 (beam.smp)
   CGroup: /system.slice/rabbitmq-server.service
           tq2194 /usr/lib64/erlang/erts-5.10.4/bin/beam.smp -W w -K true -A30 -P 1048576 -- -root /usr/lib64/...
           tq2209 /usr/lib64/erlang/erts-5.10.4/bin/epmd -daemon
           tq2282 inet_gethost 4
           mq2283 inet_gethost 4


 1月 22 20:57:47 server systemd[1]: rabbitmq-server.service: Got notification message from PID 2209, but... 2194
 1月 22 20:57:49 server systemd[1]: rabbitmq-server.service: Got notification message from PID 2238, but... 2194
 1月 22 20:57:53 server rabbitmq-server[2194]: RabbitMQ 3.3.5. Copyright (C) 2007-2014 GoPivotal, Inc.
 1月 22 20:57:53 server rabbitmq-server[2194]: ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
 1月 22 20:57:53 server rabbitmq-server[2194]: ##  ##
 1月 22 20:57:53 server rabbitmq-server[2194]: ##########  Logs: /var/log/rabbitmq/rabbit@server.log
 1月 22 20:57:53 server rabbitmq-server[2194]: ######  ##        /var/log/rabbitmq/rabbit@server-sasl.log
 1月 22 20:57:53 server rabbitmq-server[2194]: ##########
 1月 22 20:57:57 server systemd[1]: Started RabbitMQ broker.
 1月 22 20:57:57 server rabbitmq-server[2194]: Starting broker... completed with 0 plugins.
Hint: Some lines were ellipsized, use -l to show in full.

4.2 ポート番号の解放

[root@server ~]# firewall-cmd --add-port=5672/tcp
success
[root@server ~]# firewall-cmd --add-port=15672/tcp
success
ポート番号 用途
5672(RabbitMQ Node Port) rabbitmqサーバとProducer,Consumerがメッセージの送受信に使う
5671(RabbitMQ Node Port) rabbitmqサーバとProducer,Consumerがメッセージの送受信に使う(TLSで暗号化)
15672(RabbitMQ Management Port) Webブラウザの管理画面よりrabbitmqにアクセスするときに使う

4.3 Webブラウザよりアクセスする

ユーザを登録する。ID/Passはadmin/11111とする。
[root@server ~]# rabbitmqctl add_user admin 11111
Creating user "admin" ...
...done.

[root@server ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
...done.

[root@server ~]# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
...done.

[root@server ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.

RabbitMQをGUIで管理できるプラグインの追加
[root@server ~]# rabbitmq-plugins list
[e] amqp_client                       3.3.5
[ ] cowboy                            0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap                             3.3.5-gite309de4
[e] mochiweb                          2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0                  3.3.5
[ ] rabbitmq_auth_backend_ldap        3.3.5
[ ] rabbitmq_auth_mechanism_ssl       3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation               3.3.5
[ ] rabbitmq_federation_management    3.3.5
[E] rabbitmq_management               3.3.5
[e] rabbitmq_management_agent         3.3.5
[ ] rabbitmq_management_visualiser    3.3.5
[ ] rabbitmq_mqtt                     3.3.5
[ ] rabbitmq_shovel                   3.3.5
[ ] rabbitmq_shovel_management        3.3.5
[ ] rabbitmq_stomp                    3.3.5
[ ] rabbitmq_test                     3.3.5
[ ] rabbitmq_tracing                  3.3.5
[e] rabbitmq_web_dispatch             3.3.5
[ ] rabbitmq_web_stomp                3.3.5
[ ] rabbitmq_web_stomp_examples       3.3.5
[ ] sockjs                            0.3.4-rmq3.3.5-git3132eb9
[e] webmachine                        1.10.3-rmq3.3.5-gite9359c7

[root@server ~]# rabbitmq-plugins enable rabbitmq_management
Plugin configuration unchanged.

[root@server ~]# rabbitmq-plugins list|grep management
[ ] rabbitmq_federation_management    3.3.5
[E] rabbitmq_management               3.3.5
[e] rabbitmq_management_agent         3.3.5
[ ] rabbitmq_management_visualiser    3.3.5
[ ] rabbitmq_shovel_management        3.3.5

rabbit.png

5 vhostの追加、削除、表示方法

vhostはここを参照しました。
vhostは、ユーザ、キュー、コネクション等のリソースの集合です。
vhostは複数定義することが可能で、名前を持っています。
そして、Producerは名前を指定してRabbitMQに接続します。

5.1 vhostを追加する方法(add_vhost)

vhost(/test1)を追加する。
[root@server ~]# rabbitmqctl add_vhost /test1
Creating vhost "/test1" ...
...done.

vhost(/test2)を追加する。
[root@server ~]# rabbitmqctl add_vhost /test2
Creating vhost "/test2" ...
...done.

vhostを確認する。/test1と/test2が追加されたことがわかる。
/はデフォルトで存在するvhostです。
[root@server ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
/test1
/test2
...done.

5.2 vhostを削除する方法(delete_vhost)

vhost(/test1)を削除する。
[root@server ~]# rabbitmqctl delete_vhost /test1
Deleting vhost "/test1" ...
...done.

vhost(/test2)を削除する。
[root@server ~]# rabbitmqctl delete_vhost /test2
Deleting vhost "/test2" ...
...done.

vhostを確認する。/test1と/test2が削除されたことがわかる。
[root@server ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
...done.

6 ユーザの追加、削除、表示方法

6.1 ユーザの追加方法(add_user)

書式は以下のとおり。
rabbitmqctl add_user <user> <password>

登録されているユーザを確認する。adminとguestが登録されていることがわかる。
[root@server ~]# rabbitmqctl list_users
Listing users ...
admin   [administrator]
guest   [administrator]
...done.

ユーザ(user1)を追加する。
[root@server ~]# rabbitmqctl add_user user1 11111
Creating user "user1" ...
...done.

ユーザを確認する。user1が追加されたことがわかる。
[root@server ~]# rabbitmqctl list_users
Listing users ...
admin   [administrator]
guest   [administrator]
user1   []
...done.

6.2 ユーザの削除方法(delete_user)

ユーザ(user1)を削除する。
[root@server ~]# rabbitmqctl delete_user user1
Deleting user "user1" ...
...done.

ユーザを確認する。user1が削除されたことがわかる。
[root@server ~]# rabbitmqctl list_users
Listing users ...
admin   [administrator]
guest   [administrator]
...done.

7 権限の設定、削除方法

7.1 権限の設定方法(set_permissions)

[root@server ~]# rabbitmqctl add_vhost /test1
Creating vhost "/test1" ...
...done.
[root@server ~]# rabbitmqctl add_user user1 11111
Creating user "user1" ...
...done.

[root@server ~]# rabbitmqctl set_permissions -p /test1 user1 ".*" ".*" ".*"
Setting permissions for user "user1" in vhost "/test1" ...
...done.

[root@server ~]# rabbitmqctl list_permissions -p /test1
Listing permissions in vhost "/test1" ...
user1   .*      .*      .*
...done.

7.2 権限の削除方法(clear_permissions)

[root@server ~]# rabbitmqctl clear_permissions -p /test1 user1
Clearing permissions for user "user1" in vhost "/test1" ...
...done.
[root@server ~]# rabbitmqctl list_permissions -p /test1
Listing permissions in vhost "/test1" ...
...done.

8 キューの表示

"/"に存在するキューを表示する。"hello"が存在することがわかる。
[root@server ~]# rabbitmqctl list_queues -p /
Listing queues ...
hello   0
...done.

9 エクスチェンジ(Exchanges)

エクスチェンジは、Producerからメッセージを受信する役割を実行します。
受信したメッセージは、バインディング(後述)にしたがって、適切なキューにキューイングします。

エクスチェンジを表示する。
[root@server ~]# rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
...done.

10 バインディング(Binding)

どのキューにメッセージをキューイングするかを決めたルールです。
メッセージはバインディングに従って、適切なキューにキューイングされます。

[root@server ~]# rabbitmqctl list_bindings
Listing bindings ...
...done.

11 動作確認(Hellow World)

テストプログラムは、ここ(Hello World!")にあるものを使いました。

11.1 受信側(Consumer)

テストプログラムを作成する。
[root@server ~]# vi receive.py
[root@server ~]# cat receive.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

キューを確認する。
[root@server ~]# rabbitmqctl list_queues -p /
Listing queues ...
hello   0
...done.

11.2 送信側(Producer)

テストプログラムを作成する。
[root@server ~]# vi send.py
[root@server ~]# cat send.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

11.3 実行結果

[root@server ~]# python receive.py
 [*] Waiting for messages. To exit press CTRL+C

もう1つターミナルを開く
[root@server ~]# python send.py
 [x] Sent 'Hello World!'

[root@server ~]# python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

12 動作確認(Work Queues)

テストプログラムは、ここ(Work Queues)にあるものを使いました。

12.1 受信側(Consumer)

テストプログラムを作成する。
[root@server ~]# vi worker.py
[root@server ~]# cat worker.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

12.2 送信側(Producer)

テストプログラムを作成する。
[root@server ~]# vi new_task.py
[root@server ~]# cat new_task.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

12.3 実行結果

Consumerを起動する。
[root@server ~]# python worker.py
 [*] Waiting for messages. To exit press CTRL+C

もう1つターミナルを開く。Consumerを起動する。この時点で2つConsumerを起動したことになる。
[root@server ~]# python worker.py
 [*] Waiting for messages. To exit press CTRL+C

もう1つターミナルを開く。Producerを起動する。メッセージ("11111")を送信する。
[root@server ~]# python new_task.py 11111
 [x] Sent '11111'

[root@server ~]# python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '11111'
 [x] Done

Producerを起動する。メッセージ("22222")を送信する。
[root@server ~]# python new_task.py 22222
 [x] Sent '22222'
[root@server 02_work]# python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '22222'
 [x] Done

12.4 すこしだけ深堀

lsofコマンドを実行する。IPv6ソケットを使用していることがわかる。
サーバ(beam.smp)は、TCPの5672番ポートでListen(★)していることがわかる。
また、ConsumerはサーバとTCPコネクションを2つ(■)確立していることも確認できます。
[root@server ~]# lsof -i:5672 -P -n
COMMAND   PID     USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
beam.smp 1247 rabbitmq   16u  IPv6  28059      0t0  TCP *:5672 (LISTEN) ★
beam.smp 1247 rabbitmq   19u  IPv6  43405      0t0  TCP [::1]:5672->[::1]:41104 (ESTABLISHED)
beam.smp 1247 rabbitmq   20u  IPv6  44153      0t0  TCP [::1]:5672->[::1]:41106 (ESTABLISHED)
python   2567     root    4u  IPv6  44134      0t0  TCP [::1]:41104->[::1]:5672 (ESTABLISHED) ■
python   2580     root    4u  IPv6  43503      0t0  TCP [::1]:41106->[::1]:5672 (ESTABLISHED) ■

Producerを実行する。Producerはサーバの5672番ポート(★)にTCPコネクションを確立していることがわかります。
[root@server ~]# strace -ttT -f -e trace=setsockopt,socket,connect,sendto python new_task.py 22222
11:07:01.827581 socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP) = 4 <0.000126>
11:07:01.828260 setsockopt(4, SOL_TCP, TCP_NODELAY, [1], 4) = 0 <0.000042>
11:07:01.828800 connect(4, {sa_family=AF_INET6, sin6_port=htons(★5672), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = -1 EINPROGRESS (Operation now in progress) <0.002546>
11:07:01.863561 sendto(4, "\3\0\1\0\0\0\00522222\316", 13, 0, NULL, 0) = 13 <0.001367>
 [x] Sent '22222'

5672番ポートで送受信されるメッセージを確認する。
[root@server ~]# tcpdump -i lo tcp port 5672 -nn

ProducerがサーバとTCPコネクションを確立する。
11:38:57.768532 IP6 ::1.41172 > ::1.5672: Flags [S], seq 3976291393, win 43690, options [mss 65476,sackOK,TS val 11607996 ecr 0,nop,wscale 6], length 0
09:52:46.512335 IP6 ::1.5672 > ::1.41172: Flags [S.], seq 3217423699, ack 3976291394, win 43690, options [mss 65476,sackOK,TS val 11607996 ecr 11607996,nop,wscale 6], length 0
11:38:57.768652 IP6 ::1.41172 > ::1.5672: Flags [.], ack 1, win 683, options [nop,nop,TS val 11607996 ecr 11607996], length 0

TCPコネクション確立後、Producerとサーバ間でメッセージがやり取りされる。
11:38:57.771227 IP6 ::1.41172 > ::1.5672: Flags [P.], seq 1:9, ack 1, win 683, options [nop,nop,TS val 11607999 ecr 11607996], length 8
11:38:57.771282 IP6 ::1.5672 > ::1.41172: Flags [.], ack 9, win 683, options [nop,nop,TS val 11607999 ecr 11607999], length 0
-中略-

ここで、サーバからConsumerにメッセージが送信される。
11:38:57.848765 IP6 ::1.5672 > ::1.41172: Flags [.], ack 449, win 700, options [nop,nop,TS val 11608077 ecr 11608073], length 0
11:38:57.850534 IP6 ::1.5672 > ::1.41104: Flags [P.], seq 1612595414:1612595522, ack 1216607427, win 700, options [nop,nop,TS val 11608078 ecr 11446882], length 108
11:38:57.853411 IP6 ::1.41104 > ::1.5672: Flags [P.], seq 1:22, ack 108, win 700, options [nop,nop,TS val 11608079 ecr 11608078], length 21
11:38:57.853472 IP6 ::1.5672 > ::1.41104: Flags [.], ack 22, win 700, options [nop,nop,TS val 11608079 ecr 11608079], length 0
11:38:57.860847 IP6 ::1.41172 > ::1.5672: Flags [P.], seq 449:483, ack 543, win 700, options [nop,nop,TS val 11608089 ecr 11608077], length 34
11:38:57.862794 IP6 ::1.5672 > ::1.41172: Flags [P.], seq 543:555, ack 483, win 700, options [nop,nop,TS val 11608090 ecr 11608089], length 12
11:38:57.871747 IP6 ::1.41172 > ::1.5672: Flags [P.], seq 483:517, ack 555, win 700, options [nop,nop,TS val 11608099 ecr 11608090], length 34
11:38:57.872622 IP6 ::1.5672 > ::1.41172: Flags [P.], seq 555:567, ack 517, win 700, options [nop,nop,TS val 11608100 ecr 11608099], length 12

ProducerからサーバにTCPコネクション切断を開始する。
11:38:57.874594 IP6 ::1.41172 > ::1.5672: Flags [F.], seq 517, ack 567, win 700, options [nop,nop,TS val 11608102 ecr 11608100], length 0

なぜかサーバはRstで応答している。
11:38:57.876173 IP6 ::1.5672 > ::1.41172: Flags [R.], seq 567, ack 518, win 700, options [nop,nop,TS val 0 ecr 11608102], length 0

サーバ(beam.smp),Producer,Consumerの関係を図示すると、以下のようになります。
 +----------------------------------- server------------------------------------------------+
 |                                                                                          |
 |                             +----- beam.smp -----+                                       |
 |                             |     (PID=1247)     |                                       |
 |                             |                    |                                       |
 |                             |  +--------------+  |                                       |
 |                             |  |   Port 5672  |  |                                       |
 |                             |  +--------------+  |                                       |
 |                             |   A       A    A   |                                       |
 |                             +---|-------|----|---+                                       |
 |                                 |       |    |                                           |
 |                                 |       |    |   "11111" --->                            |
 |                                 |       |    +--------------------> Consumer1 (PID=2567) |
 |                                 |       |                         [Port 41104]           |
 |                                 |       |                                                |
 |                                 |       |                                                |
 |          "22222"->   "11111"->  |       |        "22222" --->                            |
 | Producer <----------------------+       +-------------------------> Consumer2 (PID=2580) |
 |                                                                   [Port 41106]           |
 |                                                                                          |
 +------------------------------------------------------------------------------------------+

13 動作確認(Publish/Subscribe)

テストプログラムは、ここ(Publish/Subscribe)にあるものを使いました。

13.1 受信側(Consumer)

テストプログラムを作成する。
[root@server ~]# vi receive_logs.py
[root@server ~]# cat receive_logs.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

13.2 送信側(Producer)

テストプログラムを作成する。
[root@server ~]# vi emit_log.py
[root@server ~]# cat emit_log.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

13.3 実行結果

Consumerを起動する。
[root@server ~]# python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C

エクスチェンジを表示する。
[root@server ~]# rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

バインディングを表示する。
[root@server ~]# rabbitmqctl list_bindings
Listing bindings ...
        exchange        amq.gen-kvvcLx2_LaA9nm8oWunr3A  queue   amq.gen-kvvcLx2_LaA9nm8oWunr3A  []
        exchange        task_queue      queue   task_queue      []
logs    exchange        amq.gen-kvvcLx2_LaA9nm8oWunr3A  queue   amq.gen-kvvcLx2_LaA9nm8oWunr3A  []
...done.

Producerを実行する。
[root@server 03_publish]# python emit_log.py
 [x] Sent 'info: Hello World!'

X 参考情報

RabbitMQ公式ページ
CentOS7.3にRabbitMQをインストールしてGUIで管理できるようにする
Management Command Line Tool
RabbitMQ : インストール
rabbitmqやってみた
tutorial
RabbitMQでPublish/Subscribeして遊ぶ
AMQPによるメッセージング

11
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
10