0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【第6回】Step Functions + Lambdaで自動処理化(ワークフロー設計)

Last updated at Posted at 2025-05-05

🎮 はじめに

設計図_6.png
前回、赤線部分の開発を進めてきましたが、これを手動で順番に実行していくのは大変です。

なので今回はデータアップロード後にETL処理を自動実行するワークフローを設計します!


🛠 使用技術

  • AWS Step Functions(ワークフロー定義)
  • AWS Lambda(Glueジョブ起動)
  • AWS Glue(ETL)
  • AWS S3(データトリガー)

🧱 ワークフローの全体像

  1. S3のファイル更新をトリガーに、LambdaでStep functionsを起動
  2. Step FunctionsがLambdaに順番指示を出し、Glue CrawlerかJobを実行していく

1. S3のファイル更新をトリガーに、LambdaでStep functionsを起動

まずは、LambdaでStep functionsを起動を書いていきます。
実際のコードは以下になります。

import boto3
import json
import logging

# ロギングの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):

    if 'Records' not in event:
        logging.error("Received event does not contain 'Records': %s", json.dumps(event))
        return {'statusCode': 400, 'body': 'No Records in event'}

    # `event` オブジェクトの内容をログに記録
    logger.info("Event: %s", json.dumps(event))

    # `context` オブジェクトの一部の属性をログに記録
    logger.info("Context: %s", {
        'function_name': context.function_name,
        'function_version': context.function_version,
        'invoked_function_arn': context.invoked_function_arn,
        'memory_limit_in_mb': context.memory_limit_in_mb
    })

    # Step Functionsのクライアントを作成
    sf = boto3.client('stepfunctions')

    # ステートマシンのARNを指定
    state_machine_arn = 'arn:aws:states:your-region:your-account-id:stateMachine:yourStateMachineName'

    # S3イベントからファイル名とバケット名を取得
    records = event.get('Records', [])
    s3_event = records[0].get('s3')

    bucket_name = s3_event.get('bucket', {}).get('name')
    object_key = s3_event.get('object', {}).get('key')

    # 特定のファイルが更新された場合にステートマシンを起動
    if "preterite_weather_export.csv" in object_key:
        # ステートマシンの入力として渡すデータ
        input_data = json.dumps({
            "bucket": bucket_name,
            "key": object_key
        })

        # Step Functionsのステートマシンを実行
        response = sf.start_execution(
            stateMachineArn=state_machine_arn,
            input=input_data
        )

        return {
            'statusCode': 200,
            'body': json.dumps('Step Function started successfully.')
        }
    else:
        return {
            'statusCode': 400,
            'body': json.dumps('No action taken. File does not match criteria.')
        }

S3イベント受信し、ファイル名をチェック。
条件一致ならStep Functionsステートマシンを実行するという流れで作成しています。

次にS3のトリガーを設定していきます。
これはS3のプロパティタブ → イベント通知をクリックし、先ほどの関数名を入力したら作成できます!

これで、S3のファイル更新をトリガーに、LambdaでStep functionsを起動するようになりました。

2. Step FunctionsがLambdaに順番指示を出し、Glue CrawlerかJobを実行していく

次に、Step FunctionsとLambdaを使って、CrawlerとJobを実行していきます。
実際のコードは以下になります。

Lambda

import boto3
import json

def lambda_handler(event, context):
    glue = boto3.client('glue')
    
    action = event['action']
    job_or_crawler_name = event['job_or_crawler_name']

    if action == 'start_crawler':
        response = glue.start_crawler(Name=job_or_crawler_name)
    elif action == 'start_job':
        response = glue.start_job_run(JobName=job_or_crawler_name)

    return {
        'statusCode': 200,
        'body': json.dumps(f"{action} for {job_or_crawler_name} executed successfully.")
    }

Step functions(コード)

{
  "Comment": "A workflow to manage Glue jobs and crawlers",
  "StartAt": "StartCrawler1",
  "States": {
    "StartCrawler1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
      "Parameters": {
        "job_or_crawler_name": "smash_datalake_crawlers",
        "action": "start_crawler"
      },
      "Next": "StartETLJob1"
    },
    "StartETLJob1": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
      "Parameters": {
        "job_or_crawler_name": "datalake_date_conversion",
        "action": "start_job"
      },
      "Next": "StartCrawler2"
    },
    "StartCrawler2": {
      "Type": "Task",
      "Resource": "aarn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
      "Parameters": {
        "job_or_crawler_name": "smash_datawarehouse_crawlers",
        "action": "start_crawler"
      },
      "Next": "DelayBeforeETLJob2"
    },
    "DelayBeforeETLJob2": {
      "Type": "Wait",
      "Seconds": 100,
      "Next": "StartETLJob2"
    },
    "StartETLJob2": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
      "Parameters": {
        "job_or_crawler_name": "datamart_create",
        "action": "start_job"
      },
      "Next": "StartCrawler3"
    },
    "StartCrawler3": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
      "Parameters": {
        "job_or_crawler_name": "smash_datamart_crawlers",
        "action": "start_crawler"
      },
      "End": true
    }
  }
}

Step functions(図)

step.png

簡単に説明すると、Step Functionsが順番に指示を出して、LambdaがそのたびにGlue CrawlerかJobを実行していくというものになります。
途中で待機(Wait)も挟んで、リソース競合が起こることを防いでいます!

以上がETL処理を自動実行するワークフローになります。


🚀 次回

次回(第7回)は、いよいよ加工したデータを分析していきます!
引き続きよろしくお願いいたします。
https://qiita.com/shota1212/items/182f121d2865a93e0766


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?