0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambdaを利用してRDSのデータをCSVファイルでS3にアップロードしてみた。

0
Posted at

今回はAWSのLambdaを使ってRDSからデータを取得してS3にアップロードする機能を実装してみようと思います。

構成

LambdaでSecrets Managerの認証情報を利用してRDSに接続します。
RDSから取得したデータはCSVファイルとしてS3に保存します。
S3とSecrets Managerにはエンドポイントを利用して接続します。
テストの為なのでsingle-AZで構築します。
Lambda-RDS.drawio.png

前提条件

VPC,RDS,Secrets Managerはすでに準備されているとします。
RDSはMYSQLを使用します。

構築作業

IAMロールを作成

Lambdaに付与するロールを作成します。
許可するポリシー

  • AWSLambdaVPCAccessExecutionRole
  • SecretsManagerReadWrite
  • AmazonS3FullAccess

image.png

Lambda作成(環境)

Python3.14を使用します。
実行ロールには先ほど作成したロールを指定しましょう。
image.png
VPCを有効化してVPCとサブネット、セキュリティーグループを設定したら関数を作成します。
image.png

ライブラリを使えるようにする。

このままの状態ではpymysqlなどが使用できません。
import pymysqlと入力してもエラーになってしまいます。
それを解決するためにライブラリをZIPファイル化してアップロードします。

  1. ローカルにpythonという名前のフォルダを新規作成
    image.png
     

  2. python(フォルダ)にライブラリをインストール
    WSLでubuntuを開いてpythonファイルに移動して下記コマンドを実行
    LambdaのOSに合わせてライブラリを実行

pip3 install \
--platform manylinux2014_x86_64 \
--target=. \
--implementation cp \
--python-version 3.14 \
--only-binary=:all: \
pymysql cryptography

image.png
pymysqlがインストールされました。
 

  1. python(フォルダ)をZIPファイル化
    image.png
     

  2. レイヤー作成
    Lambda>レイヤーからレイヤーを作成
    名称はPyMySQL-Libで作成したpython.zipをファイルを選択からアップロードしましょう。
    image.png
     

  3. レイヤーを追加
    作成した関数の一番下までスクロールしてレイヤーを編集します
    image.png
    レイヤーの追加からカスタムレイヤーから作成したPyMySQL-Libを選択したら追加してください
    image.png
     

  4. 使えるようになったか確認
    import pymysqlと``を追加してから
    deployをしてからtestをクリックします。
    image.png
    Create new test eventをクリックしてEvent Nameを適当にtestなどにしてSaveします
    image.png
    再度testをクリックすると実行できます。
    エラーも出ず、Status: Succeededになっていれば読み込み成功です。
    image.png

コーディング

Secrets Managerから認証情報を取得

get_secretで認証情報を取得します。
もしタイムアウトになった場合は、設定タブから実行時間の制限を伸ばしてみてください。

注意
確認のためだとしても絶対にパスワードはログに出力しないようにしましょう。

import json
import boto3
import pymysql
import cryptography
from botocore.exceptions import ClientError

def get_secret():
    # secret_nameを取得したいシークレットマネージャの名前に変更
    secret_name = "シークレット名" 
    region_name = "ap-northeast-1"

    # Secrets Manager クライアントを作成
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # 権限不足や名前間違いなどのエラー処理
        raise e

    # シークレットの値(JSON文字列)を取得して辞書形式に変換
    secret = get_secret_value_response['SecretString']
    return json.loads(secret)

def lambda_handler(event, context):
    try:
        # 認証情報を取得
        creds = get_secret()
        
        # 取得できているか確認
        print(f"Host: {creds.get('host')}")
        print(f"Username: {creds.get('username')}")
        
        return {
            'statusCode': 200,
            'body': 'Secrets acquired successfully!'
        }
    except Exception as e:
        print(e)
        return {
            'statusCode': 500,
            'body': f'Error: {str(e)}'
        }

テストデータ登録

認証情報を取得できるようになったので疎通確認もかねてテスト用のデータを作成。
下記の関数を作成してlambda_handler()で使用します。

def create_test_data(creds):
    connection = pymysql.connect(
        host=creds['host'],
        user=creds['username'],
        password=creds['password'],
        connect_timeout=10
    )
    try:
        with connection.cursor() as cursor:
            db_name = "test_db"
            cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db_name}")
            cursor.execute(f"USE {db_name}")
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS test_users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # INSERT IGNORE で重複登録を防止
            cursor.execute("INSERT IGNORE INTO test_users (id, name) VALUES (1, %s)", ('Taro Yamada',))
            cursor.execute("INSERT IGNORE INTO test_users (id, name) VALUES (2, %s)", ('Hanako Sato',))
        
        connection.commit()
        print("Data insertion successful!")
    finally:
        connection.close()
# 認証情報を取得
def lambda_handler(event, context):
    try:
        # 認証情報を取得
        creds = get_secret()
        create_test_data(creds)#ここに追加

testで実行し手確認してみてください、
エラーが出なければOKですが、CloudWatchでログを確認した方が確実です。

データ取得関数作成

import csv
import io

