0
1

Cloud Pub/Sub に対して Python から push/pull

Last updated at Posted at 2024-06-25

リアルタイムなデータ同期システムの検証時などに、Google Cloud の Pub/Sub に流れるデータの中身を確認したい時がある。
そんな時に(もちろん違うモチベーションの時も)使える、Pub/Sub トピックにテキストメッセージを投げて(Push)、サブスクリプションから抜き出す(Pull)だけの簡単な Python スクリプトを作成していく。

公式ドキュメント

事前準備

まずメッセージのやり取りをするための Pub/Sub トピックとサブスクリプションを作成していく。
本番運用するリソースの場合は Terraform などで作成するべきだが、今回は検証用なので gcloud コマンドでサクッと作成。

トピックを作成

トピック名は pokoyakazan-topic とする。

pokoyakazan は私のハンドルネーム、特に意味はない

gcloud pubsub topics create pokoyakazan-topic

サブスクリプション作成

サブスクリプション名は pokoyakazan-sub として、先ほど作成した pokoyakazan-topic に紐づける。
今回は Python スクリプトからメッセージを抜き出す(Pullする)ので、pullサブスクリプションを作成する。

gcloud pubsub subscriptions create pokoyakazan-sub --topic=pokoyakazan-topic

Python 環境準備

pipgoogle-cloud-pubsub をインストールすればひとまず OK

pip install google-cloud-pubsub

Push 用 Python スクリプト作成

トピック pokoyakazan-topic にメッセージを投げる Python スクリプト(pubsub_push.py)を作成。

pubsub_push.py
import argparse
import time

from google.cloud import pubsub_v1


def publish(project_id, topic_id, msg_str):
    _publisher = pubsub_v1.PublisherClient()
    topic_path = _publisher.topic_path(project_id, topic_id)
    # メッセージをバイト列に変換
    msg = msg_str.encode("utf-8")
    future = _publisher.publish(topic_path, msg)
    message_id = future.result(timeout=300)
    print(f"Published {msg.decode()} to {topic_id}: {message_id}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("project_id", help="Google Cloud project ID")
    parser.add_argument("topic_id", help="Pub/Sub topic ID")
    args = parser.parse_args()
    for i in range(100):
        msg = f"{i}: Hello"
        publish(args.project_id, args.topic_id, msg)
        time.sleep(1)

配信先の、プロジェクト名、トピック名を引数で受け取るようにしており、1秒感覚で合計100個のメッセージを送信する。

topic_path メソッドを使うと、projects/{project}/topics/{topic} という形のトピックの FQDN を作ってくれる。

Pull 用 Python スクリプト作成

サブスクリプション pokoyakazan-sub からメッセージを引っ張ってくる Python スクリプト(pubsub_pull.py)を作成。

pubsub_pull.py
import argparse

from google.cloud import pubsub_v1


def subscribe(project_id, subscription_id):
    _subscriber = pubsub_v1.SubscriberClient()
    subscription_path = _subscriber.subscription_path(project_id, subscription_id)

    def callback(message):
        print(message)
        message.ack()

    future = _subscriber.subscribe(
        subscription_path, callback=callback
    )
    print(f"Listening for messages on {subscription_id}..\n")

    try:
        future.result(timeout=300)
    except KeyboardInterrupt:
        future.cancel()
        future.result()

    _subscriber.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "project_id", help="Google Cloud project ID"
    )
    parser.add_argument(
        "subscription_id", help="Pub/Sub subscription ID"
    )
    args = parser.parse_args()
    subscribe(args.project_id, args.subscription_id)

メッセージを引っ張ってくる、プロジェクト名、サブスクリプション名を引数で受け取るようにしている。
subscription_path メソッドを使うと、projects/{project_id}/subscriptions/{subscription_id} という形のサブスクリプションの FQDN を作ってくれる。

実行

2つのターミナルウィンドウを開いて、まず1つ目のウィンドウでサブスクリプション(pubsub_pull.py)を実行。

python pubsub_pull.py ${PROJECT_ID} pokoyakazan-sub
# 出力
Listening for messages on pokoyakazan-sub..

続いて、もう1つのウィンドウでトピックにメッセージを投げる処理(pubsub_push.py)を実行。

python pubsub_push.py ${PROJECT_ID} pokoyakazan-topic
# 出力
Published 0: Hello to pokoyakazan-topic: 11611560812232285
Published 1: Hello to pokoyakazan-topic: 10212785273198229
Published 2: Hello to pokoyakazan-topic: 11611575733495444
Published 3: Hello to pokoyakazan-topic: 11612216330053931
Published 4: Hello to pokoyakazan-topic: 11612173002154105
...

ここでサブスクリプション側のウィンドウを見るとメッセージが受信できていることがわかる。

スクリーンショット 2024-06-25 15.47.40.png

まとめ

簡単な Pub/Sub へのメッセージ Pus/Sub スクリプトを Python で実装した。
また余裕があれば BigQuery への配信方法なども書いていく。

めでたし!

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1