概要
この記事ではAWS認定DASに出題されるリソースを使った構成をシリーズ化して紹介します。
前回までの内容は以下をご覧ください!
【AWS DAS】S3上のデータをQuicksightで可視化するまでやってみた Part1
今回の実施内容
前回Part1の内容を行いました。今回はLambda関連の設定を行います。
- Part1
- JSONのダミーデータの作成
- S3バケットの作成
- Part2
- Lambda作成
- S3イベントとLambdaの連動
- Part3
- Glueの設定
- Part4
- Athenaの設定
- Part5
- Quicksightでの可視化
Lambda作成
S3バケット「transbucket-2024」のinputフォルダにアップロードしたjsonファイルを
csvファイルに加工後、outputフォルダに格納するLambdaを作成していきます。
- IAMロールの作成
LambdaからS3にアクセスできるようにIAMロール「Lambda_Role」を作成します。
以下ポリシーを付与します。- AWSLambdaBasicExecutionRole
- AmazonS3FullAccess
- Lambdaの作成
Lambdaのコンソール画面から作成を行います。
以下内容で作成しました。
項目 | 設定値 |
---|---|
関数名 | trans_csv_lambda |
ランタイム | Python3.12 |
実行ロール | 既存のロールを使用する |
既存のロール | Lambda_Role |
- 作成したLambdaにコードを実装します。
コード
import json
import csv
import boto3
import os
from datetime import datetime, timezone, timedelta
s3 = boto3.client('s3')
fieldnames = ['gender' , 'age' , 'favorite_animal']
def lambda_handler(event, context):
json_data = []
JST = timezone(timedelta(hours=+9), 'JST')
timestamp = datetime.now(JST).strftime('%Y%m%d%H%M%S')
#一時書き込み用ファイル
tmp_csv = '/tmp/test_{ts}.csv'.format(ts=timestamp)
tmp_json = '/tmp/test_{ts}.json'.format(ts=timestamp)
# 最終的な出力ファイル
output_csv = 'output/{ts}.csv'.format(ts=timestamp)
for record in event['Records']:
bucket_name = record['s3']['bucket']['name']
key_name = record['s3']['object']['key']
s3_object = s3.get_object(Bucket=bucket_name, Key=key_name)
data = s3_object['Body'].read()
contents = data.decode('utf-8')
try:
#一時書き込み用jsonファイルにjsonの内容を書き込む
with open(tmp_json, 'w') as json_data:
json.dump(contents, json_data)
#辞書型に変更
with open(tmp_json, 'r') as f:
json_dict = json.load(f)
j = json.loads(json_dict)
#CSVに変換後、一時書き込み用CSVファイルに内容を書き込む
with open(tmp_csv, 'w') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames,
doublequote=True,
quoting=csv.QUOTE_NONE)
writer.writeheader()
writer.writerows(j)
#S3の最終的な出力ファイルに書き込む
with open(tmp_csv, 'r') as csv_file_contents:
response = s3.put_object(Bucket=bucket_name, Key=output_csv, Body=csv_file_contents.read())
#一時書き込み用ファイルを削除
os.remove(tmp_csv)
os.remove(tmp_json)
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key_name, bucket_name))
raise e
S3イベントとLambdaの連動
- Lamdaの画面から「設定」タブをクリックしトリガーを設定します。
- 以下内容で設定しました。
「transbucket-2024」のinputフォルダにファイルが格納されることを
契機にLambda実行される設定になっています。
項目 | 設定値 |
---|---|
ソースを選択 | S3 |
Bucket | s3/transbucket-2024 |
Event types | PUT |
Prefix | input/ |
動作確認
設定が完了したので実際に動くか検証してみます!
- 「transbucket-2024」のinputフォルダに「test.json」をアップロードします。
test.json
[
{"gender": "男性", "age": 33, "favorite_animal": "パンダ"}
]
- 「transbucket-2024」のoutputフォルダを確認します。
csvファイルが出力されていることが確認できます!!
- ファイルの中身もcsvに変換されています。
まとめ
ここまで読んでくださりありがとうございました!
今回はLambdaの実装があり、何度も検証を行いました。。
Pythonの勉強を行っていましたがまだまだ知らないことが多く
CSVに変換するためのcsvモジュールも今回初めて使用したので苦戦しました。。
参考にさせていただいたサイト