AMQP について
AMQPはメッセージングサービスのためのプロトコル。
誰かがQueueにメッセージを突っ込んで、誰かがQueueからメッセージを取り出すというもの。
という理解をしている。
詳しいことは以下のページがとても参考になる。
GREEのエンジニアさんのページ
読みやすくて、とても素晴らしい。
が、動かさないと理解できない質なので、実験。
実験に使う道具は以下のとおり
- RabbitMQ
- AMQPのサーバ
- https://www.rabbitmq.com/
- kombu
- PythonでAMQPを使うライブラリ
- https://pypi.python.org/pypi/kombu
- http://kombu.readthedocs.org/en/latest/
環境構築手順は後述
画面はRabbitMQのManagement Pluginのもの。
Exchangeを作る
GREEのエンジニアさんのページによると、Message
を受け取るのがExchange
で、Exchange
がQueue
に渡すらしい。
とりあえず、Exchange
からつくろうと思い、kombu
のサンプルコードを見て出来たものがこちら。
from kombu import Connection,Exchange
exchange = Exchange('foo_exc', type='direct')
with Connection('amqp://guest:guest@localhost:5672//') as c:
bound = exchange(c.default_channel)
bound.declare()
Exchange
が出来た。一番下の、foo_exc
が上の処理で作ったもの。
Queueを作る
引き続き、サンプルコードを元にQueue
を作成。
from kombu import Connection,Exchange,Queue
exchange = Exchange('foo_exc', type='direct')
queue = Queue('bar_queue', exchange=exchange, routing_key='hoge.fuga')
with Connection('amqp://guest:guest@localhost:5672//') as c:
bound = queue(c.default_channel)
bound.declare()
Queue
が出来た。
Messageを作って投げる
いよいよMessage
を作ってExchange
に突っ込む。
こちらはサンプルコードというよりはリファレンスを参照。
from kombu import Connection,Exchange
exchange = Exchange('foo_exc', type='direct')
with Connection('amqp://guest:guest@localhost:5672//') as c:
bound_exc = exchange(c.default_channel)
msg = bound_exc.Message("Hello, World")
bound_exc.publish(msg, routing_key='hoge.fuga')
ちゃんとHello, World
が入ったもよう。Get messagesのあたり。
Messageを取る
取られないMessage
はただのゴミなので、取る。
サンプルコード
from kombu import Connection,Exchange,Queue,Consumer
exchange = Exchange('foo_exc', type='direct')
queue = Queue('bar_queue', exchange=exchange, routing_key='hoge.fuga')
def callback(body, message):
print body
message.ack()
with Connection('amqp://guest:guest@localhost:5672//') as c:
with Consumer(c.default_channel, queues=[queue], callbacks=[callback]):
c.drain_events()
結果
ubuntu@ubuntu:~$ python consume.py
Hello, World
Hello, World
が表示された。
message.ack()
を呼ばないと、Queue
からメッセージが消えないもよう。
環境構築
RabbitMQ
Docker + docker-composeで構築
docker-compose.yml
rabbit:
image: rabbitmq:3-management
hostname: rabbit001
ports:
- "15672:15672"
- "5672:5672"
kombu
普通にpipでインストール
pip install kombu