🎮 はじめに
前回、赤線部分の開発を進めてきましたが、これを手動で順番に実行していくのは大変です。
なので今回はデータアップロード後にETL処理を自動実行するワークフローを設計します!
🛠 使用技術
- AWS Step Functions(ワークフロー定義)
- AWS Lambda(Glueジョブ起動)
- AWS Glue(ETL)
- AWS S3(データトリガー)
🧱 ワークフローの全体像
- S3のファイル更新をトリガーに、LambdaでStep functionsを起動
- Step FunctionsがLambdaに順番指示を出し、Glue CrawlerかJobを実行していく
1. S3のファイル更新をトリガーに、LambdaでStep functionsを起動
まずは、LambdaでStep functionsを起動を書いていきます。
実際のコードは以下になります。
import boto3
import json
import logging
# ロギングの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
if 'Records' not in event:
logging.error("Received event does not contain 'Records': %s", json.dumps(event))
return {'statusCode': 400, 'body': 'No Records in event'}
# `event` オブジェクトの内容をログに記録
logger.info("Event: %s", json.dumps(event))
# `context` オブジェクトの一部の属性をログに記録
logger.info("Context: %s", {
'function_name': context.function_name,
'function_version': context.function_version,
'invoked_function_arn': context.invoked_function_arn,
'memory_limit_in_mb': context.memory_limit_in_mb
})
# Step Functionsのクライアントを作成
sf = boto3.client('stepfunctions')
# ステートマシンのARNを指定
state_machine_arn = 'arn:aws:states:your-region:your-account-id:stateMachine:yourStateMachineName'
# S3イベントからファイル名とバケット名を取得
records = event.get('Records', [])
s3_event = records[0].get('s3')
bucket_name = s3_event.get('bucket', {}).get('name')
object_key = s3_event.get('object', {}).get('key')
# 特定のファイルが更新された場合にステートマシンを起動
if "preterite_weather_export.csv" in object_key:
# ステートマシンの入力として渡すデータ
input_data = json.dumps({
"bucket": bucket_name,
"key": object_key
})
# Step Functionsのステートマシンを実行
response = sf.start_execution(
stateMachineArn=state_machine_arn,
input=input_data
)
return {
'statusCode': 200,
'body': json.dumps('Step Function started successfully.')
}
else:
return {
'statusCode': 400,
'body': json.dumps('No action taken. File does not match criteria.')
}
S3イベント受信し、ファイル名をチェック。
条件一致ならStep Functionsステートマシンを実行するという流れで作成しています。
次にS3のトリガーを設定していきます。
これはS3のプロパティタブ → イベント通知をクリックし、先ほどの関数名を入力したら作成できます!
これで、S3のファイル更新をトリガーに、LambdaでStep functionsを起動するようになりました。
2. Step FunctionsがLambdaに順番指示を出し、Glue CrawlerかJobを実行していく
次に、Step FunctionsとLambdaを使って、CrawlerとJobを実行していきます。
実際のコードは以下になります。
Lambda
import boto3
import json
def lambda_handler(event, context):
glue = boto3.client('glue')
action = event['action']
job_or_crawler_name = event['job_or_crawler_name']
if action == 'start_crawler':
response = glue.start_crawler(Name=job_or_crawler_name)
elif action == 'start_job':
response = glue.start_job_run(JobName=job_or_crawler_name)
return {
'statusCode': 200,
'body': json.dumps(f"{action} for {job_or_crawler_name} executed successfully.")
}
Step functions(コード)
{
"Comment": "A workflow to manage Glue jobs and crawlers",
"StartAt": "StartCrawler1",
"States": {
"StartCrawler1": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
"Parameters": {
"job_or_crawler_name": "smash_datalake_crawlers",
"action": "start_crawler"
},
"Next": "StartETLJob1"
},
"StartETLJob1": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
"Parameters": {
"job_or_crawler_name": "datalake_date_conversion",
"action": "start_job"
},
"Next": "StartCrawler2"
},
"StartCrawler2": {
"Type": "Task",
"Resource": "aarn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
"Parameters": {
"job_or_crawler_name": "smash_datawarehouse_crawlers",
"action": "start_crawler"
},
"Next": "DelayBeforeETLJob2"
},
"DelayBeforeETLJob2": {
"Type": "Wait",
"Seconds": 100,
"Next": "StartETLJob2"
},
"StartETLJob2": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
"Parameters": {
"job_or_crawler_name": "datamart_create",
"action": "start_job"
},
"Next": "StartCrawler3"
},
"StartCrawler3": {
"Type": "Task",
"Resource": "arn:aws:lambda:リージョン:アカウントID:function:Lambda関数名",
"Parameters": {
"job_or_crawler_name": "smash_datamart_crawlers",
"action": "start_crawler"
},
"End": true
}
}
}
Step functions(図)
簡単に説明すると、Step Functionsが順番に指示を出して、LambdaがそのたびにGlue CrawlerかJobを実行していくというものになります。
途中で待機(Wait)も挟んで、リソース競合が起こることを防いでいます!
以上がETL処理を自動実行するワークフローになります。
🚀 次回
次回(第7回)は、いよいよ加工したデータを分析していきます!
引き続きよろしくお願いいたします。
https://qiita.com/shota1212/items/182f121d2865a93e0766