はじめに
AWS Lambdaを使用して、S3に保存されたCSVファイルをDynamoDBに一括アップロードするプロセスを実施しました。備忘録 & 復習で実装手順を共有します。
システム概要
<目的要件>
S3に保存されたCSVファイルをDynamoDBにバッチでアップロードする。
S3にファイルがPUTされたことをトリガーに、自動的にDynamoDBに登録するシステムを構築する。
以下のAWSサービスを使用しました。
・AWS Lambda
・AWS S3
・DynamoDB
実装手順
-
DynamoDBのセットアップ
DynamoDBテーブルを作成
テーブル名: your_table_name
プライマリキー: id (文字列型)
※必要に応じて、ソートキーやセカンダリインデックスを設定 -
Lambda関数の実装
LambdaがS3とDynamoDBにアクセスできるよう、以下のIAMポリシーをアタッチ
・AmazonS3ReadOnlyAccess
・AmazonDynamoDBFullAccess -
トリガーの設定
S3のイベント通知設定をLambdaに追加
Lambda関数画面から"トリガーを追加"⇒"ソースを選択"でS3を選択し、
バケット名・イベントタイプ(PUT)、サフィックスオプション(.csv)を設定
コンソールからテーブルを作成する
AWS Management Consoleにログイン
DynamoDBを検索してサービスにアクセス
左側メニューから「テーブル」を選択し、「テーブルの作成」ボタンをクリック
テーブル名 / パーテンションキー / ソートキーを入力し、テーブルを作成
(他は既存設定にしました。)
Lambda関数の実装
Lambda関数コードを下記のように設定
table_name = "your_table_name"は要書き換え
import boto3
import csv
import json
from botocore.exceptions import ClientError
def lambda_handler(event, context):
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
# DynamoDBテーブル名
table_name = "your_table_name"
table = dynamodb.Table(table_name)
# S3情報を取得
bucket_name = event['Records'][0]['s3']['bucket']['name']
file_key = event['Records'][0]['s3']['object']['key']
try:
# S3からCSVデータを取得
response = s3.get_object(Bucket=bucket_name, Key=file_key)
csv_content = response['Body'].read().decode('utf-8')
# CSVを読み込む
rows = csv.DictReader(csv_content.splitlines())
# データをDynamoDBに一括アップロード
with table.batch_writer() as batch:
for row in rows:
clean_row = {key: value.strip() for key, value in row.items() if value}
batch.put_item(Item=clean_row)
return {
'statusCode': 200,
'body': json.dumps('CSV data successfully uploaded to DynamoDB!')
}
except ClientError as e:
return {
'statusCode': 500,
'body': json.dumps(f"Error: {e.response['Error']['Message']}")
}
トリガー設定
Lambda関数画面から"トリガーを追加"⇒"ソースを選択"でS3を選択し、
バケット名・イベントタイプ(PUT)、サフィックスオプション(.csv)を設定
(こちらも他はデフォです。再帰呼び出しもチェックしない)
結果
S3にテストファイルをアップロード⇒DynamoDBテーブルにうまく反映されていることを確認。
フロントエンドまたはAWSコンソールでデータを確認。
実際の使用例として、DynamoDBから特定のフィールドで検索するクエリを実行して、問題ないことを確認できた。
実装時につまずいたポイント
下記が代表的なエラーであった。初歩でつまづいている部分も多く、情けない。。
① CSVのフォーマットエラー
CSVのヘッダーに空白や特殊文字が含まれる。
<解決策>
ヘッダーをクリーニングするコードを追加する
clean_row = {key.strip(): value.strip() for key, value in row.items() if value}
② S3オブジェクトの読み取りエラー
An error occurred (NoSuchKey) when calling the GetObject operation: The specified key does not exist.
<発生原因>
・指定したS3バケットまたはキーが存在しない。
・ファイルパスが誤っている。
<解決策>
・Lambda関数で使用しているS3バケット名とキーを再確認
・ファイル名やフォルダパスをプログラム内で動的に取得するロジックを実装
③ Lambda実行エラー
AccessDeniedException: User is not authorized to perform: dynamodb:PutItem on resource: arn:aws:dynamodb:<region>:<account-id>:table/<table-name>
<発生原因>
・Lambda関数に関連付けられたIAMロールがDynamoDBへの書き込み権限 (dynamodb:PutItem) を持っていない
・S3からデータを読み取るための権限が不足している
<解決策>
LambdaのIAMロールに以下のポリシーを追加
・AmazonDynamoDBFullAccess
・AmazonS3ReadOnlyAccess
④ DynamoDB書き込みエラー
ValidationException: One or more parameter values were invalid: Missing the key <Primary Key Name> in the item
<発生原因>
・DynamoDBテーブルのスキーマに必要なプライマリキーやソートキーが欠落している。
・CSVデータの構造がDynamoDBテーブルのスキーマと一致していない。
<解決策>
・DynamoDBテーブルのスキーマ(プライマリキー、ソートキー)を確認し、
CSVデータに必要なカラムが含まれているか検証。
・データをDynamoDBに挿入する前に、キーの存在を確認するロジックを追加。
・プライマリーキーのみで登録していたロジックを見直し、プライマリキー×ソートキーの組み合わせでエラーを回避
⑤ CSVのパースエラー
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x... in position ...: invalid start byte
<発生原因>
・S3に保存されたCSVファイルがUTF-8形式ではない。
・CSVファイルに無効な文字列が含まれている。
<解決策>
・csvモジュールを使用する際に、適切なエンコーディング(utf-8-sig など)を指定。
・CSVファイルのエンコーディングを調査し、必要に応じて変換。
今後の改善点
エラーログの充実: Lambda関数で詳細なエラーメッセージをCloudWatchに記録。
データ検証の追加: CSVデータに対するスキーマチェックを追加。
セキュリティ強化: IAMポリシーを最小限の権限に絞る。
まとめ
今回の実装により、S3–DynamoDB間の自動データ転送が効率的に行えるようになりました。
少量であればコマンドラインからポチポチ追加でいいやと思っていましたが、やはりこちらの方が便利ですね。