GCP の Cloud Pub/Sub を使い方を、gcloud コマンドとpython で確認できたのでメモします。自分の理解を図にしたものです。
以前に同じpubsub モデルの枠組みとしてMQTTを利用したことがありますが、そのときはtopic以外に意識はしなかったのですが、ここではsubscription というモジュールを通してより細かい?制御ができるようです。また、subscriber が取得していない message は補間できるなど、思っていた以上の機能があるようです。
概要
以下に、今回理解した内容のテストの実行手順を示します。
- GCP に topic と subscription を作成する。
- メッセージを topic に publish する。
- subscription から pull する。
- (for python) publish/subscriberするための権限を証明するkey file(json)を用意します。
- publisher, subscriber のプログラムを各ターミナルで実行
環境は Ubuntu (WSL)、power shell でそれぞれ確認しました。ソースは以下にあります。
LINK: github/xtkd77/test-docker-python/samples/gcp-pubsub-python-20200112
少しだけCloud pubsub のメモ
Subscription には pushとpullの2つのタイプが用意されています(説明)。Pull メッセージを任意のタイミングに発行して取りに行くことができ、Polling でも実装ができるようです
。実際にpython で実装するときは、message が届いたときに発行されるcallback 関数があり、そこに処理を実装するようにAPIが設計されているので、polling をベタに実装する必要はありません。
1. Topic と Subscription の用意
Topic を作り、そのtopic を購読するための subscription を作成します。
$ export TOPIC_ID="topic0000"
$ export SUBSCRIPTION_ID="subsc0000"
$ gcloud pubsub topics create ${TOPIC_ID}
$ gcloud pubsub subscriptions create ${SUBSCRIPTION_ID} --topic=${TOPIC_ID}
作られたものを確認します。pushConfig が無いsubscriptionがpull 型というのかな?
$ gcloud pubsub subscriptions list
---
ackDeadlineSeconds: 10
expirationPolicy:
ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/my-project-id/subscriptions/subsc0000
pushConfig: {}
topic: projects/my-project-id/topics/topic0000
$ gcloud pubsub topics list
---
name: projects/my-project-id/topics/topic0000
2. publish する
puslish コマンドを実行すると、Message IDが返ってきます。
$ gcloud pubsub topics publish ${TOPIC_ID} --message "Hello World!"
messageIds:
- '926358110170555'
また、console.cloud.google.com の Pub/Sub のtopic のページからこの topic のページを開き、そこからメッセージを発行することもできます。
json をpublish (追記)
ファイル(json とかを保存したもの)の内容をそのままpublish したいとき、以下のコマンドを使っています。
$ gcloud pubsub toipcs publish ${topic_id} --message "`cat myfile.json`"
3. subscribe する
$ gcloud pubsub subscriptions pull --auto-ack ${SUBSCRIPTION_ID}
┌──────────────┬─────────────────┬────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │
├──────────────┼─────────────────┼────────────┤
│ Hello World! │ 926358110170555 │ │
└──────────────┴─────────────────┴────────────┘
ここまでは、google SDK で認証が通っているので、できました。
また、console.cloud.google.com の Pub/Sub のsubscriptionのページからこの subscription のページを開き、そこからメッセージをtopic に投稿されたメッセージをpull することもできます。「メッセージを表示」をクリックして出てきた画面で「pull」という青いボタンを押すと、たまっていたメッセージがpull されます。
4. pub and/or sub の権限のあるサービスアカウントのキーを用意
python で実行するとき、publish する(topic を発行する)、subscribe する (subscriptionにアクセスする)ために、認証が必要になります。python のAPIを利用する場合、それはpublisher や subscriber のインスタンスを作成するときに必要になります。
ここでは素直に console.cloud.google.com で行いました。左側のメニューで「IAMと管理」のサブ項目にある「サービスアカウント」を選択します。ここで「+サービスアカウントを作成」を選択します。
あとは流れに沿って進め、PubSub について必要な権限を付与します。
キーを発行できるので、json ファイルをダウンロードします。
ファイル名は任意で構いません。なお、default では環境変数に設定してあるキーファイルを読むようなので、プログラムで明示的にkeyファイルを指定しない場合は、環境変数を設定します。(無くても良いです。)
$ export GOOGLE_APPLICATION_CREDENTIALS="./myproject-mykey.json"
5. python application でpublish する
開発上は、python の仮想環境を用意してやるのがマナーのようです。
python モジュール google-cloud-pubsub がインストールされていれば、以下のプログラムを実行できると思います。
import sys, os, time, datetime
from google.cloud import pubsub_v1
project_id, topic_name = sys.argv[1], sys.argv[2]
cred_file = sys.argv[3]
publisher = pubsub_v1.publisher.Client.from_service_account_file(cred_file)
topic_path = publisher.topic_path(project_id, topic_name)
cnt = 0
while True:
data = u"Message from test publisher {}".format(cnt) + datetime.datetime.now().isoformat(" ")
data = data.encode("utf-8")
print("Publish: " + data.decode("utf-8", "ignore") )
future = publisher.publish(topic_path, data=data)
print("return ", future.result())
time.sleep(0.25)
cnt = cnt + 1
もし環境変数にkeyファイルが指定してあれば、publisher のinstance は以下で作られます。
publisher = pubsub_v1.PublisherClient()
使い方は、
$ python3 pub.py my-project-id my-topic my-key.json
6. python で subscribe したメッセージを表示する
以下のプログラムを使います。
import os, sys, time, datetime
from google.cloud import pubsub_v1
def callback(message):
now = datetime.datetime.now()
print( "msg = \"" + message.data.decode("utf-8") + "\"" + " [" + now.isoformat(" ") + "]")
message.ack()
project_id, sub_name, cred_file = sys.argv[1], sys.argv[2], sys.argv[3]
subscriber = pubsub_v1.subscriber.Client.from_service_account_file(cred_file)
subpath = subscriber.subscription_path(project_id, sub_name)
flow_control = pubsub_v1.types.FlowControl(max_messages=2)
subscriber.subscribe(subpath, callback=callback, flow_control = flow_control)
input()
使い方は、
$ python3 sub.py my-project-id my-subscription my-key.json
実際の動かすと、publish したときのシステムの時刻とsub.py でmessage を受信したときの時刻のずれを読めます。
msg = "Message from test publisher 262020-01-12 13:19:32.691757" [2020-01-12 13:19:32.793407]
msg = "Message from test publisher 272020-01-12 13:19:33.009596" [2020-01-12 13:19:33.090725]
msg = "Message from test publisher 282020-01-12 13:19:33.285470" [2020-01-12 13:19:33.335753]
msg = "Message from test publisher 292020-01-12 13:19:33.566437" [2020-01-12 13:19:33.619432]
msg = "Message from test publisher 302020-01-12 13:19:33.845906" [2020-01-12 13:19:33.896490
References
Google 提供の情報
- Receiving messages using Pull:サンプルコードの解説。必要に応じてしっかり読む価値がありそうです。ここの最後にある、メッセージごとに処理のプロセスを立ち上げる実装のデザイン等、参考になります。
- Python Client for Google Cloud Pub / Sub: API仕様
参考にさせていただいたブログ
雑感
今回、subscription のpull/push のタイプの違いがあるところや、認証が必要なところでハマりました。涙
使い方は分かったのですが、これをDocker, cloud build, Kubernetes で動かすとすると先が長いな。。。真面目に本稿を書いて疲れた。とは言え、実際に多対多でたくさんの種類のメッセージをやりとりするテストをしていないので、使う前にこれらもテストはしてみたいと思います。