LoginSignup
1
1

More than 1 year has passed since last update.

GCP Workflows使って調査結果のcsvデータ作成+調査結果のヒートマップhtmlのビルドを行う仕組みを作った話

Last updated at Posted at 2022-07-14

弊社が提供しているタカノメサービスについて、GCP Workflowsを用いることで調査結果の出力フローを自動化することができました。本稿では、構築したワークフローの概要を紹介いたします。

開発背景

陸ごみ散布状況の調査サービスであるタカノメにおいて、調査のたびに調査結果を出力しています。これまではローカル上でPythonスクリプトを動かす必要があり、コードバージョンや秘密鍵の管理、環境構築面での課題がありました。そこで、社内の管理ページから調査結果を出力指示できるようにし、運用スタッフがコードを触らなくても良いように改善することとなりました。

ワークフローの構築

下図の様に構築しました。大まかな流れは以下の通りです。

  1. 変数定義: GCPプロジェクトID(projectId), ワークフローの実行ID(executionId), Cloud BuildのトリガーID(triggerId)を定義する
  2. 変数マッピング: firestoreで配列型のフィールドを追加するため、配列データを型変換する
  3. 管理データ作成: 調査結果の管理データを作成する。このデータで、ワークフローの処理成否も記録する
  4. 通常処理
    1. Cloud Functionsにて、調査結果データ(csv, xlsx, json)を作成する
    2. Cloud Buildにて、調査結果のヒートマップhtmlを作成する
    3. 管理データにワークフローの処理OKを記録する
  5. 例外処理: 調査結果を作成できなかった場合、管理データにワークフローの処理NGを記録する

タカノメ 調査結果作成ワークフロー

1. 変数定義

以下の通り、変数を定義しました。

main:
    params: [input]  # ワークフロー呼び出し時に指定したパラメータがinputに入る
    steps:
    - setParams:
        assign:
            - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}  # 実行しているワークフローのプロジェクトID
            - executionId: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}  # ワークフローの実行ID
            - triggerId: "実行したいCloud BuildのトリガーID"
            - targetIdsForFiretore: []   # 次のステップで変数マッピングする際に値を設定する

GCP Workflowsではいくつかの環境変数が事前に定義されていて、ワークフロー上で読み出すことができます1

2. 変数マッピング

ワークフローからfirestoreのフィールドを追加するとき、Method: googleapis.firestore.v1.projects.databases.documents.createDocumentで定義されているフォーマットに従ってフィールドを指定する必要があります。プリミティブな型(string, timestampなど)であれば、以下の様に フィールド名.型Valueの形式で指定すればOKです。

            fields:
              hoge:
                stringValue: ${input.hoge}
              foo:
                timestampValue: ${time.format(sys.now())}

問題は配列型のデータの場合で、json形式での配列をそのままフィールド名.arrayValueに渡してもエラーになります。arrayValueは

            fields:
              bar:
                arrayValue:
                  values:
                    - stringValue: "aaa"
                    - stringValue: "bbb"
                    - stringValue: "ccc"

という形で、arrayValue.values[]下に stringValue: "xxx" や timestampValue: YYYY-MM-DDTHH:mm:ss の配列を渡す必要があります2

そこで、以下の様にfor文でlist型のtargetIdsForFiretoreに要素を足し込んでいくことで対応しています。

    - mappingForFirestore:
        for:
            value: v
            in: ${input.targetIds}
            steps:
                - getStep:
                    assign:
                        - newValue:
                            stringValue: ${v}
                        - targetIdsForFiretore: ${list.concat(targetIdsForFiretore, newValue)}

3. 管理データ作成

前ステップで準備したtargetIdsForFiretoreおよびinputに渡した各パラメータを基に、firestoreに管理データdocumentを追加します。

callにてgoogleapis.firestore.v1.projects.databases.documents.createDocumentコネクタを指定し、argsにcollection名やdocumentのIDを指定することでdocumentを作成できます。なお、timestampValueにはISO8601フォーマットの日時文字列を渡すことができます。

    - createSurveySummaryFirestore:
        call: googleapis.firestore.v1.projects.databases.documents.createDocument
        args:
          parent: ${ "projects/" + projectId + "/databases/(default)/documents" }
          collectionId: surveysummary
          documentId: ${executionId}  # documentはワークフローごとに作成する
          body:
            fields:
              targetIds:
                arrayValue:
                  values: ${targetIdsForFiretore}
              name:
                stringValue: ${input.name}
              createdAt:
                timestampValue: ${time.format(sys.now())}
              status:
                stringValue: "prepare"
        result: resp

処理結果で403が返る場合、ワークフローを実行しているサービスアカウントのIAM権限でFirestoreの書き込み権限があるか確認してください。

4. 通常処理

調査結果データ(csv, xlsx)の作成と、ヒートマップhtmlの作成を順に行います。前者はCloud Functions、後者はCloud Buildで実施しますが、いずれも状況によっては失敗する可能性があります。

