1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LambdaからSnowflakeへ時間のかかるクエリを発行する方法

Last updated at Posted at 2025-10-08

短い結論

  • LambdaからSnowflakeにLambdaの生存期間(最大15分)を超える実行時間のかかるクエリを発行するときは、Snowflake Connector for Pythonで用意されている SnowflakeConnection.execute_async メソッドを利用して、非同期にクエリを発行する
  • バックグラウンドで動いているクエリが、接続がなくなっても動き続けられるよう ABORT_DETACHED_QUERYパラメーターをFalseに設定する
  • 一定時間経ったらクエリがタイムアウトして終了するように、STATEMENT_TIMEOUT_IN_SECONDSパラメーターにタイムアウトする秒数を設定する

シンプルな実装

Pythonを使ってSnowflakeに問い合わせを行うには、通常Snowflake Connector for Pythonを使うと思います。
このライブラリには、クエリを発行することのできるSnowflakeConnection.executeメソッドが用意されています。
このメソッドは同期的にクエリを発行します。
つまりクエリが終了するまで、次の行は実行されません。

Lambda
import snowflake.connector

def lambda_handler(event, context):
  with snowflake.connector.connect(
    ...
  ) as conn:
      with conn.cursor() as cur:
        cur.execute("ここに時間のかかるSQL")
        # クエリが終了するまで次の行には進まない
        table = cur.fetch_arrow_all()
        ...

シーケンス図で書くと次のようになります。

シンプルな実装の問題点

この実装の問題は、Lambdaが最大15分しか処理を継続できないという仕様があることです。
つまり、executeメソッドが15分以上結果を返さないと、大元のlambda_handler自体がAWSにより強制終了されてしまうということです。

非同期でクエリを発行する実装

この問題に対応するためには、SnowflakeCursor.execute_asyncメソッドを使って、クエリを非同期で実行します。
その際に、実行されたクエリに割り振られるクエリIDを取得しておきます。
そして、このクエリIDを使ってクエリの状態を確認して、クエリが正常に終了するか、タイムアウトで強制終了するまでループするようにします。
また、Lambdaの処理時間を超えそうな場合には、一旦Lambdaを終了させて同じLambdaを再起動します。
このような実装を行うことで、Lambdaの最大実行時間を超えてクエリの結果を待つことができます。

非同期クエリを発行するLambda
import snowflake.connector

def lambda_handler(event, context):
  with snowflake.connector.connect(
    ...
  ) as conn:
      with conn.cursor() as cur:
        qid = cur.execute_async("ここに時間のかかるSQL")
        # クエリが終了の終了待たずに次の処理に移る

  return {"qid": qid}
クエリの終了まで待つLambda
import time
import snowflake.connector

def lambda_handler(event, context):
  qid = event["body"].get("qid")
  
  with snowflake.connector.connect(
    ...
    session_parameters={
        'ABORT_DETACHED_QUERY': False,
        'STATEMENT_TIMEOUT_IN_SECONDS': 60 * 30, # (30分)
    }
  ) as conn:
    while True:
      status = conn.get_query_status(qid)
      if conn.is_an_error(status):
        # タイムアウトなどでクエリが強制終了した
        raise Exception("クエリが強制終了しました")
      if context.get_remaining_time_in_millis() < 5000:
        # Lambdaの残りの実行時間が5秒を切ったら、Lambdaを再実行する
        return {"is_restart": True}
      if conn.is_still_running(status):
        # クエリが正常終了したらループを抜ける
        break
      # 少し待ってから、もう一度クエリの状態を確認する
      time.sleep(1)
      
    with conn.cursor() as cur:
      table = cur.fetch_arrow_all()
      ...
  
  return {"is_restart": False}

注:非同期クエリを発行するLambdaが終了したら、クエリの終了まで待つLambdaを実行する処理や、クエリの終了まで待つLambdaが返すレスポンスの中身(ここでは"is_restart")を見て、同じLambdaを再実行する仕組みについては、ここでは書いていません。そのようなオーケストレーションの制御は、別途Step Functionsなどを使って実装してください。

シーケンス図で書くと次のようになります。

ABORT_DETACHED_QUERY

ABORT_DETACHED_QUERYは、セッションの突然の終了(ネットワークの停止、ブラウザの終了、サービスの中断など)により接続が失われた場合に、進行中のクエリに対してSnowflakeが実行するアクションを指定します。TRUEの場合、接続が失われると、進行中のクエリは5分後に中止されます。
非同期クエリを発行するLambdaで作成した接続は、execute_asyncを実行するとすぐに不要となり、通常はすぐに閉じられます。そのため、もしABORT_DETACHED_QUERYがTRUEの場合、5分でクエリが強制終了してしまいます。そのため、接続時にFALSEを指定しています。

STATEMENT_TIMEOUT_IN_SECONDS

STATEMENT_TIMEOUT_IN_SECONDSは実行中の SQL ステートメント(クエリ、 DDL 、 DML など)がSnowflakeによってキャンセルされるまでの時間です(秒単位)。
ここでは、60秒 * 30分 = 1800秒を指定しています。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?