Amazon Redshift Data API は SQL を簡単に実行することができる非同期APIです。
非同期なので、SQL が完了するまで待たずにレスポンスを返却します。
APIリクエストが完了したかどうか繰り返しポーリングするような仕組みが必要かな?と思いましたが、必要なさそうでした。
WithEvent
パラメータに True
を指定すれば Amazon EventBridge にイベントを送信することができます。
Amazon EventBridge でルールを設定しておけば、完了イベントを拾って後続処理に繋げることもできそうです。
Amazon EventBridge が対応している他のサービスとも連携しやすいですね。
非同期 API なので
AWS Lambda 上で実行する場合には、実行時間を極小化することができ、コスト最適化にもつながりそうです。
長時間実行 SQL や COPYコマンドでの大量データロードなど、長時間がかかる処理の場合にはより効果的です。
試してみること
AWS Lambda から Amazon Redshift Data API を使って SQL を実行し、Amazon EventBridge にイベントが送信されるかを確認してみます。
イベントが送信されたか確認しやすいように Amazon SNS を連携させておきます。
AWS Lambdaのコード実装
SQLを実行には、Boto3のexecute_statement()
を使用します。
Amazon EventBridge へのイベント送信には、WithEvent
パラメータを True
に設定します。
今回は Amazon EventBridge へのイベント送信の確認が目的なので、SQLはできるだけシンプルにしています。
※IAM や AWS Secrets Manager の設定は割愛します。
※あくまでサンプルコードです。
import json
import boto3
def lambda_handler(event, context):
client = boto3.client('redshift-data')
# Redshift接続情報
CLUSTER_NAME='sample-cluster'
DATABASE_NAME='sample-db'
SECRET='arn:aws:secretsmanager:region:accountid:secret:sample-secret'
STATEMENT_NAME='sample-sql'
# 実行するSQLを設定
SQL = '''
SELECT 1;
'''
result = client.execute_statement(
ClusterIdentifier=CLUSTER_NAME,
Database=DATABASE_NAME,
SecretArn=SECRET,
StatementName=STATEMENT_NAME,
Sql=SQL,
WithEvent=True
)
print(json.dumps(result, indent=4, default=str))
Amazon SNSの設定
SNSトピックを作成しておきます。
サブスクリプション作成で自身の Email アドレスを設定します。
※Subscription Confirmation メールが届くので本文内のURLはクリックして confirm しておきまし。
Amazon EventBridge の設定
Amazon EventBridgeでイベントが拾えるようにルールの設定を行います。
イベントパターンの設定でイベントのフィルタ条件を指定します。
{
"detail-type": ["Redshift Data Statement Status Change"],
"source": ["aws.redshift-data"],
"account": ["<aws account id>"],
"region": ["<region>"],
"resources": ["arn:aws:redshift:<region>:<aws account id>:cluster:<cluster name>"],
"detail": {
"state": ["FINISHED"],
"statementName": ["sample-sql"]
}
}
成功した場合には、state
は FINISHED
、失敗した場合にはFAILED
が返却されます。
state
をここでフィルタ条件に加えずに、後続処理で state
を判断するロジックを実装するのもありですね。
ターゲットには先ほど作成したSNS トピックを設定しておきます。
実行してみた
AWS Lambda をテスト実行してみます。
テスト実行したあとの Execution Results の内容を示しておきます。
Test Event Name
test
Response
null
Function Logs
START RequestId: <truncated> Version: $LATEST
{
"ClusterIdentifier": "sample-cluster",
"CreatedAt": "2022-11-26 05:22:07.207000+00:00",
"Database": "sample-db",
"Id": "<truncated>",
"SecretArn": "arn:aws:secretsmanager:region:accountid:secret:sample-secret",
"ResponseMetadata": {
"RequestId": "<truncated>",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "<truncated>",
"content-type": "application/x-amz-json-1.1",
"content-length": "223",
"date": "Sat, 26 Nov 2022 05:22:07 GMT"
},
"RetryAttempts": 0
}
}
END RequestId: <truncated>
REPORT RequestId: <truncated> Duration: 1432.17 ms Billed Duration: 1433 ms Memory Size: 128 MB Max Memory Used: 64 MB Init Duration: 230.11 ms
Request ID
<truncated>
Amazon EventBridgeのモニタリング
Amazon EventBridge のモニタリングタブにもイベントが発生したことが記録されています。
SNSからの受信メール
{
"version": "0",
"id": "<truncated>",
"detail-type": "Redshift Data Statement Status Change",
"source": "aws.redshift-data",
"account": "aws accound id",
"time": "2022-11-26T05:22:08Z",
"region": "region",
"resources": [
"arn:aws:redshift:region:aws-account-id:cluster:sample-cluster"
],
"detail": {
"principal": "arn:aws:sts::aws-account-id:assumed-role/Redshift-data-api-role-xxxxxxx/redshift-data-api-provisioned",
"statementName": "sample-sql",
"statementId": "<truncated>",
"redshiftQueryId": -1,
"state": "FINISHED",
"rows": 1,
"expireAt": 1669699327
}
}
※見やすくするためにjqで整形しています
補足: SQLの実行結果を取得する
Boto3では、get_statement_result()
を利用します。
ここで指定する Id
は、 AWS Lambda で実行した execute_statement()
のレスポンス内にある Id
を指定します。
もしくはSNSで受信した内容の statementId
を指定します。
response = client.get_statement_result(
Id='string',
NextToken='string'
)
※get_statement_result()
は execute_statement()
を実行した同じIAMロール/ユーザで実行する必要があります。
以下のDocに記載があります。
デフォルトでは、
ExecuteStatement
またはBatchExecuteStatement
API オペレーションの実行者と同じ IAM ロールまたは IAM ユーザーを持つユーザーは、CancelStatement
、DescribeStatement
、GetStatementResult
およびListStatements
API オペレーションで同じステートメントを操作できます。
まとめ
- Amazon EventBridge と連携することで完了確認のための一手間から解放される
- Amazon EventBridge が色々な AWS サービスと連携できるので、パイプラインを作りやすい
- AWS Lambda のコスト最適化ができる