やりたいこと
- FastAPI+RabbitMQでメッセージを送受信できるようにする
- クライアントからメッセージを送信するためのAPIを作成
- API経由でRabbitMQにメッセージをキューイング
- 受信用サーバでメッセージを受け取る
デモ
FastAPIとは
2018年12月にリリースされたPython製APIフレームワークです
https://fastapi.tiangolo.com/
特徴としては,Node.jsやGoと同等のパフォーマンス性能でかつFlaskライクな書き方で簡単にAPI構築することができます.
ハッカソンなど時間が限られた開発で有効なフレームワークだと思います.
RabbitMQとは
何百万ものメッセージを非同期的に仲介することができるオープンソースソフトウェアです.
AMQP(Advanced Message Queuing Protocol)という組織間でビジネスメッセージを送受信するための公開規格(プロトコル)を実装したシステムになります.
準備
動作に必要なもの
Docker及び,Docker Composeが使えれば大丈夫です!
ファイル構成
.
├── consumer
│ ├── Dockerfile
│ ├── consumer.py
│ └── requirements.txt
├── docker-compose.yml
└── producer
├── Dockerfile
├── producer.py
└── requirements.txt
コード
APIサーバ
ヘルスチェック用に/
と,RabbitMQサーバにメッセージをプッシュするための/add-job/{message}
の2本を作成します.
RabbitMQへ通信を行うためのパッケージとしてpikaを使用します.
/add-job/{message}
処理の流れは以下の通りです.
- pikaを使ってRabbitMQサーバと接続
- チャンネルとキューの作成
- 作成したキューに受信したメッセージをプッシュ
- セッションクローズ
from fastapi import FastAPI
import pika
app = FastAPI()
# ヘルスチェック用
@app.get("/")
def read_root():
return {"Status": "OK"}
# RabbitMQ用
@app.get("/add-job/{message}")
def add_job(message: str):
# RabbitMQサーバと接続(ホスト名にはコンテナ名を指定しているが,Dockerを使ってない場合はIPアドレスを指定)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="rabbitmq"))
# チャンネルの確立
channel = connection.channel()
# メッセージを格納するためのキュー(task_queue)を作成
channel.queue_declare(queue="task_queue", durable=True)
# メッセージをキュー(task_queue)に格納
channel.basic_publish(
exchange="",
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # メッセージの永続化
))
# 接続のクローズ及びメッセージが配信されたことを確認
connection.close()
return {"send": message}
受信サーバ
受信側でもRabbitMQサーバと接続するためにpikaパッケージを利用します.
処理の流れは以下の通りです.
- RabbitMQサーバと接続
- チャンネルの作成&キューの存在確認
- 受信待機
- 受信時にcallback関数を呼び出し
送信側と主な違いは,メッセージ受取時にcallback関数が呼び出されることです.
callback関数では,受け取ったメッセージを元に細かに操作することができます.
import pika
# メッセージ受信のたびに呼び出される関数
def callback(channel, method, properties, body):
print(f" [x] Received {body}")
message = body.decode()
if message == "hey":
print("hey there")
elif message == "hello":
print("well hello there")
else:
print(f"sorry i did not understand {body}")
print(" [x] Done")
# 受信したことをキューに知らせる
channel.basic_ack(delivery_tag=method.delivery_tag)
def main():
# 初期設定
print(" [*] Connecting to server ...")
# RabbitMQサーバと接続(ホスト名にはコンテナ名を指定しているが,Dockerを使ってない場合はIPアドレスを指定)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="rabbitmq"))
# チャンネルの確立
channel = connection.channel()
# メッセージを受信するためのキュー(task_queue)が存在することを確認
channel.queue_declare(queue="task_queue", durable=True)
# 前のメッセージの処理が完了してACKが返るまで次のメッセージを送信しないようにするオプション
channel.basic_qos(prefetch_count=1)
# キュー(task_queue)にcallback関数をサブスクライブしてメッセージ受信のたびに実行
channel.basic_consume(queue="task_queue", on_message_callback=callback)
try:
print(" [*] Waiting for messages. To exit press Ctrl+C")
channel.start_consuming()
except KeyboardInterrupt:
print(" [x] Done")
if __name__ == '__main__':
main()
Dockerコンテナ
Docker Composeを使って3つのコンテナ(RabbitMQ,FastAPI,受信サーバ)を立ち上げます.
APIサーバ
FROM python:3.8-alpine
WORKDIR /app
COPY requirements.txt .
# コンテナ内で必要なパッケージをインストール
RUN apk add --no-cache build-base \
&& pip install --no-cache-dir --trusted-host pypi.python.org -r requirements.txt \
&& apk del build-base
COPY producer.py .
EXPOSE 8000
# FastAPIを8000ポートで待機
CMD ["uvicorn", "producer:app", "--reload", "--host", "0.0.0.0", "--port", "8000"]
必要パッケージ
uvicorn
fastapi
pika
受信サーバ
FROM python:3.8-alpine
WORKDIR /app
COPY requirements.txt .
# コンテナ内で必要なパッケージをインストール
RUN apk add --no-cache build-base \
&& pip install --no-cache-dir --trusted-host pypi.python.org -r requirements.txt \
&& apk del build-base
COPY consumer.py .
必要パッケージ
pika
Docker Compose
version: '3.0'
services:
# FastAPI
producer:
container_name: producer
build: ./producer
restart: always
tty: true
ports:
- 8000:8000
# Python Client
consumer:
container_name: consumer
build: ./consumer
# RabbitMQ
rabbitmq:
container_name: rabbitmq
restart: always
tty: true
image: rabbitmq:3.8.9-management-alpine
ports:
- '5672:5672'
- '15672:15672'
# データの永続化が必要な場合は以下をアンコメント
# volumes:
# - rabbitmq-data:/var/lib/rabbitmq
#
#volumes:
# rabbitmq-data:
実行
コンテナの構築&起動
$ docker-compose up -d --build
コンテナの起動確認
下記コマンドで**RabbitMQ(rabbitmq)とFastAPI(producer)**用コンテナがUPステータスであればOKです!
$ docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------------------------------
consumer python3 Exit 0
producer uvicorn producer:app --rel ... Up 0.0.0.0:8000->8000/tcp
rabbitmq docker-entrypoint.sh rabbi ... Up 15671/tcp, 0.0.0.0:15672->15672/tcp, 15691/tcp, 15692/tcp,
25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp
受信側の起動
起動に成功すると, [*] Connecting to server ...
と表示されます.
$ docker-compose run --rm consumer python consumer.py
Creating sample-rabbitmq_consumer_run ... done
[*] Connecting to server ...
[*] Waiting for messages. To exit press Ctrl+C
Swaggerを使ってメッセージを送信
FastAPIでは,APIを作成すると自動でSwaggerドキュメントが生成されます.
今回は,せっかくなのでSwaggerからAPIを叩いてみましょう
Swaggerへアクセス
SwaggerからAPIを叩く
ページアクセス後,/add-job/{message}
を選択->Try it outを選択すると画像のように表示されるので
messageにhelloを記入してExecuteを押すとAPIが叩けます.
curlを使ってメッセージを送信
もちろん,curlを使ってAPIを叩くこともできます.
$ curl http://localhost:8000/add-job/hello
{"send":"hello"}
メッセージの受け取り
送信と同時に受信側でメッセージが表示されたことがわかります
[x] Received b'hello'
well hello there
[x] Done
RabbitMQ管理画面
RabbitMQコンテナではキューブローカーだけではなく,管理画面も提供されています.
http://localhost:15672/
上記リンクにアクセスすると画像の様な管理画面が表示されます.
ここでは,時間あたりの受信したメッセージ数やチャンネルの状態など様々な情報を見ることができます.
詳しくは下記記事で説明されているので,気になる方は参照してください.
https://qiita.com/ptiringo/items/c554fa66f0d985394fed
まとめ
FastAPI+RabbitMQ環境を構築し,キューイングシステムを使ってメッセージの送受信を実現することができました.
軽量APIフレームワークであるFastAPIは社内システムで使われることが多いので,RabbitMQと合わせるとより出来ることの幅が広がるかもしれません.
参考サイト
- Background Processing With RabbitMQ, Python, and Flask
https://medium.com/better-programming/background-processing-with-rabbitmq-python-and-flask-5ca62acf409c - RabbitMQ tutorial
https://www.rabbitmq.com/tutorials/tutorial-one-python.html