13
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ZOZOテクノロジーズ #2Advent Calendar 2020

Day 2

FastAPIでRabbitMQを使ってみる

Last updated at Posted at 2020-12-01

やりたいこと

  • FastAPI+RabbitMQでメッセージを送受信できるようにする
  • クライアントからメッセージを送信するためのAPIを作成
  • API経由でRabbitMQにメッセージをキューイング
  • 受信用サーバでメッセージを受け取る

デモ

au1o6-yldc5.gif

FastAPIとは

2018年12月にリリースされたPython製APIフレームワークです
https://fastapi.tiangolo.com/

特徴としては,Node.jsやGoと同等のパフォーマンス性能でかつFlaskライクな書き方で簡単にAPI構築することができます.
ハッカソンなど時間が限られた開発で有効なフレームワークだと思います.

RabbitMQとは

何百万ものメッセージを非同期的に仲介することができるオープンソースソフトウェアです.

AMQP(Advanced Message Queuing Protocol)という組織間でビジネスメッセージを送受信するための公開規格(プロトコル)を実装したシステムになります.

準備

動作に必要なもの

Docker及び,Docker Composeが使えれば大丈夫です!

ファイル構成

.
├── consumer
│   ├── Dockerfile
│   ├── consumer.py
│   └── requirements.txt
├── docker-compose.yml
└── producer
    ├── Dockerfile
    ├── producer.py
    └── requirements.txt

コード

APIサーバ

ヘルスチェック用に/と,RabbitMQサーバにメッセージをプッシュするための/add-job/{message}の2本を作成します.

RabbitMQへ通信を行うためのパッケージとしてpikaを使用します.

/add-job/{message}処理の流れは以下の通りです.

  • pikaを使ってRabbitMQサーバと接続
  • チャンネルとキューの作成
  • 作成したキューに受信したメッセージをプッシュ
  • セッションクローズ
./producer/producer.py
from fastapi import FastAPI
import pika

app = FastAPI()


# ヘルスチェック用
@app.get("/")
def read_root():
    return {"Status": "OK"}


# RabbitMQ用
@app.get("/add-job/{message}")
def add_job(message: str):
    # RabbitMQサーバと接続(ホスト名にはコンテナ名を指定しているが,Dockerを使ってない場合はIPアドレスを指定)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="rabbitmq"))
    # チャンネルの確立
    channel = connection.channel()
    # メッセージを格納するためのキュー(task_queue)を作成
    channel.queue_declare(queue="task_queue", durable=True)
    # メッセージをキュー(task_queue)に格納
    channel.basic_publish(
        exchange="",
        routing_key="task_queue",
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # メッセージの永続化
        ))
    # 接続のクローズ及びメッセージが配信されたことを確認
    connection.close()

    return {"send": message}

受信サーバ

受信側でもRabbitMQサーバと接続するためにpikaパッケージを利用します.

処理の流れは以下の通りです.

  • RabbitMQサーバと接続
  • チャンネルの作成&キューの存在確認
  • 受信待機
  • 受信時にcallback関数を呼び出し

送信側と主な違いは,メッセージ受取時にcallback関数が呼び出されることです.

callback関数では,受け取ったメッセージを元に細かに操作することができます.

./consumer/consumer.py
import pika


# メッセージ受信のたびに呼び出される関数
def callback(channel, method, properties, body):
    print(f" [x] Received {body}")
    message = body.decode()

    if message == "hey":
        print("hey there")
    elif message == "hello":
        print("well hello there")
    else:
        print(f"sorry i did not understand {body}")

    print(" [x] Done")
    # 受信したことをキューに知らせる
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    # 初期設定
    print(" [*] Connecting to server ...")
    # RabbitMQサーバと接続(ホスト名にはコンテナ名を指定しているが,Dockerを使ってない場合はIPアドレスを指定)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="rabbitmq"))
    # チャンネルの確立
    channel = connection.channel()
    # メッセージを受信するためのキュー(task_queue)が存在することを確認
    channel.queue_declare(queue="task_queue", durable=True)
    # 前のメッセージの処理が完了してACKが返るまで次のメッセージを送信しないようにするオプション
    channel.basic_qos(prefetch_count=1)
    # キュー(task_queue)にcallback関数をサブスクライブしてメッセージ受信のたびに実行
    channel.basic_consume(queue="task_queue", on_message_callback=callback)

    try:
        print(" [*] Waiting for messages. To exit press Ctrl+C")
        channel.start_consuming()
    except KeyboardInterrupt:
        print(" [x] Done")


