Workflow登録
引数を取って、30秒スリープ後、結果出力という簡単なワークフローを登録します。
workflow.yaml
main:
params: [args]
steps:
- map_params:
assign:
- outputVar: ${"Hello " + args.firstName + " " + args.lastName}
- sleep:
call: sys.sleep
args:
seconds: 30
- output:
return: ${outputVar}
$ gcloud beta workflows deploy myworkflow --location us-central1 --source workflow.yaml
以下の用に登録されます。
PythonによるWorkflow起動 :非同期
Workflowに名前を引数として渡すコードを作ってみます。
run_workflow.py
import json
from google.cloud.workflows import WorkflowsClient
from google.cloud.workflows.executions import (
ExecutionsClient,
Execution,
)
GCP_PROJECT = "your-googlecloud-project"
WF_LOCATION = "us-central1"
WF_NAME = "myworkflow"
WF_ARGS = {"firstName": "Yusuke", "lastName": "Otomo"}
if __name__ == "__main__":
workflow_name = WorkflowsClient().workflow_path(
project=GCP_PROJECT, location=WF_LOCATION, workflow=WF_NAME
) # workflow_name -> projects/your-gcp-project/locations/us-central1/workflows/myworkflow
cli = ExecutionsClient()
cli.create_execution(
parent=workflow_name,
execution=Execution(argument=json.dumps(WF_ARGS)),
)
実行します。
$ python run_workflow.py
非同期なので、スクリプトはWorkflowをトリガーしてすぐに返ります。
Google Cloudの画面で見るとワークフローが実行中であることがわかります。
想定通り引数も渡っています。
PythonによるWorkflow起動 :同期
Workflowの結果を取得するため、ステータスをポーリングしてみます
run_workflow_sync.py
import json
from time import sleep
from google.cloud.workflows import WorkflowsClient
from google.cloud.workflows.executions import (
ExecutionsClient,
Execution,
GetExecutionRequest,
)
GCP_PROJECT = "your-googlecloud-project"
WF_LOCATION = "us-central1"
WF_NAME = "myworkflow"
WF_ARGS = {"firstName": "Yusuke", "lastName": "Otomo"}
if __name__ == "__main__":
workflow_name = WorkflowsClient().workflow_path(
project=GCP_PROJECT, location=WF_LOCATION, workflow=WF_NAME
) # workflow_name -> projects/your-gcp-project/locations/us-central1/workflows/myworkflow
cli = ExecutionsClient()
create_response = cli.create_execution(
parent=workflow_name,
execution=Execution(argument=json.dumps(WF_ARGS)),
)
while True:
get_response = cli.get_execution(
request=GetExecutionRequest(name=create_response.name)
)
if get_response.state != Execution.State.ACTIVE:
print(get_response)
break
print(f"waiting... State ->{get_response.state}")
sleep(5)
実行すると以下のようにWorkflowのOutputの結果を取得できました。
$ python run_workflow_sync.py
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
Workflow Result -> "Hello Yusuke Otomo"
参考