env.pyで環境変数など設定すれば、コマンドでのexportを実施せずに済みますので、楽です!
ポイント:
*サービスアカウントキーファイル(jsonファイル)はenv.py、publish.pyと同じディレクトリに置いておくと、便利です!
*リクエストを Pub/Sub サーバーに送信する際に、
publisher = pubsub_v1.PublisherClient()
を使わず、
publisher = pubsub_v1.publisher.Client.from_service_account_file(JSON_KEY)
に変更すればokです!
env.py
# GCPサービスのプロジェクトID
GOOGLE_CLOUD_PROJECT = '[PROJECT_ID]'
# Cloud Pub/Subのトピック名
TOPIC = '[TOPIC_NAME]'
# サービスアカウントキーファイル(jsonファイル)の保存パース
JSON_KEY = '[PATH_TO_JSON_KEY]'
publish.py
from env import *
from google.cloud import pubsub_v1
# # # メッセージのパブリッシュ
# # リクエストを Pub/Sub サーバーに送信する
publisher = pubsub_v1.publisher.Client.from_service_account_file(JSON_KEY)
# # The `topic_path` method creates a fully qualified identifier
# # in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(GOOGLE_CLOUD_PROJECT, TOPIC)
# # エラーハンドラー
futures = dict()
def get_callback(f, data):
def callback(f):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
return callback
# # トピック(topic_name)にメッセージをパブリッシュする
for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path, data=data.encode("utf-8") # data must be a bytestring.
)
futures[data] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, data))
# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)
print("Published messages.")
参考リンク:
1、https://cloud.google.com/pubsub/docs/publisher?hl=ja
2、https://googleapis.dev/python/pubsub/latest/publisher/api/client.html
3、https://qiita.com/xtkd/items/61e16fd980e5ea87519b
4、https://qiita.com/shibacow/items/cb3b011b41bcc47f5e4c