
Running Google Cloud Workflows from Python with arguments


Registering the workflow

Register a simple workflow that takes arguments, sleeps for 30 seconds, and then returns a result.

workflow.yaml
main:
  params: [args]
  steps:
    - map_params:
        assign:
          - outputVar: ${"Hello " + args.firstName + " " + args.lastName}
    - sleep:
        call: sys.sleep
        args:
          seconds: 30
    - output:
        return: ${outputVar}

$ gcloud beta workflows deploy myworkflow --location us-central1 --source workflow.yaml
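
To sanity-check the expected output before deploying, the workflow's three steps can be mirrored in plain Python. This is only a local sketch: `simulate_workflow` is a hypothetical helper, not part of any Google Cloud library, and the sleep defaults to zero so that it returns quickly.

```python
import time


def simulate_workflow(args: dict, sleep_seconds: float = 0.0) -> str:
    """Mirror the three steps of workflow.yaml locally (hypothetical helper)."""
    # map_params: build the greeting from the input arguments.
    output_var = "Hello " + args["firstName"] + " " + args["lastName"]
    # sleep: the deployed workflow waits 30 seconds via sys.sleep.
    time.sleep(sleep_seconds)
    # output: return the assigned variable.
    return output_var


if __name__ == "__main__":
    print(simulate_workflow({"firstName": "Yusuke", "lastName": "Otomo"}))
```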

The workflow is registered as shown below.

image.png

Launching the workflow from Python: asynchronous

Let's write code that passes a name to the workflow as arguments.

run_workflow.py
import json
from google.cloud.workflows import WorkflowsClient
from google.cloud.workflows.executions import (
    ExecutionsClient,
    Execution,
)

GCP_PROJECT = "your-googlecloud-project"
WF_LOCATION = "us-central1"
WF_NAME = "myworkflow"


WF_ARGS = {"firstName": "Yusuke", "lastName": "Otomo"}

if __name__ == "__main__":
    workflow_name = WorkflowsClient().workflow_path(
        project=GCP_PROJECT, location=WF_LOCATION, workflow=WF_NAME
    )  # workflow_name -> projects/your-googlecloud-project/locations/us-central1/workflows/myworkflow

    cli = ExecutionsClient()
    cli.create_execution(
        parent=workflow_name,
        execution=Execution(argument=json.dumps(WF_ARGS)),
    )
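
The two values handed to `create_execution` are plain strings, so they can be inspected without calling Google Cloud. The sketch below rebuilds them by hand. `build_workflow_path` is a hypothetical stand-in that assumes the resource-name format shown in the comment in run_workflow.py; it is not the client library's own helper.

```python
import json


def build_workflow_path(project: str, location: str, workflow: str) -> str:
    """Hypothetical stand-in; assumes the name format from the comment in run_workflow.py."""
    return f"projects/{project}/locations/{location}/workflows/{workflow}"


WF_ARGS = {"firstName": "Yusuke", "lastName": "Otomo"}

parent = build_workflow_path("your-googlecloud-project", "us-central1", "myworkflow")
# The workflow receives this JSON text and exposes it as `args`.
argument = json.dumps(WF_ARGS)

print(parent)
print(argument)
```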

Run the script.

$ python run_workflow.py

Because the call is asynchronous, the script triggers the workflow and returns immediately.
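
Even in the asynchronous case, it can be useful to keep the name of the execution returned by `create_execution` so that its status can be looked up later. The sketch below splits such a name into its parts. `parse_execution_name` and the sample execution ID are hypothetical, and the layout is assumed to be the workflow path followed by an `executions/<id>` suffix.

```python
def parse_execution_name(name: str) -> dict:
    """Split an execution resource name into labeled parts (hypothetical helper)."""
    parts = name.split("/")
    # Assumed layout: projects/<p>/locations/<l>/workflows/<w>/executions/<e>
    expected_keys = ["projects", "locations", "workflows", "executions"]
    if len(parts) != 8 or parts[0::2] != expected_keys:
        raise ValueError(f"unexpected execution name: {name}")
    return dict(zip(parts[0::2], parts[1::2]))


if __name__ == "__main__":
    sample = (
        "projects/your-googlecloud-project/locations/us-central1"
        "/workflows/myworkflow/executions/example-execution-id"  # made-up ID
    )
    print(parse_execution_name(sample))
```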

In the Google Cloud console, you can see that the workflow is running.

image.png

The arguments were passed as expected.

image.png

Launching the workflow from Python: synchronous

To retrieve the workflow's result, poll the execution status.

run_workflow_sync.py
import json
from time import sleep
from google.cloud.workflows import WorkflowsClient
from google.cloud.workflows.executions import (
    ExecutionsClient,
    Execution,
    GetExecutionRequest,
)

GCP_PROJECT = "your-googlecloud-project"
WF_LOCATION = "us-central1"
WF_NAME = "myworkflow"


WF_ARGS = {"firstName": "Yusuke", "lastName": "Otomo"}

if __name__ == "__main__":
    workflow_name = WorkflowsClient().workflow_path(
        project=GCP_PROJECT, location=WF_LOCATION, workflow=WF_NAME
    )  # workflow_name -> projects/your-googlecloud-project/locations/us-central1/workflows/myworkflow

    cli = ExecutionsClient()
    create_response = cli.create_execution(
        parent=workflow_name,
        execution=Execution(argument=json.dumps(WF_ARGS)),
    )

    while True:
        get_response = cli.get_execution(
            request=GetExecutionRequest(name=create_response.name)
        )
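        if get_response.state != Execution.State.ACTIVE:
            # The execution is no longer running; report the outcome.
            if get_response.state == Execution.State.SUCCEEDED:
                print(f"Workflow Result -> {get_response.result}")
            else:
                print(f"Workflow ended in state {get_response.state.name}")
            break

        print(f"waiting... State -> {get_response.state.name}")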
        sleep(5)
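
The loop in run_workflow_sync.py polls indefinitely. A minimal sketch of a bounded variant follows, with the status lookup injected as a callable so that it can be exercised without Google Cloud. `wait_until_done`, `fetch_state`, and the use of plain string state names are assumptions made for illustration; with the real client, `fetch_state` could wrap `cli.get_execution(...)` and return `state.name`.

```python
import time
from typing import Callable


def wait_until_done(
    fetch_state: Callable[[], str],
    interval: float = 5.0,
    timeout: float = 300.0,
) -> str:
    """Poll fetch_state until it stops returning "ACTIVE" or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state != "ACTIVE":
            return state
        # Give up if the next sleep would run past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"still ACTIVE after about {timeout} seconds")
        print(f"waiting... State -> {state}")
        time.sleep(interval)


if __name__ == "__main__":
    # Fake status source: ACTIVE twice, then SUCCEEDED.
    states = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
    print(wait_until_done(lambda: next(states), interval=0.01))
```

Injecting the lookup keeps the waiting logic separate from the API call, which also makes the timeout path easy to test.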

Running run_workflow_sync.py retrieves the workflow's output, as shown below.

$ python run_workflow_sync.py

waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
waiting... State -> ACTIVE
Workflow Result -> "Hello Yusuke Otomo"
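
The result appears with surrounding double quotes because the execution's `result` arrives as JSON text. Under that assumption, `json.loads` turns it back into a Python value. The literal below is copied from the output shown here rather than fetched from a live execution.

```python
import json

# JSON text as it appears in the script output (not fetched from a live execution).
raw_result = '"Hello Yusuke Otomo"'

# Decoding strips the JSON quoting and yields an ordinary Python string.
decoded = json.loads(raw_result)
print(decoded)
```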

