0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[初心者向け]S3にアップロードされたファイルをlambdaで処理してDynamoDBに登録

Last updated at Posted at 2025-03-27

初めに

Amazon S3バケットにアップロードしたファイルをAmazon DynamoDBテーブルに自動で書きこむ方法とその際に遭遇したエラーを備忘のため記録しておきます.

構成

今回の構成は,非常に単純ですがAWSマネジメントコンソールからS3バケットにファイルをアップロードし,S3のPUTをトリガーにしてLambda関数起動させます.
Lambda関数は,ファイルの内容を読み取りDynamoDBテーブルに書き込む処理を行います.

手順

S3バケットの作成

S3の画面から,バケットを作成を行い任意の名前でバケットを作成する
例) origin-bucket

image.png

DynamoDBテーブルの作成

以下の設定でテーブルを作成します.

  • パーティションキー:id(文字列)
  • ソートキー:なし
  • テーブル設定:デフォルト設定
    image.png

Lambda関数設定

  • ランタイム設定;Python3.12(こちらは,ご自身の使いやすい言語を選んでください)
  • 適用するIAMポリシー
    • AmazonDynamoDBFullAccess
      • DynamoDBへの全権限を付与します
    • AmazonS3ReadOnlyAccess
      • S3からの読み込みだけを許可します
    • 下記の画面でロールを作成してください.
      • ポリシーテンプレートの欄に,dynamoとs3と入力すると上記のポリシーが出てきます.
        image.png
import boto3
import csv
import json

def get_s3_file_content(bucket_name, object_key):
    s3 = boto3.client('s3')
    csv_file = s3.get_object(Bucket=bucket_name, Key=object_key)
    return csv_file['Body'].read().decode('utf-8')

def parse_csv_data(csv_content):
    csv_data = csv.DictReader(csv_content.splitlines())
    return list(csv_data)

def delete_all_items_from_dynamodb(table):
    scan = table.scan()
    with table.batch_writer() as batch:
        for each in scan['Items']:
            batch.delete_item(Key={'id': each['id']}) 

#DynamoDbのbatchwriterは最大25件なので,25件ずつ分割を行う
def write_csv_data_to_dynamodb(table, csv_data):
    with table.batch_writer() as batch:
        for i in range(0, len(csv_data), 25):
            for item in csv_data[i:i+25]:
                batch.put_item(Item=item)


def lambda_handler(event, context):
    print('event:' + json.dumps(event, ensure_ascii=False))

    bucket_name = event['Records'][0]['s3']['bucket']['name']
    object_key = event['Records'][0]['s3']['object']['key']

    csv_content = get_s3_file_content(bucket_name, object_key)
    csv_data = parse_csv_data(csv_content)

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('作成したdyanamoDBのテーブル名')

    delete_all_items_from_dynamodb(table)
    write_csv_data_to_dynamodb(table, csv_data)

    return {
        'statusCode': 200,
        'body': json.dumps('CSV data has been successfully written to DynamoDB.')
    }

S3バケットのイベント通知の作成(トリガーの作成)

S3バケットにファイルがアップロードされると,Lambda関数が発火するようにトリガーを作成します.
Lambda関数の画面からでも,トリガーの設定はできますがここではS3でイベント通知による設定を行います.

  • プレフィックスはcontentを指定し,s3バケットの中のcontentフォルダにファイルがアップロードされたときに発火するように設定

  • イベントのタイプはPUTだけを選択します
    image.png

  • 送信先はLambda関数を選択し,Lambda関数のARNを入力から,先ほど作成した関数を選択
    lanbda.png

  • うまくいくと以下のようにLambdaの画面が変わります.
    image.png

実行結果

今回利用したデータセットは偽物ですが,名前と電話番号が含まれているため掲載いたしませんが,うまくいきました.

今回遭遇したエラーと対処法

[ERROR] ClientError: An error occurred (ValidationException) when calling the BatchWriteItem operation: The provided key element does not match the schema

DynamoDBで定義したキーがファイルに含まれていない場合に起きる.
ファイルのプライマリーキーとDynamoDBのパーティションキーを確認してみてください.

[ERROR] ClientError: An error occurred (AccessDeniedException) when calling the BatchWriteItem operation:

これはシンプルにアクセス権限がない.
ロールを確認しなおしてみてください.

[ERROR] UnicodeDecodeError: 'utf-8' codec can't decode byte 0x90 in position 35

ファイルのエンコードがutf-8ではなかったときに,起きたエラーです.
ファイルをutf-8でエンコードしなおしてください.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?