LoginSignup
14
9

More than 3 years have passed since last update.

【GCP】Google Pub/Sub の送受信確認(gcloud コマンド & python3)

Last updated at Posted at 2020-01-12

GCP の Cloud Pub/Sub を使い方を、gcloud コマンドとpython で確認できたのでメモします。自分の理解を図にしたものです。

20200112-cloudpubsub.png

以前に同じpubsub モデルの枠組みとしてMQTTを利用したことがありますが、そのときはtopic以外に意識はしなかったのですが、ここではsubscription というモジュールを通してより細かい?制御ができるようです。また、subscriber が取得していない message は補間できるなど、思っていた以上の機能があるようです。

概要

以下に、今回理解した内容のテストの実行手順を示します。

  1. GCP に topic と subscription を作成する。
  2. メッセージを topic に publish する。
  3. subscription から pull する。
  4. (for python) publish/subscriberするための権限を証明するkey file(json)を用意します。
  5. publisher, subscriber のプログラムを各ターミナルで実行

環境は Ubuntu (WSL)、power shell でそれぞれ確認しました。ソースは以下にあります。

LINK: github/xtkd77/test-docker-python/samples/gcp-pubsub-python-20200112

少しだけCloud pubsub のメモ

Subscription には pushとpullの2つのタイプが用意されています(説明)。Pull メッセージを任意のタイミングに発行して取りに行くことができ、Polling でも実装ができるようです

subscriber_pull.png

。実際にpython で実装するときは、message が届いたときに発行されるcallback 関数があり、そこに処理を実装するようにAPIが設計されているので、polling をベタに実装する必要はありません。

1. Topic と Subscription の用意

Topic を作り、そのtopic を購読するための subscription を作成します。

$ export TOPIC_ID="topic0000"
$ export SUBSCRIPTION_ID="subsc0000"
$ gcloud pubsub topics create ${TOPIC_ID}
$ gcloud pubsub subscriptions create ${SUBSCRIPTION_ID} --topic=${TOPIC_ID}

作られたものを確認します。pushConfig が無いsubscriptionがpull 型というのかな?

$ gcloud pubsub subscriptions list
---
ackDeadlineSeconds: 10
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: projects/my-project-id/subscriptions/subsc0000
pushConfig: {}
topic: projects/my-project-id/topics/topic0000

$ gcloud pubsub topics list
---
name: projects/my-project-id/topics/topic0000

2. publish する

puslish コマンドを実行すると、Message IDが返ってきます。

$ gcloud pubsub topics publish ${TOPIC_ID} --message "Hello World!"
messageIds:
- '926358110170555'

また、console.cloud.google.com の Pub/Sub のtopic のページからこの topic のページを開き、そこからメッセージを発行することもできます。

json をpublish (追記)

ファイル(json とかを保存したもの)の内容をそのままpublish したいとき、以下のコマンドを使っています。

$ gcloud pubsub toipcs publish ${topic_id} --message "`cat myfile.json`"

3. subscribe する

$ gcloud pubsub subscriptions pull --auto-ack ${SUBSCRIPTION_ID}
┌──────────────┬─────────────────┬────────────┐
│     DATA     │    MESSAGE_ID   │ ATTRIBUTES │
├──────────────┼─────────────────┼────────────┤
│ Hello World! │ 926358110170555 │            │
└──────────────┴─────────────────┴────────────┘

ここまでは、google SDK で認証が通っているので、できました。

また、console.cloud.google.com の Pub/Sub のsubscriptionのページからこの subscription のページを開き、そこからメッセージをtopic に投稿されたメッセージをpull することもできます。「メッセージを表示」をクリックして出てきた画面で「pull」という青いボタンを押すと、たまっていたメッセージがpull されます。

4. pub and/or sub の権限のあるサービスアカウントのキーを用意

python で実行するとき、publish する(topic を発行する)、subscribe する (subscriptionにアクセスする)ために、認証が必要になります。python のAPIを利用する場合、それはpublisher や subscriber のインスタンスを作成するときに必要になります。

