短い結論
- LambdaからSnowflakeにLambdaの生存期間(最大15分)を超える実行時間のかかるクエリを発行するときは、Snowflake Connector for Pythonで用意されている
SnowflakeConnection.execute_asyncメソッドを利用して、非同期にクエリを発行する - バックグラウンドで動いているクエリが、接続がなくなっても動き続けられるよう
ABORT_DETACHED_QUERYパラメーターをFalseに設定する - 一定時間経ったらクエリがタイムアウトして終了するように、
STATEMENT_TIMEOUT_IN_SECONDSパラメーターにタイムアウトする秒数を設定する
シンプルな実装
Pythonを使ってSnowflakeに問い合わせを行うには、通常Snowflake Connector for Pythonを使うと思います。
このライブラリには、クエリを発行することのできるSnowflakeConnection.executeメソッドが用意されています。
このメソッドは同期的にクエリを発行します。
つまりクエリが終了するまで、次の行は実行されません。
import snowflake.connector
def lambda_handler(event, context):
with snowflake.connector.connect(
...
) as conn:
with conn.cursor() as cur:
cur.execute("ここに時間のかかるSQL")
# クエリが終了するまで次の行には進まない
table = cur.fetch_arrow_all()
...
シーケンス図で書くと次のようになります。
シンプルな実装の問題点
この実装の問題は、Lambdaが最大15分しか処理を継続できないという仕様があることです。
つまり、executeメソッドが15分以上結果を返さないと、大元のlambda_handler自体がAWSにより強制終了されてしまうということです。
非同期でクエリを発行する実装
この問題に対応するためには、SnowflakeCursor.execute_asyncメソッドを使って、クエリを非同期で実行します。
その際に、実行されたクエリに割り振られるクエリIDを取得しておきます。
そして、このクエリIDを使ってクエリの状態を確認して、クエリが正常に終了するか、タイムアウトで強制終了するまでループするようにします。
また、Lambdaの処理時間を超えそうな場合には、一旦Lambdaを終了させて同じLambdaを再起動します。
このような実装を行うことで、Lambdaの最大実行時間を超えてクエリの結果を待つことができます。
import snowflake.connector
def lambda_handler(event, context):
with snowflake.connector.connect(
...
) as conn:
with conn.cursor() as cur:
qid = cur.execute_async("ここに時間のかかるSQL")
# クエリが終了の終了待たずに次の処理に移る
return {"qid": qid}
import time
import snowflake.connector
def lambda_handler(event, context):
qid = event["body"].get("qid")
with snowflake.connector.connect(
...
session_parameters={
'ABORT_DETACHED_QUERY': False,
'STATEMENT_TIMEOUT_IN_SECONDS': 60 * 30, # (30分)
}
) as conn:
while True:
status = conn.get_query_status(qid)
if conn.is_an_error(status):
# タイムアウトなどでクエリが強制終了した
raise Exception("クエリが強制終了しました")
if context.get_remaining_time_in_millis() < 5000:
# Lambdaの残りの実行時間が5秒を切ったら、Lambdaを再実行する
return {"is_restart": True}
if conn.is_still_running(status):
# クエリが正常終了したらループを抜ける
break
# 少し待ってから、もう一度クエリの状態を確認する
time.sleep(1)
with conn.cursor() as cur:
table = cur.fetch_arrow_all()
...
return {"is_restart": False}
注:非同期クエリを発行するLambdaが終了したら、クエリの終了まで待つLambdaを実行する処理や、クエリの終了まで待つLambdaが返すレスポンスの中身(ここでは"is_restart")を見て、同じLambdaを再実行する仕組みについては、ここでは書いていません。そのようなオーケストレーションの制御は、別途Step Functionsなどを使って実装してください。
シーケンス図で書くと次のようになります。
ABORT_DETACHED_QUERY
ABORT_DETACHED_QUERYは、セッションの突然の終了(ネットワークの停止、ブラウザの終了、サービスの中断など)により接続が失われた場合に、進行中のクエリに対してSnowflakeが実行するアクションを指定します。TRUEの場合、接続が失われると、進行中のクエリは5分後に中止されます。
非同期クエリを発行するLambdaで作成した接続は、execute_asyncを実行するとすぐに不要となり、通常はすぐに閉じられます。そのため、もしABORT_DETACHED_QUERYがTRUEの場合、5分でクエリが強制終了してしまいます。そのため、接続時にFALSEを指定しています。
STATEMENT_TIMEOUT_IN_SECONDS
STATEMENT_TIMEOUT_IN_SECONDSは実行中の SQL ステートメント(クエリ、 DDL 、 DML など)がSnowflakeによってキャンセルされるまでの時間です(秒単位)。
ここでは、60秒 * 30分 = 1800秒を指定しています。