0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Step Functionsを使ってStorage Transfer Serviceの転送ジョブを管理する

Posted at

概要

https://qiita.com/okadarhi/items/5ff734c6472b83d51597 の続き。
AWS側で転送元のデータを作成するが、作成処理が完全に完了してから転送ジョブを実行して欲しいという要望があり、スケジュール実行ではなく、AWS側から手動で転送ジョブを実行する方法を検討した。
転送ジョブを実行するAPIがあるのだが、それは実行しかしてくれず、転送ジョブの成否は別のAPIで確認する必要があった。
実行するAPI: https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/run
確認するAPI: https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferOperations/get
なので、転送ジョブの実行と確認を管理するジョブを作成しようと思い、Step Functionsを採用することにした。
その時行った検証作業を備忘録としてまとめる。

手順

Lambda関数の作成

レイヤーの作成

Google APIを実行するためのpythonクライアントライブラリーをLambdaで実行するためのレイヤーを作成する。
まずは、Cloud9上でLambdaレイヤーに使用するライブラリーファイルを作成する。

admin:~/environment $ mkdir python
admin:~/environment $ pip install -t ./python google-api-python-client boto3
admin:~/environment $ zip -r google-api-python-client.zip python

次に、作成したライブラリーファイルをLambdaレイヤーのアップロードする。
スクリーンショット 2021-10-31 2.32.06.png

Workload Identityの認証情報ファイルの配置

Workload Identityを使用してLambda関数でGoogle APIを実行するため、参照用の認証情報ファイルをS3バケットに格納する。
まずは、バケットを作成する。
スクリーンショット 2021-10-31 2.34.20.png
次に、認証情報ファイルをアップロードする。
スクリーンショット 2021-10-31 2.34.57.png

Storage Transfer Serviceの転送ジョブを実行するLambda関数の作成

Storage Transfer Serviceの転送ジョブを実行するLambda関数を作成する。

import os
import boto3
from pprint import pprint
from googleapiclient import discovery
import google.auth

s3 = boto3.resource('s3')

def lambda_handler(event, context):

    bucket = s3.Bucket(os.environ['config_bucket_name'])
    bucket.download_file(os.environ['client_library_config'], '/tmp/' + os.environ['client_library_config'])
    
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/tmp/' + os.environ['client_library_config']
    scopes=['https://www.googleapis.com/auth/cloud-platform']
    credentials, project = google.auth.default(scopes=scopes)
    service = discovery.build('storagetransfer', 'v1', credentials=credentials)

    job_name = os.environ['job_name']
    job_body = {
        "projectId": os.environ['project_id']
    }
    
    request = service.transferJobs().run(jobName=job_name, body=job_body)
    response = request.execute()
    
    pprint(response)
    
    return response

ランタイムはpython3.9(x86_64)を指定する。
スクリーンショット 2021-10-31 2.42.18.png

作成したレイヤーを指定する。
スクリーンショット 2021-10-31 2.43.05.png

デフォルトのタイムアウト(3秒)は短いので、1分に変更する。
スクリーンショット 2021-10-31 2.43.18.png

コードで指定した環境変数を設定する。
※ここでは https://qiita.com/okadarhi/items/5ff734c6472b83d51597 で作成した転送ジョブを指定している。
スクリーンショット 2021-10-31 2.44.18.png

Storage Transfer Serviceの転送ジョブのステータスを確認するLambda関数の作成

Storage Transfer Serviceの転送ジョブのステータスを確認するLambda関数を作成する。

import os
import boto3
from pprint import pprint
from googleapiclient import discovery
import google.auth

s3 = boto3.resource('s3')

def lambda_handler(event, context):

    bucket = s3.Bucket(os.environ['config_bucket_name'])
    bucket.download_file(os.environ['client_library_config'], '/tmp/' + os.environ['client_library_config'])
    
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/tmp/' + os.environ['client_library_config']
    scopes=['https://www.googleapis.com/auth/cloud-platform']
    credentials, project = google.auth.default(scopes=scopes)
    service = discovery.build('storagetransfer', 'v1', credentials=credentials)

    name = event['Payload']['name']

    request = service.transferOperations().get(name=name)
    response = request.execute()
    
    pprint(response)
    
    return response

ランタイムはpython3.9(x86_64)を指定する。
スクリーンショット 2021-10-31 2.42.18.png

作成したレイヤーを指定する。
スクリーンショット 2021-10-31 2.43.05.png

デフォルトのタイムアウト(3秒)は短いので、1分に変更する。
スクリーンショット 2021-10-31 2.43.18.png

コードで指定した環境変数を設定する。
スクリーンショット 2021-10-31 3.00.12.png

Step Functionsの作成

Storage Transfer Serviceの転送ジョブを実行して、ステータスを確認するStep Functionsを作成する
stepfunctions_graph.png

Run: Storage Transfer Serviceの転送ジョブを実行するLambda関数を呼び出している。
Get: Storage Transfer Serviceの転送ジョブのステータスを確認するLambda関数を呼び出している。
          Runで実行した転送ジョブを確認する。
Check: Getで取得した転送ジョブのステータスを確認する。
              ・ステータスがIN_PROGRESS→Waitステートに遷移し、一定時間経過後に再度Getを実行する。
              ・ステータスがSUCCESS→Successステートに遷移する。
              ・ステータスが上記以外→Failステートに遷移する。

実際の定義内容は以下の通り。

{
  "Comment": "This is your state machine",
  "StartAt": "Run",
  "States": {
    "Run": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "<Storage Transfer Serviceの転送ジョブを実行するLambda関数>"
      },
      "Retry": [<デフォルト設定>],
      "Next": "Get"
    },
    "Get": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "<Storage Transfer Serviceの転送ジョブのステータスを確認するLambda関数>"
      },
      "Retry": [<デフォルト設定>],
      "Next": "Check"
    },
    "Check": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.Payload.metadata.status",
          "StringMatches": "IN_PROGRESS",
          "Next": "Wait"
        },
        {
          "Variable": "$.Payload.metadata.status",
          "StringMatches": "SUCCESS",
          "Next": "Success"
        }
      ],
      "Default": "Fail"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 10,
      "Next": "Get"
    },
    "Fail": {
      "Type": "Fail"
    },
    "Success": {
      "Type": "Succeed"
    }
  }
}

Step Functionsの実行

作成したStep Functionsを実行する。
スクリーンショット 2021-10-31 3.29.07.png
→正常終了したことが確認できる。

スクリーンショット 2021-10-31 3.29.53.png
→Google Cloud側でもStorage Transfer Serviceの転送ジョブが成功していることが確認できる。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?