ここでは素直に console.cloud.google.com で行いました。左側のメニューで「IAMと管理」のサブ項目にある「サービスアカウント」を選択します。ここで「+サービスアカウントを作成」を選択します。

2020-01-12-125902.png

あとは流れに沿って進め、PubSub について必要な権限を付与します。
キーを発行できるので、json ファイルをダウンロードします。
ファイル名は任意で構いません。なお、default では環境変数に設定してあるキーファイルを読むようなので、プログラムで明示的にkeyファイルを指定しない場合は、環境変数を設定します。(無くても良いです。)

$ export GOOGLE_APPLICATION_CREDENTIALS="./myproject-mykey.json"

5. python application でpublish する

開発上は、python の仮想環境を用意してやるのがマナーのようです。

python モジュール google-cloud-pubsub がインストールされていれば、以下のプログラムを実行できると思います。

pub.py
import sys, os, time, datetime
from google.cloud import pubsub_v1


project_id, topic_name = sys.argv[1], sys.argv[2]
cred_file = sys.argv[3] 

publisher = pubsub_v1.publisher.Client.from_service_account_file(cred_file)
topic_path = publisher.topic_path(project_id, topic_name)

cnt = 0
while True:
    data = u"Message from test publisher {}".format(cnt) + datetime.datetime.now().isoformat(" ")
    data = data.encode("utf-8")
    print("Publish: " + data.decode("utf-8", "ignore") )
    future = publisher.publish(topic_path, data=data)
    print("return ", future.result())
    time.sleep(0.25)
    cnt = cnt + 1

もし環境変数にkeyファイルが指定してあれば、publisher のinstance は以下で作られます。

publisher = pubsub_v1.PublisherClient()

使い方は、

$ python3 pub.py my-project-id my-topic my-key.json

6. python で subscribe したメッセージを表示する

以下のプログラムを使います。

sub.py
import os, sys, time, datetime
from google.cloud import pubsub_v1

def callback(message):
    now = datetime.datetime.now()
    print( "msg = \"" + message.data.decode("utf-8") + "\"" +  "  [" + now.isoformat(" ") + "]")
    message.ack()

project_id, sub_name, cred_file = sys.argv[1],  sys.argv[2], sys.argv[3]

subscriber = pubsub_v1.subscriber.Client.from_service_account_file(cred_file)

subpath = subscriber.subscription_path(project_id, sub_name)
flow_control = pubsub_v1.types.FlowControl(max_messages=2)

subscriber.subscribe(subpath, callback=callback, flow_control = flow_control)
input()

使い方は、

$ python3 sub.py my-project-id my-subscription my-key.json

実際の動かすと、publish したときのシステムの時刻とsub.py でmessage を受信したときの時刻のずれを読めます。

msg = "Message from test publisher 262020-01-12 13:19:32.691757"  [2020-01-12 13:19:32.793407]
msg = "Message from test publisher 272020-01-12 13:19:33.009596"  [2020-01-12 13:19:33.090725]
msg = "Message from test publisher 282020-01-12 13:19:33.285470"  [2020-01-12 13:19:33.335753]
msg = "Message from test publisher 292020-01-12 13:19:33.566437"  [2020-01-12 13:19:33.619432]
msg = "Message from test publisher 302020-01-12 13:19:33.845906"  [2020-01-12 13:19:33.896490

References

Google 提供の情報

参考にさせていただいたブログ

雑感

今回、subscription のpull/push のタイプの違いがあるところや、認証が必要なところでハマりました。涙
使い方は分かったのですが、これをDocker, cloud build, Kubernetes で動かすとすると先が長いな。。。真面目に本稿を書いて疲れた。とは言え、実際に多対多でたくさんの種類のメッセージをやりとりするテストをしていないので、使う前にこれらもテストはしてみたいと思います。

qs-diag-final.png

14
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
9