RabbitMQの起動
DockerでRabbitMQを起動します。
# DockerHub: https://hub.docker.com/_/rabbitmq
$ docker pull rabbitmq
$ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4e0ebfaec33e rabbitmq:3 "docker-entrypoint.s…" 3 seconds ago Up 2 seconds 4369/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp some-rabbit
ライブラリインストール
$ pip install pika
$ pip list | grep pika
pika 1.2.0
メッセージの送信
RabbitMQの接続
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
メッセージ送信
送信する前にキューを作成する必要があるので次のように作成します。
channel.queue_declare(queue='sample')
作成したらbasic_publishで送信。
routing_keyに上記で作成したキューを指定します。
channel.basic_publish(exchange='', routing_key='sample', body='Hello World!')
送信できているかは一旦直接list_queuesコマンドで確認してみます。
キュー名sampleにメッセージが1件送信されていることがわかります。
$ docker exec some-rabbit rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
sample 1
送信側のソースコードまとめ
send.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='sample')
channel.basic_publish(exchange='', routing_key='sample', body='Hello World!')
connection.close()
メッセージの受信
受信した際の処理をcallbackとして実装しておき、basic_consumeでcallbackを指定します。
あとはstart_consuming()を呼ぶことで指定したキューにメッセージが送信されるとcallback関数が呼ばれbodyに送信されたメッセージが入ってきます。
receive.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='sample')
def callback(ch, method, properties, body):
print('Received:', body)
channel.basic_consume(queue='sample', auto_ack=True, on_message_callback=callback)
channel.start_consuming()
実際に上記を実行した状態でsend.pyを実行すると下記がprintされました。
Received: b'Hello World!'
Json形式で送信
辞書型のデータを送信する場合はjson.dumps -> json.loadsを組み合わせるだけなので下記のようにしてみました。
send.py
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='sample')
user = {
'id': 1,
'name': 'sample1',
'email': 'sample1@sample.com'
}
channel.basic_publish(exchange='', routing_key='sample', body=json.dumps(user))
connection.close()
recevie.py
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='sample')
def callback(ch, method, properties, body):
user = json.loads(body)
print('Received:', user)
channel.basic_consume(queue='sample', auto_ack=True, on_message_callback=callback)
channel.start_consuming()
結果
Received: {'id': 1, 'name': 'sample1', 'email': 'sample1@sample.com'}
参考