Redshiftのクエリを連続実行したい!
バッチ処理でRedshiftのクエリを直列連続実行したいという要件がありました。
短時間の処理であればLambdaなどからpython等のスクリプトから実行して終わりなんですが、今回はトータルで10時間前後かかる事がわかっており、そのためだけにサーバーを起動するのは効率的でないのでStep Functionsを利用することにしました。
Step Functionsからクエリを実行すると非同期になる問題点
直列処理なのでStep Functionsを採用することにしました。
その際はAmazon Redshift Data APIを利用することとなります。
ExecuteStatementを使って実行するのですが、これは実行IDを返すだけでSQLが完了しているわけではありません。以下のような形式です。
{
"ClusterIdentifier": "mycluster-test",
"CreatedAt": 1598323175.823,
"Database": "dev",
"Id": "c016234e-5c6c-4bc5-bb16-2c5b8ff61814",
"SecretArn": "arn:aws:secretsmanager:us-west-2:123456789012:secret:yanruiz-secret-hKgPWn"
}
ここのIdを使って結果を取りに行くこととなるのですが、いつクエリが完了するかわからない点が問題です。短時間のものであれば数十秒や1分待てばいいか、となります。
ただ今回は数分から数時間かかるものを想定すると、いつ取りに行くかという問題が発生します。
Step FunctionsでSQL実行の成功・失敗を取得するSate Machineの作成
----------------------------------------
Jsonを表示(折りたたみ)
------------------------------------------
{
"Comment": "A description of my state machine",
"StartAt": "Redshift SQL実行",
"States": {
"Redshift SQL実行": {
"Type": "Task",
"Parameters": {
"ClusterIdentifier.$": "$.ClusterIdentifier",
"Database.$": "$.Database",
"DbUser.$": "$.DbUser",
"Sql.$": "$.Sql"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
"Next": "クエリの完了を待つ"
},
"クエリの完了を待つ": {
"Type": "Wait",
"Next": "クエリstateの取得",
"Seconds": 30
},
"クエリstateの取得": {
"Type": "Task",
"Parameters": {
"Id.$": "$.Id"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
"Next": "Choice",
"Retry": [
{
"ErrorEquals": [
"States.ALL"
],
"BackoffRate": 5,
"IntervalSeconds": 3,
"MaxAttempts": 10
}
]
},
"Choice": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Status",
"StringEquals": "FINISHED",
"Next": "Success"
},
{
"Or": [
{
"Variable": "$.Status",
"StringEquals": "SUBMITTED"
},
{
"Variable": "$.Status",
"StringEquals": "STARTED"
},
{
"Variable": "$.Status",
"StringEquals": "PICKED"
}
],
"Next": "クエリの完了を待つ"
}
],
"Default": "Fail"
},
"Success": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail"
}
}
}
結論としてはこのようなStep Functionsを作成しました。
SQL実行し、SQLの実行がどうなったかをDescribeStatementを使って取りに行き、完了していれば終了とします。
今回は成功か失敗だけを見ていますが、SELECTの結果を取得したいならGetStatementResultを利用すれば結果が得られるはずです。クエリのstateを取りに行くのを30秒に設定していますが、ここの時間は適宜変えてください。
ただ、これでは単一のクエリしか実行できません。
なお実行のState Machineには次の権限が必要です。このポリシーは用意されていないので自分で作成・付与する必要があります。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"redshift-data:DescribeStatement",
"redshift-data:ExecuteStatement",
"redshift-serverless:GetCredentials",
"redshift:GetClusterCredentials"
],
"Resource": "*"
}
]
}
連続実行させる
先のState Machineでは単一のクエリしか実行できないのでこれを繰り返し処理できるようにします。具体的にはMapを使います。
----------------------------------------
Jsonを表示(折りたたみ)
------------------------------------------
{
"Comment": "A description of my state machine",
"StartAt": "Map",
"States": {
"Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "SQL取得",
"States": {
"SQL取得": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:123456789123:function:returnSql:$LATEST",
"Payload": {
"hoge.$": "$.hoge"
}
},
"Next": "SQL実行"
},
"SQL実行": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "arn:aws:states:ap-northeast-1:123456789123:stateMachine:ExcecuteRedShiftQuery",
"Input": {
"ClusterIdentifier": "cluster-name",
"Database": "database_name",
"DbUser": "user_name",
"Sql.$": "$.sql"
}
},
"End": true,
"ResultPath": null
}
}
},
"ResultPath": null,
"MaxConcurrency": 1,
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "例外時通知"
}
],
"Next": "クラスター停止させるか",
"InputPath": "$.data"
},
"例外時通知": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:123456789123:function:slack_notification:$LATEST",
"Payload": {
"message": "!!!Stepfunctionsの実行に失敗しました。確認してください。!!!"
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"End": true
},
"クラスター停止させるか": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.stopCluster",
"BooleanEquals": false,
"Next": "成功時通知"
}
],
"Default": "クラスター停止"
},
"クラスター停止": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"Payload.$": "$",
"FunctionName": "arn:aws:lambda:ap-northeast-1:123456789123:function:stopRedshiftCluster:$LATEST"
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"Next": "成功時通知"
},
"成功時通知": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "arn:aws:lambda:ap-northeast-1:123456789123:function:slack_notification:$LATEST",
"Payload": {
"message": "Stepfunctionsの実行に成功しました。"
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2
}
],
"End": true
}
}
}
今回はSQLの生成にはLambdaを使っており、引数を渡して生成できるようにしています(このあたりは参考までに)。
実行時は以下のようなjsonを渡します。今回は単なるSQL実行に加えて以下の昨日も加えています。
stopClusterは終了時にクラスターを停止させるか(あまり要件としてはないかもですが、今回必要だったので作っています)です。
あとは処理の完了時にslack通知のLambdaを呼び出しています。Chatbotを使っても良いかもしれません。
{
"stopCluster": true,
"data": [
{
"hoge": "foo"
},
{
"hoge": "bar"
},
{
"hoge": "huga"
},
]
}
このJsonであれば3回ループが走ってhogeというkeyからSQLをLambdaから生成、そのSQLを実行するという形を取っています。
同期処理を作成するところから戸惑ったので今回記事にしました。改善点等あればご指摘ください。