1 rabbitmqctlとは?
RabbitMQサーバーに対して、vhost、ユーザ等の作成、削除を行うためのコマンドです。
2 構成
VMware Workstation 12 Player上のゲストマシンを使っています。
[root@server ~]# cat /etc/redhat-release
CentOS Linux release 7.3.1611 (Core)
[root@server ~]# uname -r
3.10.0-514.el7.x86_64
3 事前準備
3.1 rabbitmqのインストール
erlangはepelリポジトリにあるので、まず、epelリポジトリをインストールする。
[root@server ~]# yum install epel-release
erlangをインストールする。
[root@server ~]# yum install erlang
[root@server ~]# erl +V
Erlang (SMP,ASYNC_THREADS,HIPE) (BEAM) emulator version 5.10.4
rabbitmqをインストールする。
[root@server ~]# rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
[root@server ~]# yum install rabbitmq-server
3.2 pikaのインストール
pikaとは、Pythonのライブラリです。
テスト用で作成するConsumer,Producerで使用するので、事前にインストールしておきます。
[root@server ~]# yum install python-setuptools
[root@server ~]# easy_install pip
[root@server ~]# pip --version
pip 9.0.1 from /usr/lib/python2.7/site-packages/pip-9.0.1-py2.7.egg (python 2.7)
pikaをインストールする。
[root@server ~]# pip install pika
Collecting pika
Downloading pika-0.11.2-py2.py3-none-any.whl (107kB)
100% |????????????????????????????????| 112kB 585kB/s
Installing collected packages: pika
Successfully installed pika-0.11.2
4 管理画面からRabbitMQサーバにアクセスする方法
4.1 rabbitmqの起動
[root@server ~]# systemctl start rabbitmq-server
[root@server ~]# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
Active: active (running) since 月 2018-01-22 20:57:57 JST; 50s ago
Main PID: 2194 (beam.smp)
CGroup: /system.slice/rabbitmq-server.service
tq2194 /usr/lib64/erlang/erts-5.10.4/bin/beam.smp -W w -K true -A30 -P 1048576 -- -root /usr/lib64/...
tq2209 /usr/lib64/erlang/erts-5.10.4/bin/epmd -daemon
tq2282 inet_gethost 4
mq2283 inet_gethost 4
1月 22 20:57:47 server systemd[1]: rabbitmq-server.service: Got notification message from PID 2209, but... 2194
1月 22 20:57:49 server systemd[1]: rabbitmq-server.service: Got notification message from PID 2238, but... 2194
1月 22 20:57:53 server rabbitmq-server[2194]: RabbitMQ 3.3.5. Copyright (C) 2007-2014 GoPivotal, Inc.
1月 22 20:57:53 server rabbitmq-server[2194]: ## ## Licensed under the MPL. See http://www.rabbitmq.com/
1月 22 20:57:53 server rabbitmq-server[2194]: ## ##
1月 22 20:57:53 server rabbitmq-server[2194]: ########## Logs: /var/log/rabbitmq/rabbit@server.log
1月 22 20:57:53 server rabbitmq-server[2194]: ###### ## /var/log/rabbitmq/rabbit@server-sasl.log
1月 22 20:57:53 server rabbitmq-server[2194]: ##########
1月 22 20:57:57 server systemd[1]: Started RabbitMQ broker.
1月 22 20:57:57 server rabbitmq-server[2194]: Starting broker... completed with 0 plugins.
Hint: Some lines were ellipsized, use -l to show in full.
4.2 ポート番号の解放
[root@server ~]# firewall-cmd --add-port=5672/tcp
success
[root@server ~]# firewall-cmd --add-port=15672/tcp
success
ポート番号 | 用途 |
---|---|
5672(RabbitMQ Node Port) | rabbitmqサーバとProducer,Consumerがメッセージの送受信に使う |
5671(RabbitMQ Node Port) | rabbitmqサーバとProducer,Consumerがメッセージの送受信に使う(TLSで暗号化) |
15672(RabbitMQ Management Port) | Webブラウザの管理画面よりrabbitmqにアクセスするときに使う |
4.3 Webブラウザよりアクセスする
ユーザを登録する。ID/Passはadmin/11111とする。
[root@server ~]# rabbitmqctl add_user admin 11111
Creating user "admin" ...
...done.
[root@server ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
...done.
[root@server ~]# rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
...done.
[root@server ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
RabbitMQをGUIで管理できるプラグインの追加
[root@server ~]# rabbitmq-plugins list
[e] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[e] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[E] rabbitmq_management 3.3.5
[e] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[e] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[e] webmachine 1.10.3-rmq3.3.5-gite9359c7
[root@server ~]# rabbitmq-plugins enable rabbitmq_management
Plugin configuration unchanged.
[root@server ~]# rabbitmq-plugins list|grep management
[ ] rabbitmq_federation_management 3.3.5
[E] rabbitmq_management 3.3.5
[e] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
5 vhostの追加、削除、表示方法
vhostはここを参照しました。
vhostは、ユーザ、キュー、コネクション等のリソースの集合です。
vhostは複数定義することが可能で、名前を持っています。
そして、Producerは名前を指定してRabbitMQに接続します。
5.1 vhostを追加する方法(add_vhost)
vhost(/test1)を追加する。
[root@server ~]# rabbitmqctl add_vhost /test1
Creating vhost "/test1" ...
...done.
vhost(/test2)を追加する。
[root@server ~]# rabbitmqctl add_vhost /test2
Creating vhost "/test2" ...
...done.
vhostを確認する。/test1と/test2が追加されたことがわかる。
/はデフォルトで存在するvhostです。
[root@server ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
/test1
/test2
...done.
5.2 vhostを削除する方法(delete_vhost)
vhost(/test1)を削除する。
[root@server ~]# rabbitmqctl delete_vhost /test1
Deleting vhost "/test1" ...
...done.
vhost(/test2)を削除する。
[root@server ~]# rabbitmqctl delete_vhost /test2
Deleting vhost "/test2" ...
...done.
vhostを確認する。/test1と/test2が削除されたことがわかる。
[root@server ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
...done.
6 ユーザの追加、削除、表示方法
6.1 ユーザの追加方法(add_user)
書式は以下のとおり。
rabbitmqctl add_user <user
> <password
>
登録されているユーザを確認する。adminとguestが登録されていることがわかる。
[root@server ~]# rabbitmqctl list_users
Listing users ...
admin [administrator]
guest [administrator]
...done.
ユーザ(user1)を追加する。
[root@server ~]# rabbitmqctl add_user user1 11111
Creating user "user1" ...
...done.
ユーザを確認する。user1が追加されたことがわかる。
[root@server ~]# rabbitmqctl list_users
Listing users ...
admin [administrator]
guest [administrator]
user1 []
...done.
6.2 ユーザの削除方法(delete_user)
ユーザ(user1)を削除する。
[root@server ~]# rabbitmqctl delete_user user1
Deleting user "user1" ...
...done.
ユーザを確認する。user1が削除されたことがわかる。
[root@server ~]# rabbitmqctl list_users
Listing users ...
admin [administrator]
guest [administrator]
...done.
7 権限の設定、削除方法
7.1 権限の設定方法(set_permissions)
[root@server ~]# rabbitmqctl add_vhost /test1
Creating vhost "/test1" ...
...done.
[root@server ~]# rabbitmqctl add_user user1 11111
Creating user "user1" ...
...done.
[root@server ~]# rabbitmqctl set_permissions -p /test1 user1 ".*" ".*" ".*"
Setting permissions for user "user1" in vhost "/test1" ...
...done.
[root@server ~]# rabbitmqctl list_permissions -p /test1
Listing permissions in vhost "/test1" ...
user1 .* .* .*
...done.
7.2 権限の削除方法(clear_permissions)
[root@server ~]# rabbitmqctl clear_permissions -p /test1 user1
Clearing permissions for user "user1" in vhost "/test1" ...
...done.
[root@server ~]# rabbitmqctl list_permissions -p /test1
Listing permissions in vhost "/test1" ...
...done.
8 キューの表示
"/"に存在するキューを表示する。"hello"が存在することがわかる。
[root@server ~]# rabbitmqctl list_queues -p /
Listing queues ...
hello 0
...done.
9 エクスチェンジ(Exchanges)
エクスチェンジは、Producerからメッセージを受信する役割を実行します。
受信したメッセージは、バインディング(後述)にしたがって、適切なキューにキューイングします。
エクスチェンジを表示する。
[root@server ~]# rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
...done.
10 バインディング(Binding)
どのキューにメッセージをキューイングするかを決めたルールです。
メッセージはバインディングに従って、適切なキューにキューイングされます。
[root@server ~]# rabbitmqctl list_bindings
Listing bindings ...
...done.
11 動作確認(Hellow World)
テストプログラムは、ここ(Hello World!")にあるものを使いました。
11.1 受信側(Consumer)
テストプログラムを作成する。
[root@server ~]# vi receive.py
[root@server ~]# cat receive.py
# !/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
キューを確認する。
[root@server ~]# rabbitmqctl list_queues -p /
Listing queues ...
hello 0
...done.
11.2 送信側(Producer)
テストプログラムを作成する。
[root@server ~]# vi send.py
[root@server ~]# cat send.py
# !/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
11.3 実行結果
[root@server ~]# python receive.py
[*] Waiting for messages. To exit press CTRL+C
もう1つターミナルを開く
[root@server ~]# python send.py
[x] Sent 'Hello World!'
[root@server ~]# python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
12 動作確認(Work Queues)
テストプログラムは、ここ(Work Queues)にあるものを使いました。
12.1 受信側(Consumer)
テストプログラムを作成する。
[root@server ~]# vi worker.py
[root@server ~]# cat worker.py
# !/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
12.2 送信側(Producer)
テストプログラムを作成する。
[root@server ~]# vi new_task.py
[root@server ~]# cat new_task.py
# !/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
12.3 実行結果
Consumerを起動する。
[root@server ~]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
もう1つターミナルを開く。Consumerを起動する。この時点で2つConsumerを起動したことになる。
[root@server ~]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
もう1つターミナルを開く。Producerを起動する。メッセージ("11111")を送信する。
[root@server ~]# python new_task.py 11111
[x] Sent '11111'
[root@server ~]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received '11111'
[x] Done
Producerを起動する。メッセージ("22222")を送信する。
[root@server ~]# python new_task.py 22222
[x] Sent '22222'
[root@server 02_work]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received '22222'
[x] Done
12.4 すこしだけ深堀
lsofコマンドを実行する。IPv6ソケットを使用していることがわかる。
サーバ(beam.smp)は、TCPの5672番ポートでListen(★)していることがわかる。
また、ConsumerはサーバとTCPコネクションを2つ(■)確立していることも確認できます。
[root@server ~]# lsof -i:5672 -P -n
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
beam.smp 1247 rabbitmq 16u IPv6 28059 0t0 TCP *:5672 (LISTEN) ★
beam.smp 1247 rabbitmq 19u IPv6 43405 0t0 TCP [::1]:5672->[::1]:41104 (ESTABLISHED)
beam.smp 1247 rabbitmq 20u IPv6 44153 0t0 TCP [::1]:5672->[::1]:41106 (ESTABLISHED)
python 2567 root 4u IPv6 44134 0t0 TCP [::1]:41104->[::1]:5672 (ESTABLISHED) ■
python 2580 root 4u IPv6 43503 0t0 TCP [::1]:41106->[::1]:5672 (ESTABLISHED) ■
Producerを実行する。Producerはサーバの5672番ポート(★)にTCPコネクションを確立していることがわかります。
[root@server ~]# strace -ttT -f -e trace=setsockopt,socket,connect,sendto python new_task.py 22222
11:07:01.827581 socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP) = 4 <0.000126>
11:07:01.828260 setsockopt(4, SOL_TCP, TCP_NODELAY, [1], 4) = 0 <0.000042>
11:07:01.828800 connect(4, {sa_family=AF_INET6, sin6_port=htons(★5672), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = -1 EINPROGRESS (Operation now in progress) <0.002546>
11:07:01.863561 sendto(4, "\3\0\1\0\0\0\00522222\316", 13, 0, NULL, 0) = 13 <0.001367>
[x] Sent '22222'
5672番ポートで送受信されるメッセージを確認する。
[root@server ~]# tcpdump -i lo tcp port 5672 -nn
ProducerがサーバとTCPコネクションを確立する。
11:38:57.768532 IP6 ::1.41172 > ::1.5672: Flags [S], seq 3976291393, win 43690, options [mss 65476,sackOK,TS val 11607996 ecr 0,nop,wscale 6], length 0
09:52:46.512335 IP6 ::1.5672 > ::1.41172: Flags [S.], seq 3217423699, ack 3976291394, win 43690, options [mss 65476,sackOK,TS val 11607996 ecr 11607996,nop,wscale 6], length 0
11:38:57.768652 IP6 ::1.41172 > ::1.5672: Flags [.], ack 1, win 683, options [nop,nop,TS val 11607996 ecr 11607996], length 0
TCPコネクション確立後、Producerとサーバ間でメッセージがやり取りされる。
11:38:57.771227 IP6 ::1.41172 > ::1.5672: Flags [P.], seq 1:9, ack 1, win 683, options [nop,nop,TS val 11607999 ecr 11607996], length 8
11:38:57.771282 IP6 ::1.5672 > ::1.41172: Flags [.], ack 9, win 683, options [nop,nop,TS val 11607999 ecr 11607999], length 0
-中略-
ここで、サーバからConsumerにメッセージが送信される。
11:38:57.848765 IP6 ::1.5672 > ::1.41172: Flags [.], ack 449, win 700, options [nop,nop,TS val 11608077 ecr 11608073], length 0
11:38:57.850534 IP6 ::1.5672 > ::1.41104: Flags [P.], seq 1612595414:1612595522, ack 1216607427, win 700, options [nop,nop,TS val 11608078 ecr 11446882], length 108
11:38:57.853411 IP6 ::1.41104 > ::1.5672: Flags [P.], seq 1:22, ack 108, win 700, options [nop,nop,TS val 11608079 ecr 11608078], length 21
11:38:57.853472 IP6 ::1.5672 > ::1.41104: Flags [.], ack 22, win 700, options [nop,nop,TS val 11608079 ecr 11608079], length 0
11:38:57.860847 IP6 ::1.41172 > ::1.5672: Flags [P.], seq 449:483, ack 543, win 700, options [nop,nop,TS val 11608089 ecr 11608077], length 34
11:38:57.862794 IP6 ::1.5672 > ::1.41172: Flags [P.], seq 543:555, ack 483, win 700, options [nop,nop,TS val 11608090 ecr 11608089], length 12
11:38:57.871747 IP6 ::1.41172 > ::1.5672: Flags [P.], seq 483:517, ack 555, win 700, options [nop,nop,TS val 11608099 ecr 11608090], length 34
11:38:57.872622 IP6 ::1.5672 > ::1.41172: Flags [P.], seq 555:567, ack 517, win 700, options [nop,nop,TS val 11608100 ecr 11608099], length 12
ProducerからサーバにTCPコネクション切断を開始する。
11:38:57.874594 IP6 ::1.41172 > ::1.5672: Flags [F.], seq 517, ack 567, win 700, options [nop,nop,TS val 11608102 ecr 11608100], length 0
なぜかサーバはRstで応答している。
11:38:57.876173 IP6 ::1.5672 > ::1.41172: Flags [R.], seq 567, ack 518, win 700, options [nop,nop,TS val 0 ecr 11608102], length 0
サーバ(beam.smp),Producer,Consumerの関係を図示すると、以下のようになります。
+----------------------------------- server------------------------------------------------+
| |
| +----- beam.smp -----+ |
| | (PID=1247) | |
| | | |
| | +--------------+ | |
| | | Port 5672 | | |
| | +--------------+ | |
| | A A A | |
| +---|-------|----|---+ |
| | | | |
| | | | "11111" ---> |
| | | +--------------------> Consumer1 (PID=2567) |
| | | [Port 41104] |
| | | |
| | | |
| "22222"-> "11111"-> | | "22222" ---> |
| Producer <----------------------+ +-------------------------> Consumer2 (PID=2580) |
| [Port 41106] |
| |
+------------------------------------------------------------------------------------------+
13 動作確認(Publish/Subscribe)
テストプログラムは、ここ(Publish/Subscribe)にあるものを使いました。
13.1 受信側(Consumer)
テストプログラムを作成する。
[root@server ~]# vi receive_logs.py
[root@server ~]# cat receive_logs.py
# !/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
13.2 送信側(Producer)
テストプログラムを作成する。
[root@server ~]# vi emit_log.py
[root@server ~]# cat emit_log.py
# !/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
13.3 実行結果
Consumerを起動する。
[root@server ~]# python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
エクスチェンジを表示する。
[root@server ~]# rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
バインディングを表示する。
[root@server ~]# rabbitmqctl list_bindings
Listing bindings ...
exchange amq.gen-kvvcLx2_LaA9nm8oWunr3A queue amq.gen-kvvcLx2_LaA9nm8oWunr3A []
exchange task_queue queue task_queue []
logs exchange amq.gen-kvvcLx2_LaA9nm8oWunr3A queue amq.gen-kvvcLx2_LaA9nm8oWunr3A []
...done.
Producerを実行する。
[root@server 03_publish]# python emit_log.py
[x] Sent 'info: Hello World!'
X 参考情報
RabbitMQ公式ページ
CentOS7.3にRabbitMQをインストールしてGUIで管理できるようにする
Management Command Line Tool
RabbitMQ : インストール
rabbitmqやってみた
tutorial
RabbitMQでPublish/Subscribeして遊ぶ
AMQPによるメッセージング