はじめに
簡単な内容ですが、Lambdaでのpandasの起動やcsv読み込み、DynamoDB格納の為のfloat⇒Decimal変換等、一部嵌りポイントがあったので、備忘録も兼ねて記載します。
やりたい事
s3にcsvファイルをアップロードしたら自動的にDynamoDBへ格納する。
s3 ⇒ Lambda ⇒ DynamoDB
DynamoDBの作成
DynamoDBを作成します。
プライマリキーは一旦インデックスにします。
ソートキーの追加で拡張できます。
Lambda関数の作成(設定)
Lambda関数を作成します。
ロールにはs3とDynamoDBへのアクセスが可能な権限をつけておきます。
リージョンはs3と合わせます。
Lambda関数のトリガーを設定します。
Lambda関数の作成(コード)
import boto3
import json
import io
import decimal
import pandas as pd
from decimal import *
getcontext().prec = 6
dynamodb = boto3.resource('dynamodb')
# convert function to store data to dynamodb.
def convert_float_to_Decimal(dictionary):
for key in dictionary:
if isinstance(dictionary[key], float):
dictionary[key] = Decimal(str(dictionary[key]))
return dictionary
def insert(index, dictionary):
dictionary["index"] = int(index)
import_item = convert_float_to_Decimal(dictionary)
table = dynamodb.Table('test')
table.put_item(Item=import_item)
# read csv from s3 and insert data as dictionary to dynamodb.
def lambda_handler(event, context):
client = boto3.client('s3')
# retrieve directory from event.
key = event['Records'][0]['s3']['object']['key']
obj = client.get_object(Bucket='dataset-pract',Key=key)
initial_df = pd.read_csv(io.BytesIO(obj['Body'].read()),
index_col=0)
initial_df_index = list(initial_df.index)
# rewrite df as list of dict.
initial_df = initial_df.to_dict("records")
for index, dictionary in zip(initial_df_index, initial_df):
insert(index, dictionary)
float型をDecimal型に変換
DynamoDBにはfloat形式ではデータを入れることができないので、格納したい辞書形式すべてのkeyに対して、valueがfloatであればDecimalに変換しています。
変換の際、floatから直接Decimalに変換するとうまくいかないため、一旦stringを挟みます。
https://github.com/boto/boto3/issues/665
csvの読み込み
get_objectで指定したバケットから読み込みます。その後、Bodyを展開してバイトデータを変換し、read_csvで読み込みます。
Pandasのimportについて
Lambda上でPandasを使うにはパッケージをインストールする必要があります。
この場合、PandasのフォルダとLambda関数をzip化しアップロードすれば良いのですが、Pandasのインポートには依存関係があるので下記サイトからダウンロードしたファイルを使用するのが楽です。
このようにダウンロードしたフォルダに作成したLambda関数を入れてzip化します。
zipをアップロード
トリガー起動時に実行する関数を指定します。
今回はLambda_handlerにします。
アップロードして保存を行えば完了です。
テスト
使用するデータ
import pandas as pd
data = pd.read_csv("USAccDeaths.csv", index_col=0)
print(data.head())
これをs3の指定したバケットに入れます。
DBを確認します。
無事入りました。
以上です。