0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PythonでRabbitMQ使ってみる

Last updated at Posted at 2022-07-02

RabbitMQの起動

DockerでRabbitMQを起動します。

# DockerHub: https://hub.docker.com/_/rabbitmq
$ docker pull rabbitmq
$ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3
$ docker ps
CONTAINER ID   IMAGE        COMMAND                  CREATED         STATUS         PORTS                                                                    NAMES
4e0ebfaec33e   rabbitmq:3   "docker-entrypoint.s…"   3 seconds ago   Up 2 seconds   4369/tcp, 5671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp   some-rabbit

ライブラリインストール

$ pip install pika
$ pip list | grep pika
pika                   1.2.0

メッセージの送信

RabbitMQの接続

import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

メッセージ送信

送信する前にキューを作成する必要があるので次のように作成します。

channel.queue_declare(queue='sample')

作成したらbasic_publishで送信。
routing_keyに上記で作成したキューを指定します。

channel.basic_publish(exchange='', routing_key='sample', body='Hello World!')

送信できているかは一旦直接list_queuesコマンドで確認してみます。
キュー名sampleにメッセージが1件送信されていることがわかります。

$ docker exec some-rabbit rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
sample	1
送信側のソースコードまとめ
send.py
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='sample')

channel.basic_publish(exchange='', routing_key='sample', body='Hello World!')

connection.close()

メッセージの受信

受信した際の処理をcallbackとして実装しておき、basic_consumeでcallbackを指定します。
あとはstart_consuming()を呼ぶことで指定したキューにメッセージが送信されるとcallback関数が呼ばれbodyに送信されたメッセージが入ってきます。

receive.py
import pika


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='sample')

def callback(ch, method, properties, body):
    print('Received:', body)

channel.basic_consume(queue='sample', auto_ack=True, on_message_callback=callback)

channel.start_consuming()

実際に上記を実行した状態でsend.pyを実行すると下記がprintされました。

Received: b'Hello World!'

Json形式で送信

辞書型のデータを送信する場合はjson.dumps -> json.loadsを組み合わせるだけなので下記のようにしてみました。

send.py
import pika
import json


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='sample')

user = {
    'id': 1,
    'name': 'sample1',
    'email': 'sample1@sample.com'
}
channel.basic_publish(exchange='', routing_key='sample', body=json.dumps(user))

connection.close()
recevie.py
import pika
import json


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='sample')

def callback(ch, method, properties, body):
    user = json.loads(body)
    print('Received:', user)

channel.basic_consume(queue='sample', auto_ack=True, on_message_callback=callback)

channel.start_consuming()
結果
Received: {'id': 1, 'name': 'sample1', 'email': 'sample1@sample.com'}

参考

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?