Edited at

Lambda : s3のcsvファイルをpandasで変換し自動でDynamoへ格納する

More than 1 year has passed since last update.


はじめに

簡単な内容ですが、Lambdaでのpandasの起動やcsv読み込み、DynamoDB格納の為のfloat⇒Decimal変換等、一部嵌りポイントがあったので、備忘録も兼ねて記載します。


やりたい事

s3にcsvファイルをアップロードしたら自動的にDynamoDBへ格納する。

s3 ⇒ Lambda ⇒ DynamoDB


DynamoDBの作成

DynamoDBを作成します。

プライマリキーは一旦インデックスにします。

ソートキーの追加で拡張できます。

Dynamo_create.PNG


Lambda関数の作成(設定)

Lambda関数を作成します。

ロールにはs3とDynamoDBへのアクセスが可能な権限をつけておきます。

リージョンはs3と合わせます。

lambda_1.PNG

Lambda関数のトリガーを設定します。

lambda_trriger.PNG


Lambda関数の作成(コード)


Lambda_function.py

import boto3

import json
import io
import decimal
import pandas as pd

from decimal import *
getcontext().prec = 6

dynamodb = boto3.resource('dynamodb')

# convert function to store data to dynamodb.
def convert_float_to_Decimal(dictionary):
for key in dictionary:
if isinstance(dictionary[key], float):
dictionary[key] = Decimal(str(dictionary[key]))
return dictionary

def insert(index, dictionary):
dictionary["index"] = int(index)
import_item = convert_float_to_Decimal(dictionary)
table = dynamodb.Table('test')
table.put_item(Item=import_item)

# read csv from s3 and insert data as dictionary to dynamodb.
def lambda_handler(event, context):
client = boto3.client('s3')
# retrieve directory from event.
key = event['Records'][0]['s3']['object']['key']
obj = client.get_object(Bucket='dataset-pract',Key=key)
initial_df = pd.read_csv(io.BytesIO(obj['Body'].read()),
index_col=0)
initial_df_index = list(initial_df.index)
# rewrite df as list of dict.
initial_df = initial_df.to_dict("records")
for index, dictionary in zip(initial_df_index, initial_df):
insert(index, dictionary)



float型をDecimal型に変換

DynamoDBにはfloat形式ではデータを入れることができないので、格納したい辞書形式すべてのkeyに対して、valueがfloatであればDecimalに変換しています。

変換の際、floatから直接Decimalに変換するとうまくいかないため、一旦stringを挟みます。

https://github.com/boto/boto3/issues/665


csvの読み込み

get_objectで指定したバケットから読み込みます。その後、Bodyを展開してバイトデータを変換し、read_csvで読み込みます。


Pandasのimportについて

Lambda上でPandasを使うにはパッケージをインストールする必要があります。

この場合、PandasのフォルダとLambda関数をzip化しアップロードすれば良いのですが、Pandasのインポートには依存関係があるので下記サイトからダウンロードしたファイルを使用するのが楽です。

Lambda用Pandas

pandas_package.PNG

このようにダウンロードしたフォルダに作成したLambda関数を入れてzip化します。

zipをアップロード

lambda_zip.PNG

トリガー起動時に実行する関数を指定します。

今回はLambda_handlerにします。

アップロードして保存を行えば完了です。


テスト

使用するデータ

import pandas as pd

data = pd.read_csv("USAccDeaths.csv", index_col=0)
print(data.head())

これをs3の指定したバケットに入れます。

DBを確認します。

Dynamo.PNG

無事入りました。

以上です。