if __name__ == '__main__':
    main()

Dockerコンテナ

Docker Composeを使って3つのコンテナ(RabbitMQ,FastAPI,受信サーバ)を立ち上げます.

APIサーバ

./producer/Dockerfile
FROM python:3.8-alpine

WORKDIR /app

COPY requirements.txt .
# コンテナ内で必要なパッケージをインストール
RUN apk add --no-cache build-base \
 && pip install --no-cache-dir --trusted-host pypi.python.org -r requirements.txt \
 && apk del build-base

COPY producer.py .
EXPOSE 8000
# FastAPIを8000ポートで待機
CMD ["uvicorn", "producer:app", "--reload", "--host", "0.0.0.0", "--port", "8000"]

必要パッケージ

./producer/requirements.txt
uvicorn
fastapi
pika

受信サーバ

./consumer/Dockerfile
FROM python:3.8-alpine

WORKDIR /app

COPY requirements.txt .
# コンテナ内で必要なパッケージをインストール
RUN apk add --no-cache build-base \
 && pip install --no-cache-dir --trusted-host pypi.python.org -r requirements.txt \
 && apk del build-base

COPY consumer.py .

必要パッケージ

./consumer/requirements.txt
pika

Docker Compose

./docker-compose.yml
version: '3.0'

services:
  # FastAPI
  producer:
    container_name: producer
    build: ./producer
    restart: always
    tty: true
    ports:
      - 8000:8000
  # Python Client
  consumer:
    container_name: consumer
    build: ./consumer
  # RabbitMQ
  rabbitmq:
    container_name: rabbitmq
    restart: always
    tty: true
    image: rabbitmq:3.8.9-management-alpine
    ports:
      - '5672:5672'
      - '15672:15672'
#    データの永続化が必要な場合は以下をアンコメント
#    volumes:
#      - rabbitmq-data:/var/lib/rabbitmq
#
#volumes:
#  rabbitmq-data:

実行

コンテナの構築&起動

$ docker-compose up -d --build

コンテナの起動確認

下記コマンドで**RabbitMQ(rabbitmq)FastAPI(producer)**用コンテナがUPステータスであればOKです!

$ docker-compose ps
  Name                Command               State                                   Ports
------------------------------------------------------------------------------------------------------------------------
consumer   python3                          Exit 0
producer   uvicorn producer:app --rel ...   Up       0.0.0.0:8000->8000/tcp
rabbitmq   docker-entrypoint.sh rabbi ...   Up       15671/tcp, 0.0.0.0:15672->15672/tcp, 15691/tcp, 15692/tcp,
                                                     25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp

受信側の起動

起動に成功すると, [*] Connecting to server ...と表示されます.

$ docker-compose run --rm consumer python consumer.py
Creating sample-rabbitmq_consumer_run ... done
 [*] Connecting to server ...
 [*] Waiting for messages. To exit press Ctrl+C

Swaggerを使ってメッセージを送信

FastAPIでは,APIを作成すると自動でSwaggerドキュメントが生成されます.

今回は,せっかくなのでSwaggerからAPIを叩いてみましょう

Swaggerへアクセス

SwaggerからAPIを叩く

ページアクセス後,/add-job/{message}を選択->Try it outを選択すると画像のように表示されるので
messagehelloを記入してExecuteを押すとAPIが叩けます.

swagger.PNG

curlを使ってメッセージを送信

もちろん,curlを使ってAPIを叩くこともできます.

$ curl http://localhost:8000/add-job/hello
{"send":"hello"}

メッセージの受け取り

送信と同時に受信側でメッセージが表示されたことがわかります

 [x] Received b'hello'
well hello there
 [x] Done

RabbitMQ管理画面

RabbitMQコンテナではキューブローカーだけではなく,管理画面も提供されています.
http://localhost:15672/

上記リンクにアクセスすると画像の様な管理画面が表示されます.
gui.PNG

ここでは,時間あたりの受信したメッセージ数やチャンネルの状態など様々な情報を見ることができます.
詳しくは下記記事で説明されているので,気になる方は参照してください.
https://qiita.com/ptiringo/items/c554fa66f0d985394fed

まとめ

FastAPI+RabbitMQ環境を構築し,キューイングシステムを使ってメッセージの送受信を実現することができました.
軽量APIフレームワークであるFastAPIは社内システムで使われることが多いので,RabbitMQと合わせるとより出来ることの幅が広がるかもしれません.

参考サイト

13
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?