0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

341日目 Lambdaでコードを書いた社畜M

Last updated at Posted at 2025-03-11

♠はじめに

こんにちは、社畜Mだぜ。
今回は以前作った「AWS Lambda を活用し、S3 に保存された CSV を取得して RDS にデータを格納する」というくみについてコードと共に軽く解説していくぜ。少し長くはなるがそこは許してくれ。

さぁ、行くぜ!!!
デュエル!

♠S3、Lambda、RDSとは?

まずは、それぞれの概念について簡単に説明していきます。

♦S3とは?

AWS の高耐久性オブジェクトストレージサービスで、画像や CSV などのファイルをクラウド上に安全に保存できます。サーバーレスなデータ管理に最適で、拡張性のあるデータストレージを提供します。

♦Lambdaとは?

AWS Lambdaはイベント駆動型のサーバーレス関数実行環境です。インフラの管理不要でコードをアップロードするだけで実行可能。イベント発生時に自動で関数が実行されるため、効率的なデータ処理が可能になります。

♦RDSとは?

Amazon RDSは、MySQLやPostgreSQLなどのリレーショナルデータベースをマネージド環境で簡単に運用できるサービスです。自動バックアップやスケーリング機能を備えており、データの信頼性と可用性を向上させます。

♠今回やりたかったこと

今回構築するのはS3にアップロードされた CSV ファイルをAWS Lambdaが自動的に処理し、データベース(RDS)に格納する仕組みです。
具体的な流れとしては、次のような流れで処理されます。


1.ユーザーがS3にCSVファイルをアップロードする
これをトリガーとして、AWS のイベント通知機能(EventBridge)が動作する。

2.EventBridgeがLambdaを起動する
S3 へのアップロードイベントを受け取ると、自動的に Lambda 関数が実行される。

3.LambdaがS3からCSVファイルを取得する
アップロードされたファイルの内容を取得し、処理を開始する。

4.CSV ファイルをパースし、データを RDS に挿入する
取得した CSV データを解析し、データベースに書き込むことで永続化を実現する。

今回紹介するコードは3と4のあたりになります。

♠コード

import json
import pymysql
import csv
import io
import boto3
import os

# SSMクライアントの初期化
ssm = boto3.client('ssm')

# SSMパラメータストアからデータベース接続情報を取得する関数
def get_ssm_parameter(name, with_decryption=False):
    response = ssm.get_parameter(Name=name, WithDecryption=with_decryption)
    return response['Parameter']['Value']

# SSMからデータベースの接続情報を取得
rds_host = get_ssm_parameter('MYSQL_HOST')
db_name = get_ssm_parameter('MYSQL_DATABASE')
username = get_ssm_parameter('MYSQL_USER')
password = get_ssm_parameter('MYSQL_PASSWORD', True)

# S3クライアントの初期化
s3 = boto3.client('s3')

# Lambdaのエントリーポイント
def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']

        # input-csv/内のCSVファイルかどうかを確認
        if key.startswith('input-csv/') and key.endswith('.csv'):
            print(f"Processing file: {key} from bucket: {bucket}")
            
            conn = None

            try:
                # S3からCSVファイルを取得
                csv_file = s3.get_object(Bucket=bucket, Key=key)
                file_content = csv_file['Body'].read().decode('utf-8')

                # CSVファイルの内容を辞書形式で読み込み
                csv_reader = csv.DictReader(io.StringIO(file_content))

                # RDS(MySQL)に接続
                conn = pymysql.connect(host=rds_host, user=username, passwd=password, db=db_name, connect_timeout=60)

                with conn.cursor(pymysql.cursors.DictCursor) as cur:
                    # 各CSVの行をデータベースに挿入
                    for row in csv_reader:
                        email = row['email']
                        nickname = row['nickname']
                        created_at = 'CURRENT_TIMESTAMP'  # レコード作成日時はデータベース側で設定

                        # INSERTクエリを作成
                        query = """
                            INSERT INTO users (email, nickname, created_at)
                            VALUES (%s, %s, %s)
                        """
                        # クエリを実行
                        cur.execute(query, (email, nickname, created_at))
                    
                    # トランザクションをコミット
                    conn.commit()

                # 正常終了した場合のレスポンス
                return {
                    'statusCode': 200,
                    'body': 'CSV data inserted into RDS successfully'
                }

            except pymysql.MySQLError as e:
                # MySQLエラーが発生した場合
                print(f"Error: {e}")
                return {
                    'statusCode': 500,
                    'body': f"Failed to insert data into RDS: {e}"
                }

            except Exception as e:
                # その他の予期せぬエラーの処理
                print(f"Unexpected error: {e}")
                return {
                    'statusCode': 500,
                    'body': f"Unexpected error occurred: {e}"
                }

            finally:
                # 接続が開いていればクローズ
                if conn and conn.open:
                    conn.close()
                    print("Database connection closed")

        else:
            print(f"Skipped file: {key} (not a CSV or not in input-csv/ folder)")

♦ポイント(個人の主観)

  • S3にアップロードしたCSVファイルを自動で処理する
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']

これは、S3にアップロードされたファイルのバケット名 (bucket) とファイル名 (key) を取得します。これがないと、Lambda関数はどのファイルを処理すればよいのか判断できません。

  • S3からCSVを取得して中身をPythonで扱えるようにする
csv_file = s3.get_object(Bucket=bucket, Key=key)
file_content = csv_file['Body'].read().decode('utf-8')
csv_reader = csv.DictReader(io.StringIO(file_content))

これは、アイルの中身を読み込み、Pythonで理解できる形に変換しています。これによりCSVの各行を処理できるようになります。

  • データベースにデータを追加する
query = """
    INSERT INTO users (email, nickname, created_at)
    VALUES (%s, %s, %s)
"""
cur.execute(query, (email, nickname, created_at))
conn.commit()

これは、email、nicknameをデータベースのテーブルに登録しています。この時commit()を行わないとデータは保存されません。

  • エラーハンドリング(問題が起きたときの対応)
except pymysql.MySQLError as e:
    # MySQLエラーが発生した場合
    print(f"Error: {e}")
    return {
        'statusCode': 500,
        'body': f"Failed to insert data into RDS: {e}"
    }

これは、上記でも書いたがエラーが出たときにLambdaの実行結果にエラーメッセージとして返すものです。これがないと動かして不具合があった時に何が原因なのかがわかりません。

♠まとめ

今回はAWSに触れる数少ない機会の一つだったので学ぶことは多くありましたが、これ作ってるときにネットに踊らされたり、作るこちら側の不足する部分が見つけられたのも個人的には大きいことでした。

♠おまけ

一気にキャラを変えていくぜ!
今回の社会人が知っておいた方がいいビジネスマナーはこいつだ!
スマホの扱いには気を付ける」だぜ。
オレたちの生活において欠かせないものの一つスマートフォン…だが、ビジネスマナーとしてスマホの扱い方も含まれているぜ。
特に会議中や面談中のスマホの扱いには気を付ける必要があるぜ!
主に重要なのは三つ
1つ「目的に集中する」
2つ「音は消しておく」
3つ「緊急時以外は使わない」

これは絶対に守る必要があるぜ、当たり前だと思うだろ?
当たり前なことこそ重要な時もあるんだぜ、気を付けるんだな!!

これで今回は終わりだぜ。
ターンエンド

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?