はじめに
この記事では、AWS Lambdaを使用して、特定のS3イベントに応じてGlueジョブをトリガーし、その結果をSNSで通知する方法を紹介します。コード例には、すべてダミーの情報が含まれています。
はじめに
AWS Lambdaは、イベントドリブンなサーバーレスコンピューティングサービスです。これにより、インフラストラクチャの管理を行わずにコードを実行できます。AWS Glueは、ETL(Extract, Transform, Load)ジョブを実行するためのマネージドサービスであり、AWS SNSは通知サービスです。
以下のコードでは、S3にファイルがアップロードされると、LambdaがトリガーされてGlueジョブを開始し、結果をSNSで通知します。
コードの詳細
必要なライブラリのインポート
import boto3
from urllib.parse import unquote
import logging
import time
-
boto3
: AWS SDK for Python。AWSサービスと対話するために使用します。 -
unquote
: URLエンコードされた文字列をデコードするために使用します。 -
logging
: ログの設定。 -
time
: ジョブのステータスをポーリングするために使用します。
ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
- ログのレベルをINFOに設定します。
Lambdaハンドラー関数
def lambda_handler(event, context):
glue_client = boto3.client('glue')
s3_client = boto3.client('s3')
sns_client = boto3.client('sns')
-
glue_client
,s3_client
,sns_client
を作成して、それぞれのAWSサービスと対話します。
SNSトピックのマッピング
sns_topics = {
'folder1': 'arn:aws:sns:ap-northeast-1:123456789012:folder1_topic',
'folder2': 'arn:aws:sns:ap-northeast-1:123456789012:folder2_topic',
'folder3': 'arn:aws:sns:ap-northeast-1:123456789012:folder3_topic'
}
- 各フォルダに対応するSNSトピックのARNをマッピングします。
イベントの処理
try:
logger.info("Event: %s", event)
if 'Records' not in event:
raise KeyError("Records key not found in event")
records = event['Records']
if not records or len(records) == 0:
raise ValueError("No Records found in event")
s3_event = records[0].get('s3')
if not s3_event:
raise ValueError("No S3 data found in event")
bucket_name = s3_event['bucket']['name']
encoded_key = s3_event['object'].get('key')
if not encoded_key:
raise ValueError("No object key found in S3 event")
key = unquote(encoded_key)
logger.info("Processed key: %s", key)
- イベントから必要な情報を抽出し、S3のバケット名とオブジェクトキーを取得します。
Glueジョブの起動
folder_name = key.split('/')[0]
file_name = key.split('/')[-1]
if key == 'folder1/file1.csv':
job_name = 'job1'
elif key == 'folder1/file2.csv':
job_name = 'job2'
elif key == 'folder2/file3.csv':
job_name = 'job3'
elif key == 'folder2/file4.csv':
job_name = 'job4'
elif key == 'folder3/file5.csv':
job_name = 'job5'
else:
logger.warning("No matching job for the key: %s", key)
return {'message': 'フォルダ名が間違っています'}
response = glue_client.start_job_run(JobName=job_name)
job_run_id = response['JobRunId']
logger.info("Job started successfully: %s", job_run_id)
- キーに基づいてGlueジョブを選択し、ジョブを開始します。
ジョブのステータスを確認
while True:
job_status = glue_client.get_job_run(JobName=job_name, RunId=job_run_id)
status = job_status['JobRun']['JobRunState']
logger.info("Current job status: %s", status)
if status in ['SUCCEEDED', 'FAILED', 'STOPPED']:
break
time.sleep(20)
- ジョブのステータスをポーリングし、完了するまで待ちます。
SNS通知の送信
sns_topic_arn = sns_topics[folder_name]
if status == 'SUCCEEDED':
message = f"{job_name.split('_')[0]}ジョブが成功しました。"
subject = f"{job_name.split('_')[0]}ジョブ成功"
else:
message = f"{job_name.split('_')[0]}ジョブが失敗しました。ステータス: {status}."
subject = f"{job_name.split('_')[0]}ジョブ失敗"
if sns_topic_arn:
sns_client.publish(
TopicArn=sns_topic_arn,
Subject=subject,
Message=message
)
- ジョブの結果に応じてSNS通知を送信します。
処理完了後のファイル削除
s3_client.delete_object(Bucket=bucket_name, Key=key)
logger.info("Deleted file: %s", key)
return {'message': '処理が完了しました'}
except Exception as e:
logger.error("Error processing event: %s", str(e))
raise e
- 処理が完了したファイルをS3から削除します。
結論
このLambda関数は、S3にファイルがアップロードされるとGlueジョブを起動し、その結果をSNSで通知する仕組みを提供します。これにより、データ処理の自動化と監視が容易になります。
このサンプルコードを参考に、自身のユースケースに合わせて適宜変更し、実装してみてください。