概要
前回の記事のLambda関数の部分です
Lambda
Lambda関数は以下になります
import json
import boto3
import os
import csv
import codecs
import sys
import traceback
import collections
from datetime import datetime
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
bucket = os.environ['bucket']
tableName = os.environ['table']
def lambda_handler(event, context):
key=event['Records'][0]['s3']['object']['key']
#get() does not store in memory
try:
obj = s3.Object(bucket, key).get()['Body']
except:
print("S3 Object could not be opened. Check environment variable. ")
print(traceback.format_exc())
try:
table = dynamodb.Table(tableName)
except:
print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")
print(traceback.format_exc())
batch_size = 100
batch = []
date =datetime.now()
dateStr=date.strftime('%Y%m%d')
#DictReader is a generator; not stored in memory
for row in csv.DictReader(codecs.getreader('utf-8')(obj)):
if len(batch) >= batch_size:
write_to_dynamo(batch)
batch.clear()
batch.append(row)
if batch:
write_to_dynamo(batch)
return {
'statusCode': 200,
'body': json.dumps('Uploaded to DynamoDB Table')
}
def write_to_dynamo(rows):
try:
table = dynamodb.Table(tableName)
except:
print("Error loading DynamoDB table. Check if table was created correctly and environment variable.")
try:
with table.batch_writer() as batch:
for i in range(len(rows)):
row=rows[i]
batch.put_item(
Item=row
)
except:
print(traceback.format_exc())
print("Error executing batch_writer")
ソースの説明
- BOTO3の使い方は公式ドキュメント参照。
- コードを見ればなんとなくわかるかと思いますが、使用するリソースを指定して使用します。
- バケット名、テーブル名は環境変数としてLambdaの定義に指定しています
-
key=event['Records'][0]['s3']['object']['key']
の部分でS3にオブジェクトが格納されたときに渡されるキーを取得しています。
補足
- BOTO3には他のサービスを使用する機能も色々あるので必要に応じて使うとよさそうです。
- 最初は処理の中で日時をセットしたり、カラムの内容の変換を行っていましたが、Lambda内で下手に処理をやると処理時間が伸びてしまったので事前に加工できるなら加工してしまった方がよさそうです。
- 前回の記事ではDynamoDBのキャパシティーモードをプロビジョニングにしたところ大量にデータを登録するときに失敗してしまったので大量にインサートを行うときはオンデマンドの方がよさそうです。ただしお金がかかるのでお試しでやる場合はしっかりと考えた方が・・・
- DynamoDBにグローバルセカンダリーインデックスがあると余分にキャパシティを消費するのでそのあたりも実際に使う前に考えた方がよさそうです。
- 状況の確認はCloudWatchLogs で確認できるので処理に時間がかかる時はそちらで確認できます。