そこで、通常処理をtry文、例外処理をexcept文で対処します。

    - createSurveyResultSteps:
        try:
            steps:
            - accumulateSurveyResult:
               # 通常時の処理1: Cloud Functionsを呼び出し、調査結果データを作成する
            - buildRenderer:
               # 通常時の処理2: Cloud Buildを呼び出し、ヒートマップhtmlを作成する
            - updateSurveySummaryFirestoreOnSuccess:
               # 通常時の処理3: 管理データについて、処理成功を記録する
        except:
            as: e
            steps:
            - logStep:
               # 例外時の処理1: エラーをロギングする
            - updateSurveySummaryFirestoreOnError:
               # 例外時の処理2: 管理データについて、処理失敗を記録する

4.1. Cloud Functionsにて、調査結果データ(csv, xlsx, json)を作成する

事前に定義したFunctionのcreate_survey_resultを呼び出します。本Functionはus-east1に定義してあるので、下記の通り呼び出します。

            - accumulateSurveyResult:
                call: http.post
                args:
                    url: ${ "https://us-east1-" + $projectId + ".cloudfunctions.net/create_survey_result" }
                    body:
                        request_id: ${executionId}
                        target_ids: ${input.targetIds}
                    auth:
                        type: OIDC
                result: response

このFunctionは要認証で定義しています。ワークフローにおいては、auth.type = OIDCを指定することでサービスアカウントのOpenId Connect トークンをAuthorizationヘッダのBearerに付与することができます。こうして、Functionを認証付きで実行することができます。

4.2. Cloud Buildにて、調査結果のヒートマップhtmlを作成する

Cloud Buildを呼び出すには、googleapis.cloudbuild.v1.projects.triggers.run コネクタを使用します。呼び出しにあたり、projectIdおよびtriggerIdの指定が必要です。また、body.substitutions._変数名 に値を渡すことでビルド時の変数値を指定できます。

            - buildRenderer:
                call: googleapis.cloudbuild.v1.projects.triggers.run
                args:
                  projectId: ${projectId}
                  triggerId: ${triggerId}
                  body:
                    substitutions:
                      _REQUEST_ID: ${executionId}  # ワークフローの実行IDを渡し、前ステップで作成した調査結果データを参照できるようにする
                result: runResult

4.3. 管理データにワークフローの処理OKを記録する

一連の処理が成功したことを管理データに記録します。googleapis.firestore.v1.projects.databases.documents.patchによりdocumentをupsertできます。今回は処理結果のフィールド(status)だけを更新したいので、updateMaskでstatusを指定します。

            - updateSurveySummaryFirestoreOnSuccess:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${ "projects/" + projectId + "/databases/(default)/documents/surveysummary/" + executionId}  # ここでcollectionIdとdocumentIdを指定する
                  updateMask:
                    fieldPaths: [ "status" ]  # 値を更新したいフィールドの一覧をここで指定する
                  body:
                    fields:
                      status:
                        stringValue: "success"
                result: resp

5. 例外処理

通常処理の途中で例外が発生し、調査結果を作成できないときに本フローを実行します。本フローではエラーが発生したことの記録が必要だったため、ロギングへのログ出力と管理データへの失敗記録を行っています。

        except:
            as: e
            steps:
            - logStep:
                call: sys.log
                args:
                    text: ${json.encode_to_string(e)}
                    severity: ERROR
            - updateSurveySummaryFirestoreOnError:  # 記載省略。4.3節の内容からstringValueへの代入値を変えただけになるため

ワークフローの呼び出し

フロントエンド側から直接呼び出すことはできないため、Workflows呼び出し用のCloud Functionsを定義しました。

Cloud Functions for Firebaseであれば、公式の@google-cloud/workflowsライブラリを利用できます。createExecutionを呼び出す際、execution.argumentにJSON文字列を渡します。すると、ワークフローに実行時引数(ワークフローのyaml中のinput変数)にデータを渡すことができます。

import * as functions from "firebase-functions";
import { ExecutionsClient, WorkflowsClient } from "@google-cloud/workflows";

export const dispatchCreateSurveyResultWorkflow = functions.https.onCall(
  async (data: Record<string, unknown>) => {
    const config: Record<string, string> = JSON.parse(
      process.env.FIREBASE_CONFIG || "{}"
    );
    const projectId = config["projectId"] || "your-project-id";
    const region = "us-east1";
    if (!data["name"] || !data["targetIds"]) {
      return;
    }

    const client = new ExecutionsClient();
    const parent = new WorkflowsClient().workflowPath(
      projectId,  
      region,  // Workflowsのあるリージョン
      "create_survey_result_workflow"  // Workflows名
    );
    await client.createExecution({
      parent,
      execution: {
        argument: JSON.stringify(data),
      },
    });
    return {};
  }
);

あとはフロントエンドからhttpsCallableなどを使って本Functionを呼び出せば、ワークフローを走らせることができます。

  1. https://cloud.google.com/workflows/docs/reference/environment-variables

  2. https://cloud.google.com/workflows/docs/reference/googleapis/firestore/v1/Overview#arrayvalue

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1