Python
AWS
pandas
DynamoDB
lambda

Lambda : s3のcsvファイルをpandasで変換し自動でDynamoへ格納する

はじめに

簡単な内容ですが、Lambdaでのpandasの起動やcsv読み込み、DynamoDB格納の為のfloat⇒Decimal変換等、一部嵌りポイントがあったので、備忘録も兼ねて記載します。

やりたい事

s3にcsvファイルをアップロードしたら自動的にDynamoDBへ格納する。

s3 ⇒ Lambda ⇒ DynamoDB

DynamoDBの作成

DynamoDBを作成します。
プライマリキーは一旦インデックスにします。
ソートキーの追加で拡張できます。

Dynamo_create.PNG

Lambda関数の作成(設定)

Lambda関数を作成します。
ロールにはs3とDynamoDBへのアクセスが可能な権限をつけておきます。
リージョンはs3と合わせます。

lambda_1.PNG

Lambda関数のトリガーを設定します。

lambda_trriger.PNG

Lambda関数の作成(コード)

Lambda_function.py
import boto3
import json
import io
import decimal
import pandas as pd

from decimal import *
getcontext().prec = 6

dynamodb = boto3.resource('dynamodb')

# convert function to store data to dynamodb.
def convert_float_to_Decimal(dictionary):
    for key in dictionary:
        if isinstance(dictionary[key], float):
            dictionary[key] = Decimal(str(dictionary[key]))
    return dictionary

def insert(index, dictionary):
    dictionary["index"] = int(index)
    import_item = convert_float_to_Decimal(dictionary)
    table = dynamodb.Table('test')
    table.put_item(Item=import_item)

# read csv from s3 and insert data as dictionary to dynamodb.
def lambda_handler(event, context):
    client = boto3.client('s3')
    # retrieve directory from event.
    key = event['Records'][0]['s3']['object']['key']
    obj = client.get_object(Bucket='dataset-pract',Key=key)
    initial_df = pd.read_csv(io.BytesIO(obj['Body'].read()),
    index_col=0)
    initial_df_index = list(initial_df.index)
    # rewrite df as list of dict.
    initial_df = initial_df.to_dict("records")
    for index, dictionary in zip(initial_df_index, initial_df):
        insert(index, dictionary)

float型をDecimal型に変換

DynamoDBにはfloat形式ではデータを入れることができないので、格納したい辞書形式すべてのkeyに対して、valueがfloatであればDecimalに変換しています。
変換の際、floatから直接Decimalに変換するとうまくいかないため、一旦stringを挟みます。
https://github.com/boto/boto3/issues/665

csvの読み込み

get_objectで指定したバケットから読み込みます。その後、Bodyを展開してバイトデータを変換し、read_csvで読み込みます。

Pandasのimportについて

Lambda上でPandasを使うにはパッケージをインストールする必要があります。
この場合、PandasのフォルダとLambda関数をzip化しアップロードすれば良いのですが、Pandasのインポートには依存関係があるので下記サイトからダウンロードしたファイルを使用するのが楽です。

Lambda用Pandas

pandas_package.PNG

このようにダウンロードしたフォルダに作成したLambda関数を入れてzip化します。

zipをアップロード

lambda_zip.PNG

トリガー起動時に実行する関数を指定します。
今回はLambda_handlerにします。

アップロードして保存を行えば完了です。

テスト

使用するデータ

import pandas as pd
data = pd.read_csv("USAccDeaths.csv", index_col=0)
print(data.head())

これをs3の指定したバケットに入れます。

DBを確認します。

Dynamo.PNG

無事入りました。

以上です。