1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

AWS Lambdaを使用してGlueジョブをトリガーし、SNS通知を送信する方法

Posted at

はじめに

この記事では、AWS Lambdaを使用して、特定のS3イベントに応じてGlueジョブをトリガーし、その結果をSNSで通知する方法を紹介します。コード例には、すべてダミーの情報が含まれています。

はじめに

AWS Lambdaは、イベントドリブンなサーバーレスコンピューティングサービスです。これにより、インフラストラクチャの管理を行わずにコードを実行できます。AWS Glueは、ETL(Extract, Transform, Load)ジョブを実行するためのマネージドサービスであり、AWS SNSは通知サービスです。

以下のコードでは、S3にファイルがアップロードされると、LambdaがトリガーされてGlueジョブを開始し、結果をSNSで通知します。

コードの詳細

必要なライブラリのインポート

import boto3
from urllib.parse import unquote
import logging
import time
  • boto3: AWS SDK for Python。AWSサービスと対話するために使用します。
  • unquote: URLエンコードされた文字列をデコードするために使用します。
  • logging: ログの設定。
  • time: ジョブのステータスをポーリングするために使用します。

ロガーの設定

logger = logging.getLogger()
logger.setLevel(logging.INFO)
  • ログのレベルをINFOに設定します。

Lambdaハンドラー関数

def lambda_handler(event, context):
    glue_client = boto3.client('glue')
    s3_client = boto3.client('s3')
    sns_client = boto3.client('sns')
  • glue_client, s3_client, sns_clientを作成して、それぞれのAWSサービスと対話します。

SNSトピックのマッピング

sns_topics = {
    'folder1': 'arn:aws:sns:ap-northeast-1:123456789012:folder1_topic',
    'folder2': 'arn:aws:sns:ap-northeast-1:123456789012:folder2_topic',
    'folder3': 'arn:aws:sns:ap-northeast-1:123456789012:folder3_topic'
}
  • 各フォルダに対応するSNSトピックのARNをマッピングします。

イベントの処理

try:
    logger.info("Event: %s", event)

    if 'Records' not in event:
        raise KeyError("Records key not found in event")
    
    records = event['Records']
    if not records or len(records) == 0:
        raise ValueError("No Records found in event")
    
    s3_event = records[0].get('s3')
    if not s3_event:
        raise ValueError("No S3 data found in event")
    
    bucket_name = s3_event['bucket']['name']
    encoded_key = s3_event['object'].get('key')
    if not encoded_key:
        raise ValueError("No object key found in S3 event")

    key = unquote(encoded_key)
    logger.info("Processed key: %s", key)
  • イベントから必要な情報を抽出し、S3のバケット名とオブジェクトキーを取得します。

Glueジョブの起動

    folder_name = key.split('/')[0]
    file_name = key.split('/')[-1]

    if key == 'folder1/file1.csv':
        job_name = 'job1'
    elif key == 'folder1/file2.csv':
        job_name = 'job2'
    elif key == 'folder2/file3.csv':
        job_name = 'job3'
    elif key == 'folder2/file4.csv':
        job_name = 'job4'
    elif key == 'folder3/file5.csv':
        job_name = 'job5'
    else:
        logger.warning("No matching job for the key: %s", key)
        return {'message': 'フォルダ名が間違っています'}
    
    response = glue_client.start_job_run(JobName=job_name)
    job_run_id = response['JobRunId']
    logger.info("Job started successfully: %s", job_run_id)
  • キーに基づいてGlueジョブを選択し、ジョブを開始します。

ジョブのステータスを確認

    while True:
        job_status = glue_client.get_job_run(JobName=job_name, RunId=job_run_id)
        status = job_status['JobRun']['JobRunState']
        logger.info("Current job status: %s", status)
        if status in ['SUCCEEDED', 'FAILED', 'STOPPED']:
            break
        time.sleep(20)
  • ジョブのステータスをポーリングし、完了するまで待ちます。

SNS通知の送信

    sns_topic_arn = sns_topics[folder_name]
    if status == 'SUCCEEDED':
        message = f"{job_name.split('_')[0]}ジョブが成功しました。"
        subject = f"{job_name.split('_')[0]}ジョブ成功"
    else:
        message = f"{job_name.split('_')[0]}ジョブが失敗しました。ステータス: {status}."
        subject = f"{job_name.split('_')[0]}ジョブ失敗"

    if sns_topic_arn:
        sns_client.publish(
            TopicArn=sns_topic_arn,
            Subject=subject,
            Message=message
        )
  • ジョブの結果に応じてSNS通知を送信します。

処理完了後のファイル削除

    s3_client.delete_object(Bucket=bucket_name, Key=key)
    logger.info("Deleted file: %s", key)

    return {'message': '処理が完了しました'}

except Exception as e:
    logger.error("Error processing event: %s", str(e))
    raise e
  • 処理が完了したファイルをS3から削除します。

結論

このLambda関数は、S3にファイルがアップロードされるとGlueジョブを起動し、その結果をSNSで通知する仕組みを提供します。これにより、データ処理の自動化と監視が容易になります。
このサンプルコードを参考に、自身のユースケースに合わせて適宜変更し、実装してみてください。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?