LoginSignup
1
0

More than 1 year has passed since last update.

非同期の Amazon Redshift Data API を Amazon EventBridge で上手に取り扱う

Posted at

Amazon Redshift Data API は SQL を簡単に実行することができる非同期APIです。
非同期なので、SQL が完了するまで待たずにレスポンスを返却します。

APIリクエストが完了したかどうか繰り返しポーリングするような仕組みが必要かな?と思いましたが、必要なさそうでした。
WithEvent パラメータに True を指定すれば Amazon EventBridge にイベントを送信することができます。

Amazon EventBridge でルールを設定しておけば、完了イベントを拾って後続処理に繋げることもできそうです。
Amazon EventBridge が対応している他のサービスとも連携しやすいですね。

非同期 API なので
AWS Lambda 上で実行する場合には、実行時間を極小化することができ、コスト最適化にもつながりそうです。
長時間実行 SQL や COPYコマンドでの大量データロードなど、長時間がかかる処理の場合にはより効果的です。

試してみること

AWS Lambda から Amazon Redshift Data API を使って SQL を実行し、Amazon EventBridge にイベントが送信されるかを確認してみます。
イベントが送信されたか確認しやすいように Amazon SNS を連携させておきます。

redshift data api qiita.png

AWS Lambdaのコード実装

SQLを実行には、Boto3のexecute_statement()を使用します。
Amazon EventBridge へのイベント送信には、WithEventパラメータを True に設定します。
今回は Amazon EventBridge へのイベント送信の確認が目的なので、SQLはできるだけシンプルにしています。

※IAM や AWS Secrets Manager の設定は割愛します。
※あくまでサンプルコードです。

import json
import boto3

def lambda_handler(event, context):
    
    client = boto3.client('redshift-data') 
    # Redshift接続情報
    CLUSTER_NAME='sample-cluster'
    DATABASE_NAME='sample-db'
    SECRET='arn:aws:secretsmanager:region:accountid:secret:sample-secret'
    STATEMENT_NAME='sample-sql'
    # 実行するSQLを設定
    SQL = '''
        SELECT 1;
    '''
    result = client.execute_statement(
        ClusterIdentifier=CLUSTER_NAME,
        Database=DATABASE_NAME,
        SecretArn=SECRET,
        StatementName=STATEMENT_NAME,
        Sql=SQL,
        WithEvent=True
    )
    print(json.dumps(result, indent=4, default=str))

Amazon SNSの設定

SNSトピックを作成しておきます。
サブスクリプション作成で自身の Email アドレスを設定します。

※Subscription Confirmation メールが届くので本文内のURLはクリックして confirm しておきまし。
image (46).png

Amazon EventBridge の設定

Amazon EventBridgeでイベントが拾えるようにルールの設定を行います。
イベントパターンの設定でイベントのフィルタ条件を指定します。
image (47).png

{
  "detail-type": ["Redshift Data Statement Status Change"],
  "source": ["aws.redshift-data"],
  "account": ["<aws account id>"],
  "region": ["<region>"],
  "resources": ["arn:aws:redshift:<region>:<aws account id>:cluster:<cluster name>"],
  "detail": {
    "state": ["FINISHED"],
    "statementName": ["sample-sql"]
  }
}

成功した場合には、stateFINISHED、失敗した場合にはFAILEDが返却されます。
state をここでフィルタ条件に加えずに、後続処理で state を判断するロジックを実装するのもありですね。

ターゲットには先ほど作成したSNS トピックを設定しておきます。
image (48).png

実行してみた

AWS Lambda をテスト実行してみます。
テスト実行したあとの Execution Results の内容を示しておきます。

Test Event Name
test

Response
null

Function Logs
START RequestId: <truncated> Version: $LATEST
{
"ClusterIdentifier": "sample-cluster",
"CreatedAt": "2022-11-26 05:22:07.207000+00:00",
"Database": "sample-db",
"Id": "<truncated>",
"SecretArn": "arn:aws:secretsmanager:region:accountid:secret:sample-secret",
"ResponseMetadata": {
"RequestId": "<truncated>",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"x-amzn-requestid": "<truncated>",
"content-type": "application/x-amz-json-1.1",
"content-length": "223",
"date": "Sat, 26 Nov 2022 05:22:07 GMT"
},
"RetryAttempts": 0
}
}
END RequestId: <truncated>
REPORT RequestId: <truncated>    Duration: 1432.17 ms    Billed Duration: 1433 ms    Memory Size: 128 MB    Max Memory Used: 64 MB    Init Duration: 230.11 ms

Request ID
<truncated>

Amazon EventBridgeのモニタリング

Amazon EventBridge のモニタリングタブにもイベントが発生したことが記録されています。
image (50).png

SNSからの受信メール

{
  "version": "0",
  "id": "<truncated>",
  "detail-type": "Redshift Data Statement Status Change",
  "source": "aws.redshift-data",
  "account": "aws accound id",
  "time": "2022-11-26T05:22:08Z",
  "region": "region",
  "resources": [
    "arn:aws:redshift:region:aws-account-id:cluster:sample-cluster"
  ],
  "detail": {
    "principal": "arn:aws:sts::aws-account-id:assumed-role/Redshift-data-api-role-xxxxxxx/redshift-data-api-provisioned",
    "statementName": "sample-sql",
    "statementId": "<truncated>",
    "redshiftQueryId": -1,
    "state": "FINISHED",
    "rows": 1,
    "expireAt": 1669699327
  }
}

※見やすくするためにjqで整形しています

補足: SQLの実行結果を取得する

Boto3では、get_statement_result() を利用します。
ここで指定する Id は、 AWS Lambda で実行した execute_statement() のレスポンス内にある Id を指定します。
もしくはSNSで受信した内容の statementId を指定します。

response = client.get_statement_result(
    Id='string',
    NextToken='string'
)

get_statement_result()execute_statement() を実行した同じIAMロール/ユーザで実行する必要があります。

以下のDocに記載があります。

デフォルトでは、ExecuteStatementまたは BatchExecuteStatement API オペレーションの実行者と同じ IAM ロールまたは IAM ユーザーを持つユーザーは、CancelStatementDescribeStatementGetStatementResultおよび ListStatements API オペレーションで同じステートメントを操作できます。

まとめ

  • Amazon EventBridge と連携することで完了確認のための一手間から解放される
  • Amazon EventBridge が色々な AWS サービスと連携できるので、パイプラインを作りやすい
  • AWS Lambda のコスト最適化ができる

参考資料

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0