リアルタイムなデータ同期システムの検証時などに、Google Cloud の Pub/Sub に流れるデータの中身を確認したい時がある。
そんな時に(もちろん違うモチベーションの時も)使える、Pub/Sub トピックにテキストメッセージを投げて(Push)、サブスクリプションから抜き出す(Pull)だけの簡単な Python スクリプトを作成していく。
公式ドキュメント
事前準備
まずメッセージのやり取りをするための Pub/Sub トピックとサブスクリプションを作成していく。
本番運用するリソースの場合は Terraform などで作成するべきだが、今回は検証用なので gcloud
コマンドでサクッと作成。
トピックを作成
トピック名は pokoyakazan-topic
とする。
※ pokoyakazan
は私のハンドルネーム、特に意味はない
gcloud pubsub topics create pokoyakazan-topic
サブスクリプション作成
サブスクリプション名は pokoyakazan-sub
として、先ほど作成した pokoyakazan-topic
に紐づける。
今回は Python スクリプトからメッセージを抜き出す(Pullする)ので、pullサブスクリプションを作成する。
gcloud pubsub subscriptions create pokoyakazan-sub --topic=pokoyakazan-topic
Python 環境準備
pip
で google-cloud-pubsub をインストールすればひとまず OK
pip install google-cloud-pubsub
Push 用 Python スクリプト作成
トピック pokoyakazan-topic
にメッセージを投げる Python スクリプト(pubsub_push.py
)を作成。
import argparse
import time
from google.cloud import pubsub_v1
def publish(project_id, topic_id, msg_str):
_publisher = pubsub_v1.PublisherClient()
topic_path = _publisher.topic_path(project_id, topic_id)
# メッセージをバイト列に変換
msg = msg_str.encode("utf-8")
future = _publisher.publish(topic_path, msg)
message_id = future.result(timeout=300)
print(f"Published {msg.decode()} to {topic_id}: {message_id}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("project_id", help="Google Cloud project ID")
parser.add_argument("topic_id", help="Pub/Sub topic ID")
args = parser.parse_args()
for i in range(100):
msg = f"{i}: Hello"
publish(args.project_id, args.topic_id, msg)
time.sleep(1)
配信先の、プロジェクト名、トピック名を引数で受け取るようにしており、1秒感覚で合計100個のメッセージを送信する。
topic_path
メソッドを使うと、projects/{project}/topics/{topic}
という形のトピックの FQDN を作ってくれる。
Pull 用 Python スクリプト作成
サブスクリプション pokoyakazan-sub
からメッセージを引っ張ってくる Python スクリプト(pubsub_pull.py
)を作成。
import argparse
from google.cloud import pubsub_v1
def subscribe(project_id, subscription_id):
_subscriber = pubsub_v1.SubscriberClient()
subscription_path = _subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(message)
message.ack()
future = _subscriber.subscribe(
subscription_path, callback=callback
)
print(f"Listening for messages on {subscription_id}..\n")
try:
future.result(timeout=300)
except KeyboardInterrupt:
future.cancel()
future.result()
_subscriber.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"project_id", help="Google Cloud project ID"
)
parser.add_argument(
"subscription_id", help="Pub/Sub subscription ID"
)
args = parser.parse_args()
subscribe(args.project_id, args.subscription_id)
メッセージを引っ張ってくる、プロジェクト名、サブスクリプション名を引数で受け取るようにしている。
subscription_path
メソッドを使うと、projects/{project_id}/subscriptions/{subscription_id}
という形のサブスクリプションの FQDN を作ってくれる。
実行
2つのターミナルウィンドウを開いて、まず1つ目のウィンドウでサブスクリプション(pubsub_pull.py
)を実行。
python pubsub_pull.py ${PROJECT_ID} pokoyakazan-sub
# 出力
Listening for messages on pokoyakazan-sub..
続いて、もう1つのウィンドウでトピックにメッセージを投げる処理(pubsub_push.py
)を実行。
python pubsub_push.py ${PROJECT_ID} pokoyakazan-topic
# 出力
Published 0: Hello to pokoyakazan-topic: 11611560812232285
Published 1: Hello to pokoyakazan-topic: 10212785273198229
Published 2: Hello to pokoyakazan-topic: 11611575733495444
Published 3: Hello to pokoyakazan-topic: 11612216330053931
Published 4: Hello to pokoyakazan-topic: 11612173002154105
...
ここでサブスクリプション側のウィンドウを見るとメッセージが受信できていることがわかる。
まとめ
簡単な Pub/Sub へのメッセージ Pus/Sub スクリプトを Python で実装した。
また余裕があれば BigQuery への配信方法なども書いていく。
めでたし!