以下今回のアーキテクチャイメージです。
#用意するモノ
・Pub/SubでTopic2つ
┗「topic_1」と「topic_2」という名前とする
・Cloud Scheduler
┗トピックに「topic_1」を設定し、ペイロードは「hello」とする
・Cloud FunctionsでFunctionを2つ
┗「function_1」と「function_2」という名前とする
「function_1」のトリガーはPub/Subの「topic_1」を設定する
「function_2」のトリガーはPub/Subの「topic_2」を設定する
function_1の中身
以下の「event_message」ではtopic_1を経由したCloud Schedulerで設定したペイロード「hello」とかの文字列が格納されている。
N個のCloud Schedulerをペイロードだけ変えてほか同じ設定にすると、function_1 内で event_message を判定させて後続処理をごにょごにょできる
def main(event, context):
event_message = base64.b64decode(event['data']).decode('utf-8')
次に、function_2にリストを渡すと仮定する
Pub/Subではテキストのみが渡せるのでencodeさせる必要がある
from google.cloud import pubsub_v1
PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()
topic_id = "topic_2" # 次にパスするトピックを設定
topic_path = client.topic_path(PROJECT_ID, topic_id)
pub_text = ["りんご", "ゴリラ", "ラッパ"]
data = pub_text.encode() # ここでエンコード
client.publish(topic_path, data=data) # これでtopic_2にpub_textがpushされる
上記の最終行でtopic_2へ'["りんご", "ゴリラ", "ラッパ"]'が渡されてfunction_2が発火している。
function_2の中身
以下 event_message では'["りんご", "ゴリラ", "ラッパ"]'が入っているのでevalしてpythonのリストに戻す。
後続処理はよしなに
def main(event, context):
event_message = base64.b64decode(event['data']).decode('utf-8')
fruit_lst = eval(event_message)
function_2を並列起動させてみる
Cloud Functionsは以下公式ドキュメントの通り、並列起動ができる
https://cloud.google.com/functions/quotas?hl=ja#scalability
そのためfunction_1を以下のようにループさせながらPub/Subへpublishさせると、function_2では3つのフルーツを取得することになる
from google.cloud import pubsub_v1
PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()
topic_id = "topic_2"
topic_path = client.topic_path(PROJECT_ID, topic_id)
fruit_lst = ["りんご", "ゴリラ", "ラッパ"]
for fruit in fruit_lst:
data = fruit.encode()
client.publish(topic_path, data=data)
参考文献
https://cloud.google.com/solutions/streaming-data-from-cloud-storage-into-bigquery-using-cloud-functions?hl=ja
https://cloud.google.com/functions/quotas?hl=ja#scalability
余談
list や dict も encode() して Pub/Sub へ渡して受け取った側で eval すれば元通りなので、複数の軽い処理を定期的に実行する、という場合に便利でした。