RabbitMQをインストールする
ダウンロードする場所
http://www.rabbitmq.com/releases/rabbitmq-server/
# wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm
# rpm -ivh rabbitmq-server-3.6.0-1.noarch.rpm
# systemctl start rabbitmq-server.service
# rabbitmqctl status
rabbitmq-management プラグインはHTTPベースのAPIを提供している. 使用する前に有効化する必要がある
# rabbitmq-plugins enable rabbitmq_management
クラスタリングはこのページを参照するといいです。
RabbitMQで新しいユーザー(wang)を作成して、パスワード(123456)を設定する
# rabbitmqctl add_user wang 123456
ユーザー一覧をリストアップ
# rabbitmqctl list_users
Listing users ...
guest [administrator]
wang []
wangユーザーをguestユーザーのようにadministratorにしたいときに
これでwangユーザーが管理コンソールに入れるだけ、vhostに対する操作はできないはずです。
# rabbitmqctl set_user_tags wang administrator
Setting tags for user "wang" to [administrator] ...
# rabbitmqctl list_users
Listing users ...
guest [administrator]
wang [administrator]
このタイミングで list_permissions命令をの出力はwangユーザーのパーミッションの情報がない
# rabbitmqctl list_permissions -p /
Listing permissions in vhost "/" ...
guest .* .* .*
wangユーザーがvhostに対する操作ができるようにしたい場合
# rabbitmqctl set_permissions -p / wang ".*" ".*" ".*"
# rabbitmqctl list_permissions -p /
Listing permissions in vhost "/" ...
guest .* .* .*
wang .* .* .*
マネージメントUIに入る前にツール入れないと入れないです。クラスタリングの場合はすべてのRabbitmqに下のコマンドを実行する必要があります。
# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
IDCクラウドを使っているのでhttp://パブリックIP:15672/
にアクセスする前にポートフォワーディングの設定が必要です。
ErlangとRabbitmqと連携するためにrabbitmq-erlang-clientを入れる必要があります。githubのrabbitmq-tutorialsを利用することにします。
$ git clone https://github.com/rabbitmq/rabbitmq-tutorials
$ cd rabbitmq-tutorials/erlang
$ wget http://www.rabbitmq.com/releases/rabbitmq-erlang-client/v3.6.0/rabbit_common-3.6.0.ez
$ unzip rabbit_common-3.6.0.ez
$ ln -s rabbit_common-3.6.0 rabbit_common
$ wget http://www.rabbitmq.com/releases/rabbitmq-erlang-client/v3.6.0/amqp_client-3.6.0.ez
$ unzip amqp_client-3.6.0.ez
$ ln -s amqp_client-3.6.0 amqp_client
$ ./send.erl
[x] Sent 'Hello World!'
$ ./receive.erl
[*] Waiting for messages. To exit press CTRL+C
[x] Received <<"Hello World!">>
^C
How does a message reach a queue?
Meet AMQP bindings and exchanges. Whenever you want to deliver a message to a queue,
you do it by sending it to an exchange. Then, based on certain rules, RabbitMQ will
decide to which queue it should deliver the message. Those rules are called routing
keys. A queue is said to be bound to an exchange by a routing key.
ElixirのRabbitMQクライアントの実際の利用例
ローカルホストにRabbitMQが必要
iex(1)> {:ok, conn} = AMQP.Connection.open
{:ok, %AMQP.Connection{pid: #PID<0.124.0>}}
iex(2)> {:ok, chan} = AMQP.Channel.open(conn)
{:ok,
%AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.124.0>}, pid: #PID<0.136.0>}}
iex(3)> AMQP.Queue.declare chan, "test_queue"
{:ok, %{consumer_count: 0, message_count: 0, queue: "test_queue"}}
iex(4)> AMQP.Exchange.declare chan, "test_exchange"
:ok
iex(5)> AMQP.Queue.bind chan, "test_queue", "test_exchange"
:ok
iex(6)> AMQP.Basic.publish chan, "test_exchange", "", "Hello, World!"
:ok
iex(7)> {:ok, payload, meta} = AMQP.Basic.get chan, "test_queue"
{:ok, "Hello, World!",
%{app_id: :undefined, cluster_id: :undefined, content_encoding: :undefined,
content_type: :undefined, correlation_id: :undefined, delivery_tag: 1,
exchange: "test_exchange", expiration: :undefined, headers: :undefined,
message_count: 0, message_id: :undefined, persistent: false,
priority: :undefined, redelivered: false, reply_to: :undefined,
routing_key: "", timestamp: :undefined, type: :undefined,
user_id: :undefined}}
iex(8)> payload
"Hello, World!"
iex(9)> AMQP.Queue.subscribe chan, "test_queue", fn(payload, _meta) -> IO.puts("Received: #{payload}") end
{:ok, "amq.ctag-E0tW1BA2uOIHbkGvLBJlNA"}
iex(10)> AMQP.Basic.publish chan, "test_exchange", "", "Hello, World!"
:ok
Received: Hello, World!
チャンネルについて
Before you consume from or publish to Rabbit, you first have to connect to it. Once the TCP connection is open (and you’re authenticated), your app then creates an AMQP channel. This channel is a virtual connection inside the “real” TCP
connection, and it’s over the channel that you issue AMQP commands. Every channel has a unique ID assigned to it. Whether you’re publishing a message, subscribing to a queue, or receiving a message, it’s all done over a channel.
Fair Dispatchについて
デフォルトではRabbitMQはn個目のメッセージをn個目のconsumerに渡すだけ、workerからAckedされていないメッセージは見ていない。そういう動きがあるから、一個のworkerがずっと忙しくて、もう一個のworkerがずっと暇という状況があります。それを克服するためにbasic.qos(prefetch_count=n)
メソッドを使う必要があります。意味合いとしてはn
個のメッセージがAckedされないと、それ以上そのworkerにメッセージを送らない。
Message durability
For a message that’s in flight inside Rabbit to survive a crash, the message must
- Have its delivery mode option set to 2 (persistent)
- Be published into a durable exchange
- Arrive in a durable queue
fanout exchangeについて
The fanout exchange is for ensuring that each queue bound to it gets a copy of the message, not each consumer. If you want each consumer to get a copy of the message, typically you would have each consumer create their own queue and then bind to the exchange.
consumer tagについて
Specifies the identifier for the consumer. The consumer tag is local to a channel. The consumer tag is valid only within the channel from which the consumer was created. Client must not create a consumer in on channel and then use it in another.
Consumer Cancel Notification
When a channel is consuming from a queue, there are various reasons which could cause the consumption to stop. One of these is obviously if the client issues a basic.cancel on the same channel, which will cause the consumer to be cancelled and the server replies with a basic.cancel-ok. Other events, such as the queue being deleted, or in a clustered scenario, the node on which the queue is located failing, will cause the consumption to be cancelled, but the client channel will not be informed, which is frequently unhelpful.
Queueのpassiveオプション
You can set the passive
option of queue.declare
to true. With passive
set to true, queue.declare
will return successfully if the queue exists, and return an error without creating the queue if it doesn't exist.
Message redeliver
If message was not acknowledged and application fails, it will be redelivered automatically and redelivered
property on envelope will be set to true
(unless you consume them with no-ack
= true
flag).