概要
Snowflake で,テーブルやデータマートを更新する際,タスク機能を用いると定期実行(自動更新)が可能である.また,更新のSQLやPythonプログラムをタスク化する際,そのプログラムをストアドプロシージャで関数化しておくと便利である.
以下は,ストアドプロシージャ内にSQLプログラムを記載し,その関数をタスクで毎日午前0時に呼び出している.
CREATE OR REPLACE PROCEDURE <PROCUDURE>()
RETURNS VARCHAR NOT NULL
LANGUAGE SQL
AS
$$
BEGIN
CREATE OR REPLACE TABLE <TABLE_NAME> (
id INT,
name STRING
);
RETURN message;
END;
$$
;
create or replace task TASK
warehouse=<WAREHOUSE_NAME>
schedule='using cron 0 0 * * * UTC'
as call <PROCEDURE>;
しかしながら,このためには,ストアドプロシージャの権限とタスクの権限が必要になる.ユーザにこの権限を渡せない場合,システム管理者がこのタスクを作成する必要がある.その回避策として,プログラムを外部ファイル化し,ユーザがそのファイルを編集できるようにする方法を紹介する.ただし,システム動作の間にユーザが介在することになり,システム側とユーザ側の責任を分離できなくなるため,あまりお勧めはしない.
サンプルコード(SQL編)
プログラムをSnowflake 内部ステージにアップロードする
CREATE OR REPLACE TABLE <TABLE_NAME> (
id INT,
name STRING
);
アップロードしたファイルのパスをメモする.
例:'@"<DATABASE_NAME>"."<SCHEMA_NAME>"."<STAGE_NAME>"/<FILENAME>.sql'
ストアドプロシージャとタスクの作成
CREATE OR REPLACE PROCEDURE <PROCUDURE>()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
execute immediate from '@"<DATABASE_NAME>"."<SCHEMA_NAME>"."<STAGE_NAME>"/<FILENAME>.sql';
RETURN 'successfully';
RETURN message;
END;
$$
;
create or replace task TASK
warehouse=<WAREHOUSE_NAME>
schedule='using cron 0 0 * * * UTC'
as call <PROCEDURE>;
これにより,ユーザにストアドプロシージャの権限とタスクの権限を付与することなく,ユーザ側で任意のSQLプログラムを定期実行することができる.
サンプルコード(Python編)
プログラムをSnowflake 内部ステージにアップロードする
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
def main(session: snowpark.Session):
tableName = 'information_schema.packages'
dataframe = session.table(tableName).filter(col("language") == 'python')
dataframe.write.save_as_table(f"{<DATABASE_NAME>}"."{<SCHEMA_NAME>}"."{<TABLE_NAME>}")
return dataframe
アップロードしたファイルのパスをメモする.
例:'@"<DATABASE_NAME>"."<SCHEMA_NAME>"."<STAGE_NAME>"/<FILENAME>.py'
ストアドプロシージャとタスクの作成
CREATE OR REPLACE PROCEDURE <PROCUDURE>()
returns TABLE()
language python
runtime_version = '3.11'
PACKAGES = ('snowflake','snowflake-snowpark-python')
handler = '<FILENAME>.main'
imports = ('@"<DATABASE_NAME>"."<SCHEMA_NAME>"."<STAGE_NAME>"/<FILENAME>.py')
create or replace task TASK
warehouse=<WAREHOUSE_NAME>
schedule='using cron 0 0 * * * UTC'
as call <PROCEDURE>;
これにより,ユーザにストアドプロシージャの権限とタスクの権限を付与することなく,ユーザ側で任意のPythonプログラムを定期実行することができる.