LoginSignup
0
1

ローカル環境でCloud Pub/Subエミュレータを使用する方法

Posted at

はじめに

業務でCloud Pub/Subを使用する機会がありました。その際、まずはローカルでPub/Subを構築して動かせるようにしないと話にならんと思いましたが、トライアンドエラーで多少躓いたので、そのときのノウハウを共有したいと思いました。
ここでは、ローカル環境でCloud Pub/Subエミュレータを使用する方法について解説します。Docker Composeを使用して、Pub/Subエミュレータ、Publisher、Subscriberの3つのコンテナを構築し、それらを連携させる方法を説明します。

ソース

ここで解説に使用しているソースコードは以下のレポジトリに公開しています。
ご自由に使用ください!
環境構築方法はREADME.mdに記載しています。

言語はPythonを使用しています。

詳細

ディレクトリ構成

.
├── docker-compose.yaml
├── publisher
│   ├── Dockerfile
│   ├── publisher.py
│   └── requirements.txt
├── pubsub
│   ├── Dockerfile
│   └── entrypoint.sh
└── subscriber
    ├── Dockerfile
    ├── requirements.txt
    └── subscriber.py

publisher

まずは、publisherを作成します。

以下のライブラリを使用しています。

requirements.txt
google-cloud-pubsub==2.21.1
Flask==3.0.3
publisher/Dockerfile
FROM python:3.9

ENV PYTHONUNBUFFERED 1

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY publisher.py .
publisher/publisher.py
import os
from google.cloud import pubsub_v1
from flask import Flask, request

app = Flask(__name__)

project_id = os.environ["PUBSUB_PROJECT_ID"]
topic_id = os.environ["PUBSUB_TOPIC_ID"]

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

@app.route('/publish', methods=['POST'])
def publish_message():
    future = publisher.publish(topic_path, request.data)
    future.result()
    return 'Message published.'

if __name__ == '__main__':
    app.run(debug=True)

このサンプルでは、APIを叩かれたときに、そのリクエストボディをPub/Subにpublishするよう作成しています。

future.result()で、publishが成功するまで待機します。

subscriber

続いて、subscriberを作成します。

subscriber/requirements.txt
google-cloud-pubsub==2.21.1
subscriber/Dockerfile
FROM python:3.9

ENV PYTHONUNBUFFERED 1

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY subscriber.py .
subscriber/subscriber.py
import os
from google.cloud import pubsub_v1

project_id = os.environ["PUBSUB_PROJECT_ID"] 
subscription_id = os.environ["PUBSUB_SUBSCRIPTION_ID"]

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received {message.data.decode('utf-8')}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

with subscriber:
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
    except Exception as e:
        print(e)

subscriber.subscribeメソッドは、指定されたサブスクリプションからメッセージをプルし、受信したメッセージに対してcallback関数を呼び出します。
このメソッドは、StreamingPullFutureオブジェクトを返します。これは、メッセージの受信を非同期で行うためのオブジェクトです。
そしてstreaming_pull_future.result()で、メッセージの受信が終了するまでブロックします。
メッセージの受信中にエラーが発生した場合、対応する例外が発生します。
with文を使用してsubscriberオブジェクトのコンテキストマネージャを利用しているため、withブロックを抜けると自動的にサブスクライバーのクローズ処理が行われます。

pubsub-emulator

pubsub/Dockerfile
FROM gcr.io/google.com/cloudsdktool/cloud-sdk:474.0.0-emulators

RUN apt-get update && \
    apt-get install -y git python3-pip netcat && \
    git clone https://github.com/googleapis/python-pubsub.git

WORKDIR /python-pubsub/samples/snippets
RUN pip3 install virtualenv && \
    virtualenv env && \
    . env/bin/activate && \
    pip3 install -r requirements.txt

COPY ./entrypoint.sh ./
EXPOSE 8085
ENTRYPOINT ["./entrypoint.sh"]
pubsub/entrypoint.sh
#!/bin/bash

set -em

gcloud beta emulators pubsub start --project=$PUBSUB_PROJECT_ID --host-port=$PUBSUB_EMULATOR_HOST --quiet &

while ! nc -z localhost 8085; do
  sleep 0.1
done

. env/bin/activate
python3 publisher.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID
python3 subscriber.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID $PUBSUB_SUBSCRIPTION_ID

fg %1

上記スクリプトをpubsub起動時に実行します。

gcloud beta emulators pubsub start --project=$PUBSUB_PROJECT_ID --host-port=$PUBSUB_EMULATOR_HOST --quiet &
で、pubsub emulatorを起動しています。

python3 publisher.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID
では、トピックを作成しています。
また、python3 subscriber.py $PUBSUB_PROJECT_ID create $PUBSUB_TOPIC_ID $PUBSUB_SUBSCRIPTION_IDでは、サブスクリプションを作成しています。

docker-compose.yaml

services:
  pubsub-emulator:
    container_name: pubsub-emulator
    build:
      context: ./pubsub
      dockerfile: Dockerfile
    restart: always
    platform: linux/amd64
    environment:
      - PUBSUB_PROJECT_ID=my-project
      - PUBSUB_TOPIC_ID=my-topic
      - PUBSUB_SUBSCRIPTION_ID=my-subscription
      - PUBSUB_EMULATOR_HOST=0.0.0.0:8085
    extra_hosts:
      - host.docker.internal:host-gateway
    volumes:
      - ./pubsub:/code
    command: 
      ./pubsub/entrypoint.sh
    ports:
      - 8085:8085
  publisher:
    container_name: publisher
    build:
      context: ./publisher
      dockerfile: Dockerfile
    environment:
      - PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
      - PUBSUB_PROJECT_ID=my-project
      - PUBSUB_TOPIC_ID=my-topic
      - FLASK_APP=publisher.py
      - FLASK_ENV=development
    command: flask run --host=0.0.0.0 --port=8080
    ports:
      - 8080:8080
  subscriber:
    container_name: subscriber
    build:
      context: ./subscriber
      dockerfile: Dockerfile
    environment:
      - PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
      - PUBSUB_PROJECT_ID=my-project
      - PUBSUB_SUBSCRIPTION_ID=my-subscription
    working_dir: /app
    command: python subscriber.py

ここで注意すべき点は、各サービスで設定するPUBSUB_EMULATOR_HOSTの値です。
pubsub-emulatorがリッスンするホストとポートは、
PUBSUB_EMULATOR_HOST=0.0.0.0:8085
としています。
そしてpublisherとsubscriberのPUBSUB_EMULATOR_HOSTはそれぞれ
PUBSUB_EMULATOR_HOST=pubsub-emulator:8085
としています。

こうすることでpubsubを通じてメッセージのやり取りができるようになります。

動作確認

では、動作確認をしてみます。

以下のようにFlaskサーバーにPOSTリクエストします。

curl -X POST -H "Content-Type: application/json" -d '{"message": "Hello, Pub/Sub!"}' http://localhost:8080/publish

すると、subscriberで

Received {"message": "Hello, Pub/Sub!"}.

とでるはずです。

これでPub/Subのローカル環境構築成功です🎉

最後に

以上がローカル環境でCloud Pub/Subエミュレータを使用する方法の解説でした。
HOSTの設定でも多少ハマったのですが、Dockerfileのイメージがslimだと必要なパッケージがないようで、subscriberが起動しなくなったりして、混乱してしまいましたが、なんとかミニマムでPubSubの動作確認がローカルでできるようになりました。

もし同じ悩みを抱えている方がいましたら、参考にしていただけたら嬉しいです!

参考

以下のサイトを参考にさせていただきました!
ありがとうございました🙇‍♂️

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1