#やりたいこと
S3にログデータをアップするとLambdaがDynamoDBに自動登録してくれる。
#やること
主なフェーズとしては以下の流れです。順にまとめていきます。
- S3バケットの作成
- IAMロールの作成
- DynamoDBテーブルの作成
- Lambda関数作成
- テストデータでの動作確認
なお今回は、名前と年齢のスペース区切りデータを処理する関数を書きます。
alice 20
bob 31
超簡略化してますが、取得するデータやテーブルに登録するキーを変更すれば色々と応用可能ですね。
##1. S3バケットの作成
S3のコンソールを開き、「バケットを作成する」から、ログデータをアップロードするためのバケットを作成しました。
データは暗号化を行っておきます。
##2. IAMロールの作成
新しいIAMロールを作成し、以下のポリシーをアタッチしました。
- AmazonDynamoDBFullAccess
- AmazonS3ReadOnlyAccess
- CloudWatchLogsFullAccess
##3. DynamoDBテーブルの作成
DynamoDBのコンソールを開き、テーブルの作成から新しいDynamoDBテーブルを作成しました。
プライマリキーにはname/文字列を指定しました。
##4. Lambda関数作成
準備ができたので、いよいよLambda関数を作成していきます。
Lambdaのコンソールから「関数の作成」をクリックします。
「一から作成」を選び、必要な設定を入力し「関数の作成」をクリックします。
- 名前: 関数の名前を入力
- ランタイム: Python 3.6を選択
- 既存のロール: 2で作成したIAMロールを選択
設定ページが表示されたら、まずトリガを設定します。
左側のペインから「S3」を選択します。
追加が完了したら、追加された「S3」をクリックして、バケット名とトリガとして「PUT」を選択します。
次に関数名をクリックして、コードの入力画面に進みます。
以下のコードを記述して「保存」します。
### This lambda function parses text data for each s3 put event
### and register these to assigned dynamo db.
### Runtime: python 3.6
import boto3
import json
import urllib.parse
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.client('dynamodb')
s3 = boto3.client('s3')
def lambda_handler(event, context):
#target dynamo table name
dynamotable = 'YOUR_DYNAMO_TABLE_NAME'
#get bucket name and file name by s3 put event
bucket = event['Records'][0]['s3']['bucket']['name']
filename = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
#get uploaded text file contents
try:
res = s3.get_object(Bucket=bucket, Key=filename)
except Exception as e:
return logger.error("Failed to get uploaded file: {}".format(e))
# file type check
if (res['ContentType'] == 'text/plain'):
# get body
body = res['Body'].read()
bodystr = body.decode('UTF-8')
lines = bodystr.split('\r\n') # for windows line endings
#get items of each line
for item in lines:
data = item.split(' ')
name = data[0]
age = data[1]
#register data to the dynamo db
try:
dynamodb.put_item(
TableName = dynamotable,
Item = {'name':{'S':name},'age':{'N':age}},
Expected = {'name':{'Exists':False}}
)
except Exception as e:
print(e)
else:
logger.error('Invalid File Type: ' + res['ContentType'])
##5. テストデータでの動作確認
以下のテキストデータをS3バケットにアップロードして、dynamoDBへ自動で登録されるか確認します。
alice 20
bob 31
DynamoDBのテーブルを更新してみます。
追加されました!
アップロードのテストでうまくいかなかったときは、コード内でバケット名やファイル名を直接指定して、
Lambda関数のテスト機能を使ってデバッグするのが楽でした。
あとは、CloudWatchのイベントを見ると、ちゃんとログが吐かれているのがわかります。
画像ファイルなどをアップロードすると、エラーログが出力できます。
#まとめ
テキストデータをパースして、DBに登録できるようになりました。
テキストのフォーマットに合わせて区切り文字や取得するデータの位置などを変えれば、応用が効きます。
複数ファイルアップロードした時に、並列処理できるようなエンハンスもしてみたいと思います。
設定いじって、Pythonのコード書くだけで実現可能というのは、サーバーレスの恩恵を感じますね。
#参考サイト
S3をトリガーとするLambdaの冪等性をDynamoDBで実現してみた
https://dev.classmethod.jp/cloud/aws/lambda-dynamodb/
AWS Lambda(Python 3)でS3にあるファイルを読み込む
https://qiita.com/KoyanagiHitoshi/items/7b306b0f80806dfe2f61
LmabdaからS3のファイルを読み込む
https://www.noobs.tokyo/entry/2018/10/08/201938