2
2

More than 1 year has passed since last update.

Cloud FunctionsからCloud Workflowsを呼び出す

Last updated at Posted at 2021-08-11

BIでGCP触る機会があったのでメモです。

やりたいこと

  • Cloud Scheduler: 日次でCloud Functionsを動かす
  • Cloud Functions: GCSに保存されているファイル名をWorkflowsに連携して動かす
  • Cloud Workflows: 受け取ったGCSのパスを元にワークフローを実行

上記の
Cloud FunctionsCloud Workflows
部分で詰まりまくった。
というか実現方法がなかなか見つけられなかった。

事前準備

今回は全てGCPのコンパネ上で作業しました。
特に難しい操作もないので、コンパネ操作については割愛します。

バケットを作成

バケットを作成してCloud Functionsで参照するオブジェクトを作成する。
今回は以下に適当なオブジェクトを設置。
dataflow/data/raw/

Cloud Workflowsを作成

今回は動くまでの確認なので、サンプルをそのまま使う。

workflow.yaml
# This is a sample workflow to test or replace with your source code.
#
# This workflow returns a list of Wikipedia articles related to a search term.
# The search term is retrieved from a Cloud Function that returns the current day of the week
# (in GMT), unless a search term is given as input (e.g. {"searchTerm": "Monday"}).
main:
    params: [input]
    steps:
    - checkSearchTermInInput:
        switch:
            - condition: ${"searchTerm" in input}
              assign:
                - searchTerm: ${input.searchTerm}
              next: readWikipedia
    - getCurrentTime:
        call: http.get
        args:
            url: https://us-central1-workflowsample.cloudfunctions.net/datetime
        result: currentDateTime
    - setFromCallResult:
        assign:
            - searchTerm: ${currentDateTime.body.dayOfTheWeek}
    - readWikipedia:
        call: http.get
        args:
            url: https://en.wikipedia.org/w/api.php
            query:
                action: opensearch
                search: ${searchTerm}
        result: wikiResult
    - returnOutput:
            return: ${wikiResult.body[1]}

※ざっくり処理内容

  1. checkSearchTermInInput:検索条件設定(searchTermが設定されていたらステップreadWikipediaに遷移)
  2. getCurrentTime:現在日時を取得
  3. setFromCallResultgetCurrentTimeで得た日時から曜日を取得
  4. readWikipedia:wikipediaのAPIコール
  5. returnOutput:結果を表示

Cloud Functionsを作成

ランタイムはpython3.7、エントリはscan_raw_dataで作成。

main.py
from google.cloud import storage
from google.cloud.workflows.executions import ExecutionsClient
from google.cloud.workflows.executions_v1.types import Execution
import json

def scan_raw_data(request):
    storage_client = storage.Client()

    bucket_name = '[バケット名]'
    prefix = "dataflow/data/raw/"
    delimiter = "/"
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)
    for blob in blobs:
        call_workflow(blob.name)


def call_workflow(gcs_path):
    # workflowを指定
    project = "[プロジェクトID]"
    location = "us-central1"
    workflow = "[ワークフローID]"
    parent = "projects/{}/locations/{}/workflows/{}".format(project, location, workflow)

    # リクエストボディを指定
    arguments = {"gcs_path": gcs_path}
    execution = Execution(argument=json.dumps(arguments))

    # ワークフロー実行リクエストを作成
    client = ExecutionsClient()

    try:
        client.create_execution(parent=parent, execution=execution)
    except:
        print("Error occurred when triggering workflow execution")

※ざっくり処理内容

  1. GCSバケットのファイル一覧を取得
  2. ファイル毎にWorkflowsサービスに連携

今回FunctionsからWorkflowsにデータを渡す部分については特に触れていませんが、
Executionの際にargmentsを指定することで値をWorkflowsに渡すことが出来ます。
 

requirements.txt
google-cloud-storage==1.40.0
google-cloud-workflows==1.2.1

実行

ここまで出来たら、
Functionsで関数をテスト実行(もしくはSchedulerから実行)を行い、
FunctionsWorkflowsのログを確認して正常に動いてるか確認する。
Workflowsの結果にwikipediaの情報が出力されてたら成功です。

感想

一応これで一連の流れが動作するところまでは確認出来ました。
今回google-cloud-workflowsパッケージの存在に気付くまでが一番時間がかかったので、特に補足みたいなのは無いです…。
FunctionsからCURLでWorkflows APIを呼べるかを必死に調べてた。

あとは…Client Library for Cloud Workflows API の公式リファレンスが全然理解出来なくて禿げそうでした。
やっぱり英語スキルは偉大だなぁと…。
結果Stack Overflowのサンプルソースで動いただけなので、徐々に理解を深めていきたい。

 
今回はGCPだと比較的新しめなWorkflowsサービスを利用してみました。
マイクロサービスや外部APIとの組み合わせでちょっとしたワークフローを実現するには結構有用かなぁと。
あまり複雑なことは出来ませんが、yamlも直感的でわかりやすかったです。

ただ制限が多いので、比較的小さめなシステム用な印象でした。
※変数が256KBまでとか、ステップ数が10までとか…(Dataflow内のステップ数も10が限界でした。)
ただCloud Composerなどに比べると費用も抑えられるので、
うまくワークフローを構築すればそこそこな規模なシステムで利用出来たときのメリットは大きいかもしれません。

ワークフローの構成次第で様々な使い方が出来そうなので、
GCPに触れる良い機会だし、色々試してガンガンインプットしていきたいですね。

参考

https://cloud.google.com/workflows/quotas
https://cloud.google.com/workflows/docs/overview?hl=ja
https://cloud.google.com/workflows/docs/reference/libraries?hl=ja
https://googleapis.dev/python/workflows/latest/index.html
https://stackoverflow.com/questions/64935762/pass-arguments-to-cloud-workflows-using-python

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2