1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SAMでS3にCSVファイルアップロードをトリガーにDynamoDBへの登録処理の作成 Lambdaの関数部分

Posted at

概要

前回の記事のLambda関数の部分です

Lambda

Lambda関数は以下になります

import json
import boto3
import os
import csv
import codecs
import sys
import traceback
import collections
from datetime import datetime

s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')

bucket = os.environ['bucket']
tableName = os.environ['table']

def lambda_handler(event, context):

   key=event['Records'][0]['s3']['object']['key']
   #get() does not store in memory
   try:
       obj = s3.Object(bucket, key).get()['Body']
   except:
       print("S3 Object could not be opened. Check environment variable. ")
       print(traceback.format_exc())
   try:
       table = dynamodb.Table(tableName)
   except:
       print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")
       print(traceback.format_exc())

   batch_size = 100
   batch = []
   date =datetime.now()
   dateStr=date.strftime('%Y%m%d')

   #DictReader is a generator; not stored in memory
   for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
      if len(batch) >= batch_size:
         write_to_dynamo(batch)
         batch.clear()
      batch.append(row)

   if batch:
      write_to_dynamo(batch)

   return {
      'statusCode': 200,
      'body': json.dumps('Uploaded to DynamoDB Table')
   }


def write_to_dynamo(rows):
   try:
      table = dynamodb.Table(tableName)
   except:
      print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")
   try:
      with table.batch_writer() as batch:
         for i in range(len(rows)):
            row=rows[i]
            batch.put_item(
               Item=row
            )
   except:
      print(traceback.format_exc())
      print("Error executing batch_writer")

ソースの説明

  • BOTO3の使い方は公式ドキュメント参照。
  • コードを見ればなんとなくわかるかと思いますが、使用するリソースを指定して使用します。
    • バケット名、テーブル名は環境変数としてLambdaの定義に指定しています
    • key=event['Records'][0]['s3']['object']['key'] の部分でS3にオブジェクトが格納されたときに渡されるキーを取得しています。

補足

  • BOTO3には他のサービスを使用する機能も色々あるので必要に応じて使うとよさそうです。
  • 最初は処理の中で日時をセットしたり、カラムの内容の変換を行っていましたが、Lambda内で下手に処理をやると処理時間が伸びてしまったので事前に加工できるなら加工してしまった方がよさそうです。
  • 前回の記事ではDynamoDBのキャパシティーモードをプロビジョニングにしたところ大量にデータを登録するときに失敗してしまったので大量にインサートを行うときはオンデマンドの方がよさそうです。ただしお金がかかるのでお試しでやる場合はしっかりと考えた方が・・・
  • DynamoDBにグローバルセカンダリーインデックスがあると余分にキャパシティを消費するのでそのあたりも実際に使う前に考えた方がよさそうです。
  • 状況の確認はCloudWatchLogs で確認できるので処理に時間がかかる時はそちらで確認できます。
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?