1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AWSでデータ分析基盤構築をサクッと始めてみる(5.Lambdaで分析自動化編)

Last updated at Posted at 2023-08-10

シリーズ目次

AWSでデータ分析基盤構築をサクッと始めてみる(1.データカタログ作成編(行指向))
AWSでデータ分析基盤構築をサクッと始めてみる(2.行指向から列指向に変換編)
AWSでデータ分析基盤構築をサクッと始めてみる(3.データカタログ作成編(列指向))
AWSでデータ分析基盤構築をサクッと始めてみる(4.Athenaでアドホック分析編)
AWSでデータ分析基盤構築をサクッと始めてみる(5.Lambdaで分析自動化編)

5. Lambdaで分析自動化編

本章がこのシリーズの最終章です。
では、Lambdaでクエリー処理を行うプログラムを実装し、実行結果をS3にCSV形式で保管できるか検証していきます。
それでは、Lambdaを使って、クエリープログラムにチャレンジしましょう!

(1)Lambda関数を作成します
  関数名:”sample_code”
  ランタイム:Python 3.11
lambda1.png

lambda2.png

(2)コーディング
  さて、いよいよPythonでプログラムを実装していきます。
  大まかなプログラム流れは以下の通りとします
   ①テーブル一覧の取得
   ②SQLの記載
   ③SQLの実行(①のテーブル数分繰り返す)
   ④SQLの実行結果を保存
  “コード“タブをクリックすると、おなじみのNotebookが出てきますので、こちらにコーディングしていきましょう。
ex.) サンプルコード(AWS SDK for Pythonである「Boto3」を使用します)

import boto3
import json

# データベース名
DBNAME = 'testdb'

# クエリー結果出力先(S3)
S3_OUTPUT = 's3://test_bucket/query/'

def lambda_handler(event, context):

    try:
        print('Start')

        # glue client
        glue = boto3.client('glue')

        # テーブル名に"output"を含んだテーブルを取得(中間一致)
        response = glue.get_tables(DatabaseName=DBNAME,Expression='*output*')

        # テーブル一覧
        tableList = response['TableList']

        # athena client
        athena = boto3.client('athena')

        # テーブル数分繰り返す
        for x in tableList:
            tableName = x['Name']
            print( 'tableName: '+tableName)

            # テーブル毎にクエリー文作成
            query = "SELECT * FROM %s.%s LIMIT 10;" % (DBNAME, tableName)         
            print( 'query: '+query)

            # SQL クエリ ステートメントを実行
            response = athena.start_query_execution(
                QueryString=query,
                QueryExecutionContext={
                    'Database': DBNAME
                },
                ResultConfiguration={
                    'OutputLocation': S3_OUTPUT,
                }
            )

            # クエリー実行idを取得
            query_execution_id = response['QueryExecutionId']
            print( 'query_execution_id: '+query_execution_id)

            # クエリー実行に関する情報を返却
            query_status = athena.get_query_execution(QueryExecutionId=query_execution_id)
            query_execution_status = query_status['QueryExecution']['Status']['State']

            if query_execution_status == 'FAILED':
                raise Exception("STATUS:" + query_execution_status)
            else:
                print("STATUS:" + query_execution_status)

    except Exception:
        print('Exception Error')

    finally:
        print('End')

(3)デプロイ
  コーディングが完了したら、デプロイします。
  “Deploy“ボタンを押下

(4)テスト
  デプロイ完了後、実装したプログラムが思い通りに動くか、検証します。
  “テスト“タブをクリックし、”テスト”ボタンを押下

(5)結果確認
  出力先のS3を開きCSVファイルができているか実行結果を確認します。
   ex.) s3://test_bucket/query/
  恐らく、csvファイルとmetaファイルができているはずです。
  おめでとうございます!

  ※ハマりポイント:
   恐らく、初めは「Access Denied」とかでテストに失敗するはず、、、
   「安心してください。履いてます!」ではなく、接続許可してくださいw
   対策はLambdaにアタッチされているIAMロールのポリシーを編集するだけ
   ポリシーを編集しないでそのままプログラムを実行すると、接続先リソースへの
   アクセスエラーが起きるので、接続元リソースにアタッチしているIAMロールの
   ポリシーを編集をここで行います。
   “設定“タブ→”アクセス権限”→”ロール名”をクリック 

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": [
				"athena:*"
			],
			"Resource": [
				"*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"glue:CreateDatabase",
				"glue:DeleteDatabase",
				"glue:GetDatabase",
				"glue:GetDatabases",
				"glue:UpdateDatabase",
				"glue:CreateTable",
				"glue:DeleteTable",
				"glue:BatchDeleteTable",
				"glue:UpdateTable",
				"glue:GetTable",
				"glue:GetTables",
				"glue:BatchCreatePartition",
				"glue:CreatePartition",
				"glue:DeletePartition",
				"glue:BatchDeletePartition",
				"glue:UpdatePartition",
				"glue:GetPartition",
				"glue:GetPartitions",
				"glue:BatchGetPartition"
			],
			"Resource": [
				"*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"s3:GetBucketLocation",
				"s3:GetObject",
				"s3:ListBucket",
				"s3:ListBucketMultipartUploads",
				"s3:ListMultipartUploadParts",
				"s3:AbortMultipartUpload",
				"s3:CreateBucket",
				"s3:PutObject"
			],
			"Resource": [
				"arn:aws:s3:::test_bucket",
				"arn:aws:s3:::test_bucket/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"s3:ListBucket",
				"s3:GetObject"
			],
			"Resource": [
				"arn:aws:s3:::test_bucket",
				"arn:aws:s3:::test_bucket/*"
			]
		}
	]
}

 
(6)自動化設定
 最後に、Lambdaで作成したプログラムを手動実行ではなく、自動化したい場合はLambdaのコンソール画面からトリガー追加すればできるようになります。
例えば、S3の特定フォルダにparquetファイルがPUTされたらLambda実行、とか。
※こちらは、ご自身の実現したいトリガー内容にお任せします。

Lambdaでクエリープログラムの動作はうまくいきましたか?

本シリーズはこれで終わりです。
どうもお疲れ様でした!!
データ分析基盤構築の入門編として少しはお役に立てたでしょうか?
皆さまの益々のご活躍をお祈り申し上げます。

参考文献

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?