はじめに
業務でCloud Pub/Subを使用する機会がありました。その際、まずはローカルでPub/Subを構築して動かせるようにしないと話にならんと思いましたが、トライアンドエラーで多少躓いたので、そのときのノウハウを共有したいと思いました。
ここでは、ローカル環境でCloud Pub/Subエミュレータを使用する方法について解説します。Docker Composeを使用して、Pub/Subエミュレータ、Publisher、Subscriberの3つのコンテナを構築し、それらを連携させる方法を説明します。
ソース
ここで解説に使用しているソースコードは以下のレポジトリに公開しています。
ご自由に使用ください!
環境構築方法はREADME.mdに記載しています。
言語はPythonを使用しています。
詳細
ディレクトリ構成
.
├── docker-compose.yaml
├── publisher
│ ├── Dockerfile
│ ├── publisher.py
│ └── requirements.txt
├── pubsub
│ ├── Dockerfile
│ └── entrypoint.sh
└── subscriber
├── Dockerfile
├── requirements.txt
└── subscriber.py
publisher
まずは、publisherを作成します。
以下のライブラリを使用しています。
google-cloud-pubsub==2.21.1
Flask==3.0.3
FROM python:3.9
ENV PYTHONUNBUFFERED 1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY publisher.py .
import os
from google.cloud import pubsub_v1
from flask import Flask, request
app = Flask(__name__)
project_id = os.environ["PUBSUB_PROJECT_ID"]
topic_id = os.environ["PUBSUB_TOPIC_ID"]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
@app.route('/publish', methods=['POST'])
def publish_message():
future = publisher.publish(topic_path, request.data)
future.result()
return 'Message published.'
if __name__ == '__main__':
app.run(debug=True)
このサンプルでは、APIを叩かれたときに、そのリクエストボディをPub/Subにpublishするよう作成しています。
future.result()
で、publishが成功するまで待機します。
subscriber
続いて、subscriberを作成します。
google-cloud-pubsub==2.21.1
FROM python:3.9
ENV PYTHONUNBUFFERED 1
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY subscriber.py .
import os
from google.cloud import pubsub_v1
project_id = os.environ["PUBSUB_PROJECT_ID"]
subscription_id = os.environ["PUBSUB_SUBSCRIPTION_ID"]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message.data.decode('utf-8')}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
with subscriber:
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
except Exception as e:
print(e)
subscriber.subscribeメソッドは、指定されたサブスクリプションからメッセージをプルし、受信したメッセージに対してcallback関数を呼び出します。
このメソッドは、StreamingPullFutureオブジェクトを返します。これは、メッセージの受信を非同期で行うためのオブジェクトです。
そしてstreaming_pull_future.result()で、メッセージの受信が終了するまでブロックします。
メッセージの受信中にエラーが発生した場合、対応する例外が発生します。
with文を使用してsubscriberオブジェクトのコンテキストマネージャを利用しているため、withブロックを抜けると自動的にサブスクライバーのクローズ処理が行われます。
pubsub-emulator
FROM gcr.io/google.com/cloudsdktool/cloud-sdk:474.0.0-emulators
RUN apt-get update && \
apt-get install -y git python3-pip netcat && \
git clone https://github.com/googleapis/python-pubsub.git
WORKDIR /python-pubsub/samples/snippets
RUN pip3 install virtualenv && \
virtualenv env && \
. env/bin/activate && \
pip3 install -r requirements.txt
COPY ./entrypoint.sh ./
EXPOSE 8085
ENTRYPOINT ["./entrypoint.sh"]
#!/bin/bash
set -em
gcloud beta emulators pubsub start --project=$PUBSUB_PROJECT_ID --host-port=$PUBSUB_EMULATOR_HOST --quiet &
while ! nc -z localhost 8085; do
sleep 0.1
done
. env/bin/activate
python3 publisher.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID
python3 subscriber.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID $PUBSUB_SUBSCRIPTION_ID
fg %1
上記スクリプトをpubsub起動時に実行します。
gcloud beta emulators pubsub start --project=$PUBSUB_PROJECT_ID --host-port=$PUBSUB_EMULATOR_HOST --quiet &
で、pubsub emulatorを起動しています。
python3 publisher.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID
では、トピックを作成しています。
また、python3 subscriber.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID $PUBSUB_SUBSCRIPTION_ID
では、サブスクリプションを作成しています。
docker-compose.yaml
services:
pubsub-emulator:
container_name: pubsub-emulator
build:
context: ./pubsub
dockerfile: Dockerfile
restart: always
platform: linux/amd64
environment:
- PUBSUB_PROJECT_ID=my-project
- PUBSUB_TOPIC_ID=my-topic
- PUBSUB_SUBSCRIPTION_ID=my-subscription
- PUBSUB_EMULATOR_HOST=0.0.0.0:8085
extra_hosts:
- host.docker.internal:host-gateway
volumes:
- ./pubsub:/code
command:
./pubsub/entrypoint.sh
ports:
- 8085:8085
publisher:
container_name: publisher
build:
context: ./publisher
dockerfile: Dockerfile
environment:
- PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
- PUBSUB_PROJECT_ID=my-project
- PUBSUB_TOPIC_ID=my-topic
- FLASK_APP=publisher.py
- FLASK_ENV=development
command: flask run --host=0.0.0.0 --port=8080
ports:
- 8080:8080
subscriber:
container_name: subscriber
build:
context: ./subscriber
dockerfile: Dockerfile
environment:
- PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
- PUBSUB_PROJECT_ID=my-project
- PUBSUB_SUBSCRIPTION_ID=my-subscription
working_dir: /app
command: python subscriber.py
ここで注意すべき点は、各サービスで設定するPUBSUB_EMULATOR_HOST
の値です。
pubsub-emulatorがリッスンするホストとポートは、
PUBSUB_EMULATOR_HOST=0.0.0.0:8085
としています。
そしてpublisherとsubscriberのPUBSUB_EMULATOR_HOSTはそれぞれ
PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
としています。
こうすることでpubsubを通じてメッセージのやり取りができるようになります。
動作確認
では、動作確認をしてみます。
以下のようにFlaskサーバーにPOSTリクエストします。
curl -X POST -H "Content-Type: application/json" -d '{"message": "Hello, Pub/Sub!"}' http://localhost:8080/publish
すると、subscriberで
Received {"message": "Hello, Pub/Sub!"}.
とでるはずです。
これでPub/Subのローカル環境構築成功です🎉
最後に
以上がローカル環境でCloud Pub/Subエミュレータを使用する方法の解説でした。
HOSTの設定でも多少ハマったのですが、Dockerfileのイメージがslim
だと必要なパッケージがないようで、subscriberが起動しなくなったりして、混乱してしまいましたが、なんとかミニマムでPubSubの動作確認がローカルでできるようになりました。
もし同じ悩みを抱えている方がいましたら、参考にしていただけたら嬉しいです!
参考
以下のサイトを参考にさせていただきました!
ありがとうございました🙇♂️