def export_test_data_to_s3(creds, bucket_name):
    # 1. RDSに接続(DictCursorを使うと列名がキーになって便利)
    connection = pymysql.connect(
        host=creds['host'],
        user=creds['username'],
        password=creds['password'],
        database="test_db",
        cursorclass=pymysql.cursors.DictCursor,
        connect_timeout=10
    )
    
    try:
        with connection.cursor() as cursor:
            # 2. データを全件取得
            cursor.execute("SELECT id, name, created_at FROM test_users")
            rows = cursor.fetchall()
            
            if not rows:
                print("No data to export.")
                return

            # 3. メモリ上にCSVを作成
            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)
            
            # 4. S3にアップロード
            s3 = boto3.client('s3')
            file_key = 'exports/test_users.csv'
            
            s3.put_object(
                Bucket=bucket_name,
                Key=file_key,
                Body=output.getvalue(),
                ContentType='text/csv'
            )
            print(f"Successfully uploaded to s3://{bucket_name}/{file_key}")
            
    finally:
        connection.close()

最終的なコード

これで必要な関数がそろいました。
下記が全体のコードです。

import json
import boto3
import pymysql
import cryptography
import csv
import io
from botocore.exceptions import ClientError


def get_secret():
    """AWS Secrets Managerから認証情報を取得し、辞書形式で返却する。

    この関数はSecrets Managerにアクセスし、特定のシークレット名に紐付いた
    JSON形式の認証情報を取得してパースする。

    Returns:
        dict: 取得した認証情報(ホスト名、ユーザー名、パスワード等)を含む辞書。

    Raises:
        ClientError: Secrets Managerへのアクセス権限不足や、シークレット名が
            存在しない場合に発生する。
    """
    # Secrets Managerで設定したシークレットの名前
    secret_name = "シークレット名" # 自分の環境に合わせて変更してください
    region_name = "ap-northeast-1"

    # Secrets Manager クライアントを作成
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e

    # シークレットの値(JSON文字列)を取得して辞書形式に変換
    secret = get_secret_value_response['SecretString']
    return json.loads(secret)

def export_test_data_to_s3(creds, bucket_name):
    """RDSのデータを取得し、CSV形式でS3バケットへアップロードする。

    RDS上の 'test_db.test_users' テーブルから全データを取得し、
    メモリ上でCSVデータを作成してS3の指定パスへ保存する。

    Args:
        creds (dict): DB接続情報(host, username, password等)を含む辞書。
        bucket_name (str): 保存先となるS3バケットの名前。

    Returns:
        None: 処理が成功した場合は戻り値なし。データが空の場合は途中で終了する。

    Raises:
        pymysql.MySQLError: データベース接続やクエリ実行時にエラーが発生した場合。
        boto3.exceptions.Boto3Error: S3へのアップロード(put_object)に失敗した場合。
        IOError: メモリ上でのCSV作成に失敗した場合。
    """
    # RDSに接続
    connection = pymysql.connect(
        host=creds['host'],
        user=creds['username'],
        password=creds['password'],
        database="test_db",
        cursorclass=pymysql.cursors.DictCursor,
        connect_timeout=10
    )
    
    try:
        with connection.cursor() as cursor:
            # データを全件取得
            cursor.execute("SELECT id, name, created_at FROM test_users")
            rows = cursor.fetchall()
            
            if not rows:
                print("No data to export.")
                return

            # メモリ上にCSVを作成
            output = io.StringIO()
            writer = csv.DictWriter(output, fieldnames=rows[0].keys())
            writer.writeheader()
            writer.writerows(rows)
            
            # S3にアップロード
            s3 = boto3.client('s3')
            file_key = 'exports/test_users.csv' # 配置場所を指定
            
            s3.put_object(
                Bucket=bucket_name,
                Key=file_key,
                Body=output.getvalue(),
                ContentType='text/csv'
            )
            print(f"Successfully uploaded to s3://{bucket_name}/{file_key}")
            
    finally:
        connection.close()

def lambda_handler(event, context):
    """Lambda関数のメインエントリポイント。

    AWS Secrets Managerから認証情報を取得し、RDSのデータをCSVとして
    S3バケットへエクスポートする一連の処理を実行する。

    Args:
        event (dict): Lambdaに渡されるイベントデータ(トリガー元の情報)。
        context (LambdaContext): Lambdaの実行環境に関するランタイム情報。

    Returns:
        dict: 処理結果を示すステータスコードとメッセージを含む辞書。

    Note:
        事前にS3バケット名 (MY_BUCKET) が正しく設定されている必要がある。
    """
    try:
        # S3バケット名を指定
        MY_BUCKET = "バケット名" # 自分の環境に合わせて変更してください

        creds = get_secret()
        
        # S3へエクスポート
        export_test_data_to_s3(creds, MY_BUCKET)
        
        return {
            'statusCode': 200,
            'body': 'Successfully exported RDS data to S3!'
        }
    except Exception as e:
        print(e)
        return {
            'statusCode': 500,
            'body': f'Error: {str(e)}'
        }

動作確認

上記のコードを実行して、うまくいけばS3にCSVファイルがアップロードされているはずです。
image.png
image.png

まとめ

今回はLambdaを使ったRDSの操作、S3へのアップロードを行いました。
今回の構成を通じて感じられる Lambdaのメリット は以下の通りです。

  • サーバー管理が不要(サーバーレス)
    データ抽出のためだけに24時間稼働のサーバーを維持する必要がありません。実行時のみリソースが割り当てられるため、運用の手間が大幅に削減されます。
  • 圧倒的なコストパフォーマンス
    Lambdaは実行時間に応じた従量課金です。今回のような1日1回のバッチ処理であれば、無料利用枠に収まることも多く、非常に安価に構築できます。
  • VPC内リソースとのセキュアな連携
    VPCエンドポイントを活用することで、インターネットを経由せずにRDS、Secrets Manager、S3と通信できます。機密性の高いDBデータを扱う上で、このセキュアな構成が容易に組めるのは大きな強みです。

今後はこのLambdaをEventBridgeを使って定期実行できるようにしていきたいと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?