BIでGCP触る機会があったのでメモです。
やりたいこと
-
Cloud Scheduler
: 日次でCloud Functions
を動かす
↓ -
Cloud Functions
:GCS
に保存されているファイル名をWorkflows
に連携して動かす
↓ -
Cloud Workflows
: 受け取ったGCS
のパスを元にワークフローを実行
上記の
・Cloud Functions
→ Cloud Workflows
部分で詰まりまくった。
というか実現方法がなかなか見つけられなかった。
事前準備
今回は全てGCPのコンパネ上で作業しました。
特に難しい操作もないので、コンパネ操作については割愛します。
バケットを作成
バケットを作成してCloud Functions
で参照するオブジェクトを作成する。
今回は以下に適当なオブジェクトを設置。
dataflow/data/raw/
Cloud Workflowsを作成
今回は動くまでの確認なので、サンプルをそのまま使う。
# This is a sample workflow to test or replace with your source code.
#
# This workflow returns a list of Wikipedia articles related to a search term.
# The search term is retrieved from a Cloud Function that returns the current day of the week
# (in GMT), unless a search term is given as input (e.g. {"searchTerm": "Monday"}).
main:
params: [input]
steps:
- checkSearchTermInInput:
switch:
- condition: ${"searchTerm" in input}
assign:
- searchTerm: ${input.searchTerm}
next: readWikipedia
- getCurrentTime:
call: http.get
args:
url: https://us-central1-workflowsample.cloudfunctions.net/datetime
result: currentDateTime
- setFromCallResult:
assign:
- searchTerm: ${currentDateTime.body.dayOfTheWeek}
- readWikipedia:
call: http.get
args:
url: https://en.wikipedia.org/w/api.php
query:
action: opensearch
search: ${searchTerm}
result: wikiResult
- returnOutput:
return: ${wikiResult.body[1]}
※ざっくり処理内容
-
checkSearchTermInInput
:検索条件設定(searchTerm
が設定されていたらステップreadWikipedia
に遷移) -
getCurrentTime
:現在日時を取得 -
setFromCallResult
:getCurrentTime
で得た日時から曜日を取得 -
readWikipedia
:wikipediaのAPIコール -
returnOutput
:結果を表示
Cloud Functionsを作成
ランタイムはpython3.7
、エントリはscan_raw_data
で作成。
from google.cloud import storage
from google.cloud.workflows.executions import ExecutionsClient
from google.cloud.workflows.executions_v1.types import Execution
import json
def scan_raw_data(request):
storage_client = storage.Client()
bucket_name = '[バケット名]'
prefix = "dataflow/data/raw/"
delimiter = "/"
blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter=delimiter)
for blob in blobs:
call_workflow(blob.name)
def call_workflow(gcs_path):
# workflowを指定
project = "[プロジェクトID]"
location = "us-central1"
workflow = "[ワークフローID]"
parent = "projects/{}/locations/{}/workflows/{}".format(project, location, workflow)
# リクエストボディを指定
arguments = {"gcs_path": gcs_path}
execution = Execution(argument=json.dumps(arguments))
# ワークフロー実行リクエストを作成
client = ExecutionsClient()
try:
client.create_execution(parent=parent, execution=execution)
except:
print("Error occurred when triggering workflow execution")
※ざっくり処理内容
- GCSバケットのファイル一覧を取得
- ファイル毎に
Workflows
サービスに連携
今回Functions
からWorkflows
にデータを渡す部分については特に触れていませんが、
Execution
の際にargments
を指定することで値をWorkflows
に渡すことが出来ます。
google-cloud-storage==1.40.0
google-cloud-workflows==1.2.1
実行
ここまで出来たら、
Functions
で関数をテスト実行(もしくはScheduler
から実行)を行い、
Functions
とWorkflows
のログを確認して正常に動いてるか確認する。
Workflows
の結果にwikipediaの情報が出力されてたら成功です。
感想
一応これで一連の流れが動作するところまでは確認出来ました。
今回google-cloud-workflows
パッケージの存在に気付くまでが一番時間がかかったので、特に補足みたいなのは無いです…。
※Functions
からCURLでWorkflows API
を呼べるかを必死に調べてた。
あとは…Client Library for Cloud Workflows API
の公式リファレンスが全然理解出来なくて禿げそうでした。
やっぱり英語スキルは偉大だなぁと…。
結果Stack Overflowのサンプルソースで動いただけなので、徐々に理解を深めていきたい。
今回はGCPだと比較的新しめなWorkflows
サービスを利用してみました。
マイクロサービスや外部APIとの組み合わせでちょっとしたワークフローを実現するには結構有用かなぁと。
あまり複雑なことは出来ませんが、yamlも直感的でわかりやすかったです。
ただ制限が多いので、比較的小さめなシステム用な印象でした。
※変数が256KBまでとか、ステップ数が10までとか…(Dataflow
内のステップ数も10が限界でした。)
ただCloud Composer
などに比べると費用も抑えられるので、
うまくワークフローを構築すればそこそこな規模なシステムで利用出来たときのメリットは大きいかもしれません。
ワークフローの構成次第で様々な使い方が出来そうなので、
GCPに触れる良い機会だし、色々試してガンガンインプットしていきたいですね。
参考
https://cloud.google.com/workflows/quotas
https://cloud.google.com/workflows/docs/overview?hl=ja
https://cloud.google.com/workflows/docs/reference/libraries?hl=ja
https://googleapis.dev/python/workflows/latest/index.html
https://stackoverflow.com/questions/64935762/pass-arguments-to-cloud-workflows-using-python