2
2

More than 1 year has passed since last update.

Step Functions で Athena クエリを(動的に)実行する

Posted at

Step FunctionsからAthenaのクエリを実行したい!

ETL処理において、これまでStep FunctionsからAthenaクエリを実行する場合は、Lambdaを呼び出してboto3を利用してクエリを実行していました。
クエリも動的に変わることが多かったので基本これで問題なかったのですが、処理データの増加にしたがい、クエリが15分以上かかることが発生してしまい、Lambdaのタイムアウトエラーに遭遇する問題が発生しました。

Step Functionから直接Athenaクエリを実行する

上記公式サイトから直接クエリを実行できることがわかりました。

以下がその例です

{
  "Comment": "A Hello World example of the Amazon States Language using Pass states",
  "StartAt": "SQL実行",
  "States": {
    "SQL実行": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "insert into foo select * from hoge",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://example-backet"
        },
        "QueryExecutionContext": { 
          "Database": "db_name" 
       }
      },
      "End": true
    }
  }
}

いくつか付け足すと
arn:aws:states:::athena:startQueryExecution.sync という指定で最後に.syncをつけていると同期実行になります。なくすと非同期になるのでクエリの完了を待たずに次の処理へと移ります。Step Functionsを利用する際は殆どの場合は同期実行なのではないでしょうか。
Databaseを指定していますが、ここはoptionなので指定なしでも、クエリでDBを指定することも可能です。

動的にSQLを変えたい

場合によってSQLを修正したいというのは当然発生すると思われます。その場合はSQLを引数で受け取ることが可能です。
自分の場合はLambdaでSQLを生成してそれを引数で渡すというやり方をとりました。

{
  "Comment": "A Hello World example of the Amazon States Language using Pass states",
  "StartAt": "SQL取得",
  "States": {
    "SQL取得": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "OutputPath": "$.Payload",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "arn:aws:lambda:ap-northeast-1:1234567890:function:getSQL:$LATEST"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Next": "SQL実行"
    },
    "SQL実行": {
      "Type": "Task",
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString.$": "$.sql",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://example-backet"
        },
        "QueryExecutionContext": { 
          "Database": "db_name"
       }
      },
      "End": true
    }
  }
}

呼び出しているLambdaは

def lambda_handler(event, context):
    # 動的にSQLを生成する
    sql = f"""
        insert into foo select * from hoge
    """
    return {
        'sql': sql
    }

ポイントは"QueryString.$": "$.sql",とドルマークをつけて引数を受け取ります。
これでタイムアウトを気にせずクエリを実行できるようになりました。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2