シリーズ目次
AWSでデータ分析基盤構築をサクッと始めてみる(1.データカタログ作成編(行指向))
AWSでデータ分析基盤構築をサクッと始めてみる(2.行指向から列指向に変換編)
AWSでデータ分析基盤構築をサクッと始めてみる(3.データカタログ作成編(列指向))
AWSでデータ分析基盤構築をサクッと始めてみる(4.Athenaでアドホック分析編)
AWSでデータ分析基盤構築をサクッと始めてみる(5.Lambdaで分析自動化編)
5. Lambdaで分析自動化編
本章がこのシリーズの最終章です。
では、Lambdaでクエリー処理を行うプログラムを実装し、実行結果をS3にCSV形式で保管できるか検証していきます。
それでは、Lambdaを使って、クエリープログラムにチャレンジしましょう!
(1)Lambda関数を作成します
関数名:”sample_code”
ランタイム:Python 3.11
(2)コーディング
さて、いよいよPythonでプログラムを実装していきます。
大まかなプログラム流れは以下の通りとします
①テーブル一覧の取得
②SQLの記載
③SQLの実行(①のテーブル数分繰り返す)
④SQLの実行結果を保存
“コード“タブをクリックすると、おなじみのNotebookが出てきますので、こちらにコーディングしていきましょう。
ex.) サンプルコード(AWS SDK for Pythonである「Boto3」を使用します)
import boto3
import json
# データベース名
DBNAME = 'testdb'
# クエリー結果出力先(S3)
S3_OUTPUT = 's3://test_bucket/query/'
def lambda_handler(event, context):
try:
print('Start')
# glue client
glue = boto3.client('glue')
# テーブル名に"output"を含んだテーブルを取得(中間一致)
response = glue.get_tables(DatabaseName=DBNAME,Expression='*output*')
# テーブル一覧
tableList = response['TableList']
# athena client
athena = boto3.client('athena')
# テーブル数分繰り返す
for x in tableList:
tableName = x['Name']
print( 'tableName: '+tableName)
# テーブル毎にクエリー文作成
query = "SELECT * FROM %s.%s LIMIT 10;" % (DBNAME, tableName)
print( 'query: '+query)
# SQL クエリ ステートメントを実行
response = athena.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': DBNAME
},
ResultConfiguration={
'OutputLocation': S3_OUTPUT,
}
)
# クエリー実行idを取得
query_execution_id = response['QueryExecutionId']
print( 'query_execution_id: '+query_execution_id)
# クエリー実行に関する情報を返却
query_status = athena.get_query_execution(QueryExecutionId=query_execution_id)
query_execution_status = query_status['QueryExecution']['Status']['State']
if query_execution_status == 'FAILED':
raise Exception("STATUS:" + query_execution_status)
else:
print("STATUS:" + query_execution_status)
except Exception:
print('Exception Error')
finally:
print('End')
(3)デプロイ
コーディングが完了したら、デプロイします。
“Deploy“ボタンを押下
(4)テスト
デプロイ完了後、実装したプログラムが思い通りに動くか、検証します。
“テスト“タブをクリックし、”テスト”ボタンを押下
(5)結果確認
出力先のS3を開きCSVファイルができているか実行結果を確認します。
ex.) s3://test_bucket/query/
恐らく、csvファイルとmetaファイルができているはずです。
おめでとうございます!
※ハマりポイント:
恐らく、初めは「Access Denied」とかでテストに失敗するはず、、、
「安心してください。履いてます!」ではなく、接続許可してくださいw
対策はLambdaにアタッチされているIAMロールのポリシーを編集するだけ
ポリシーを編集しないでそのままプログラムを実行すると、接続先リソースへの
アクセスエラーが起きるので、接続元リソースにアタッチしているIAMロールの
ポリシーを編集をここで行います。
“設定“タブ→”アクセス権限”→”ロール名”をクリック
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"athena:*"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:DeleteDatabase",
"glue:GetDatabase",
"glue:GetDatabases",
"glue:UpdateDatabase",
"glue:CreateTable",
"glue:DeleteTable",
"glue:BatchDeleteTable",
"glue:UpdateTable",
"glue:GetTable",
"glue:GetTables",
"glue:BatchCreatePartition",
"glue:CreatePartition",
"glue:DeletePartition",
"glue:BatchDeletePartition",
"glue:UpdatePartition",
"glue:GetPartition",
"glue:GetPartitions",
"glue:BatchGetPartition"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:ListMultipartUploadParts",
"s3:AbortMultipartUpload",
"s3:CreateBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::test_bucket",
"arn:aws:s3:::test_bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::test_bucket",
"arn:aws:s3:::test_bucket/*"
]
}
]
}
(6)自動化設定
最後に、Lambdaで作成したプログラムを手動実行ではなく、自動化したい場合はLambdaのコンソール画面からトリガー追加すればできるようになります。
例えば、S3の特定フォルダにparquetファイルがPUTされたらLambda実行、とか。
※こちらは、ご自身の実現したいトリガー内容にお任せします。
Lambdaでクエリープログラムの動作はうまくいきましたか?
本シリーズはこれで終わりです。
どうもお疲れ様でした!!
データ分析基盤構築の入門編として少しはお役に立てたでしょうか?
皆さまの益々のご活躍をお祈り申し上げます。
参考文献
- AWS公式サイト
AWS Lambda - Boto3 ドキュメント
Boto3 documentation