8
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

GCP:Pub/SubからCloud Functions、Cloud Functions からPub/Sub、を繰り返す

Posted at

以下今回のアーキテクチャイメージです。

image.png

#用意するモノ
・Pub/SubでTopic2つ
 ┗「topic_1」と「topic_2」という名前とする

・Cloud Scheduler
 ┗トピックに「topic_1」を設定し、ペイロードは「hello」とする

・Cloud FunctionsでFunctionを2つ
 ┗「function_1」と「function_2」という名前とする
  「function_1」のトリガーはPub/Subの「topic_1」を設定する
  「function_2」のトリガーはPub/Subの「topic_2」を設定する

function_1の中身

以下の「event_message」ではtopic_1を経由したCloud Schedulerで設定したペイロード「hello」とかの文字列が格納されている。

N個のCloud Schedulerをペイロードだけ変えてほか同じ設定にすると、function_1 内で event_message を判定させて後続処理をごにょごにょできる

.py
def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')

次に、function_2にリストを渡すと仮定する
Pub/Subではテキストのみが渡せるのでencodeさせる必要がある

.py
from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()

topic_id = "topic_2" # 次にパスするトピックを設定

topic_path = client.topic_path(PROJECT_ID, topic_id)

pub_text = ["りんご", "ゴリラ", "ラッパ"]

data = pub_text.encode() # ここでエンコード
client.publish(topic_path, data=data) # これでtopic_2にpub_textがpushされる

上記の最終行でtopic_2へ'["りんご", "ゴリラ", "ラッパ"]'が渡されてfunction_2が発火している。

function_2の中身

以下 event_message では'["りんご", "ゴリラ", "ラッパ"]'が入っているのでevalしてpythonのリストに戻す。
後続処理はよしなに

.py
def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')
    fruit_lst = eval(event_message)

function_2を並列起動させてみる

Cloud Functionsは以下公式ドキュメントの通り、並列起動ができる
https://cloud.google.com/functions/quotas?hl=ja#scalability

そのためfunction_1を以下のようにループさせながらPub/Subへpublishさせると、function_2では3つのフルーツを取得することになる

function_1.py
from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()

topic_id = "topic_2"

topic_path = client.topic_path(PROJECT_ID, topic_id)

fruit_lst = ["りんご", "ゴリラ", "ラッパ"]

for fruit in fruit_lst:
    data = fruit.encode()
    client.publish(topic_path, data=data)

参考文献

https://cloud.google.com/solutions/streaming-data-from-cloud-storage-into-bigquery-using-cloud-functions?hl=ja
https://cloud.google.com/functions/quotas?hl=ja#scalability

余談

list や dict も encode() して Pub/Sub へ渡して受け取った側で eval すれば元通りなので、複数の軽い処理を定期的に実行する、という場合に便利でした。

8
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?