Snowpark経由で、UDF,UDTF,SPなどを定義、及び実行する方法について簡単に整理してみました。それぞれにおいて公式ドキュメントに詳細などの記載があるのでそちらを参照ください。
UDF、UDTF、SPそれぞれにおいて下記を整理してみました。
- snowflake上でのSQLによる定義方法
- SQLでの実行方法
- snowpark(python)上からの定義方法、実行方法
▪️ユーザー定義関数 (UDF)
SQLでの実行方法
Snowflakeに登録されたUDF(スカラ関数)は、通常の関数と同様にSQL文中で呼び出せます。例えば、整数を1減らすUDF minus_one が定義されていれば、SELECT文で minus_one(列) のように利用できます。具体例として、テーブルの列 col1 に対し minus_one 関数を適用するには以下のように書きます
SELECT minus_one(col1)
FROM my_table;
単一の値に対しては SELECT minus_one(1); のように呼び出すこともできます 。複数引数を取るUDFも、関数呼び出しの形式で同様に使用可能です。
SQLでの作成方法 (Python UDF)
Snowflake上でPython UDFを作成するには、CREATE FUNCTION 文を使用し、言語にPYTHONを指定します。関数名・引数・戻り型を宣言し、ハンドラとなるPython内の関数名とコード本体を指定します。例えば、引数の整数に1を加えるUDF addone を作成するSQL文は次のようになります
CREATE OR REPLACE FUNCTION addone(i INT)
RETURNS INT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.9'
HANDLER = 'addone_py'
AS $$
def addone_py(i):
return i + 1
$$;
上記では、Snowflake上の関数名はaddone、実際に実行されるPython関数名はaddone_pyとしており、HANDLER句で関連付けています。RUNTIME_VERSIONでは使用するPythonのバージョンを指定できます(例: 3.9)。必要に応じてPACKAGES句で依存パッケージ(Anaconda経由で提供されるパッケージ群)を指定できます。作成したUDFはSQL上でSELECT addone(5);
のように呼び出せます。
HANDLER句は、UDFを定義するpythonのエントリーポイントの定義
をしているようなイメージですね。
Snowpark Pythonでの定義と実行
Snowpark Pythonを使うと、Pythonコード内からUDFを定義・登録し、そのままデータフレーム処理で利用できます。
UDFの作成には、下記の2つのパターンがある。
- 匿名UDFの作成
- 名前付きのUDFの作成
名前付きには、永続化の設定をすることも可能である(その場合はis_permanent=True
、ステージパスの指定などが必要になる)
呼び出し方法として、「匿名UDF側はUDFを変数化してその変数を呼ぶ方法」と「名前付きUDFに対してはその関数名をcall_udf
によって呼び出す方法」などがある。
種類 | 登録方法 | 呼び出し方法 |
---|---|---|
匿名UDF | udf(lambda x: ...) |
df.select(変数(col(...))) |
名前付きUDF | udf(..., name="関数名") |
call_udf("関数名", col(...)) |
SnowparkでのUDF定義と登録
Snowflakeが提供するデコレータや登録関数を用いて、Pythonのローカル関数をSnowflake上のUDFとして登録できます。簡単な方法は@udfデコレータ、またはsnowflake.snowpark.functions.udf関数を使うことです。例えば、あるセッション中に整数に1を足す匿名UDFを登録するには次のように記述できます
なお、匿名UDFとはSnowflakeで一時的に定義してすぐに使う、名前のないユーザー定義関数
のことです。
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import udf
add_one = udf(lambda x: x + 1, return_type=IntegerType(), input_types=[IntegerType()])
名前付きUDFと永続化
複数のクエリやセッションから参照したい場合、UDFに名前を付けて登録できます。session.udf.registerやfunctions.udfにname="関数名"引数を指定すると、指定名のUDFがデータベースに作成されます。またis_permanent=Trueを指定すると、セッション終了後も残る永続的なUDFとしてデプロイされます(この場合、UDFコードと依存関係を格納するステージをstage_location引数で指定する必要があります)。例えば、一時的なUDFを名前付きで登録する例と、永続的UDFをデコレータで登録する例は次の通りです。
# セッション内で使える一時UDFを登録(名前付き)
add_one = udf(lambda x: x+1, return_type=IntegerType(), input_types=[IntegerType()],
name="my_udf", replace=True)
# 永続的なUDFをデコレータで登録(ステージパス指定が必要)
@udf(name="minus_one", is_permanent=True, stage_location="@my_stage", replace=True)
def minus_one(x: int) -> int:
return x - 1
デコレータを使用してUDFを登録する場合は、stage_locaion
を指定することができる。
Snowparkデータフレーム上でのUDF実行
上記で登録したUDFは、SnowparkのDataFrame操作で利用可能です。上記で登録したadd_oneやminus_oneは、DataFrameのカラムに対して以下のように適用できます
df = session.create_dataframe([[1, 2], [3, 4]], schema=["a", "b"])
result_df = df.select(add_one(df["a"]), minus_one(df["b"]))
result = result_df.collect()
# 結果: [Row(MY_UDF("A")=2, MINUS_ONE("B")=1), Row(MY_UDF("A")=4, MINUS_ONE("B")=3)]:contentReference[oaicite:16]{index=16}
Snowparkから既存UDF
を呼び出す
Snowflakeに既に定義済みのUDFをSnowparkから使うことも可能です。方法の一つは、snowflake.snowpark.functions.call_udfを使って名前で呼び出す方法です。例えば、Snowflake上に登録済みのminus_oneをデータフレームの列に適用するには以下のようにします。
from snowflake.snowpark.functions import call_udf, col
df = session.create_dataframe([[1], [3]], schema=["col1"])
df.select(call_udf("minus_one", col("col1"))).collect()
# 結果: [Row(MINUS_ONE("COL1")=0), Row(MINUS_ONE("COL1")=2)]:contentReference[oaicite:20]{index=20}:contentReference[oaicite:21]{index=21}
※追記
なお、UDFにはベクトル化UDFというものを定義できる。
これはPandas DataFrameのインプット行を引数にして、Pandasリスト若しくはPandas Seriesを返却するUDFである。下記のように定義することができ、snowflake側のサーバ上で定義できるものである。
CREATE FUNCTION add_inputs(x INT, y FLOAT)
RETURNS FLOAT LANGUAGE PYTHON RUNTIME_VERSION = 3.9 PACKAGES=('pandas')
HANDLER='add_inputs'
AS $$
import pandas
from _snowflake import vectorized
@vectorized(input=pandas.DataFrame)
def add_inputs(df):
return df[0] + df[1]
$$;
▪️ユーザー定義テーブル関数 (UDTF)
SQLでの実行方法
UDTFテーブル関数は、クエリのFROM句でテーブルのように扱って呼び出します。Snowflakeでは、UDTFを呼ぶ際にTABLE(関数名(引数...))構文を使うことができます。例えば、UDTF my_udtf が単一整数引数を取り複数行を返す場合、以下のようにSELECT文を記述します:
SELECT *
FROM TABLE(my_udtf(42));
この例では、my_udtf(42)が返す表(複数行の結果)をFROM句で参照しています。既存のテーブルと組み合わせて使う場合は、FROM <テーブル>, TABLE(udtf(...))やJOIN TABLE(udtf(...))といった形でリレーションを結合します。例えば、入力テーブルの各行に対しUDTFを適用する場合、SnowflakeではLATERAL結合の形式で次のように書けます
SELECT t.id, u.val
FROM source_table AS t
JOIN TABLE(my_udtf(t.id)) AS u
ON true;
(SnowflakeではJOIN TABLE(udtf(...))と書くことで暗黙的にLATERAL結合になります。)
また、UDTFはオプションでOVER (PARTITION BY ...)句を付与することで、入力をパーティション単位で処理できます。
SQLでの作成方法 (Python UDTF)
Pythonで実装したUDTFをSnowflake上に作成するには、CREATE FUNCTION ... RETURNS TABLE構文を用います。UDFと異なり戻り値が複数行・複数列となるため、RETURNS TABLE(<列名> <データ型>, ... )の形式でスキーマを指定します。また、ハンドラとしてPythonクラスを指定し、その中のprocessメソッドなどが実行されるように定義します。 例えば、株式の売買明細からシンボルごとの売上合計を計算するUDTF stock_sale_sumを定義するSQLは以下のようになります
CREATE OR REPLACE FUNCTION stock_sale_sum(symbol VARCHAR, quantity NUMBER, price NUMBER(10,2))
RETURNS TABLE (symbol VARCHAR, total NUMBER(10,2))
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'StockSaleSum'
AS $$
class StockSaleSum:
def __init__(self):
self._cost_total = 0
self._symbol = ""
def process(self, symbol, quantity, price):
self._symbol = symbol
cost = quantity * price
self._cost_total += cost
yield (symbol, cost)
def end_partition(self):
yield (self._symbol, self._cost_total)
$$;
上記では、PythonクラスStockSaleSumのprocessメソッドが入力行ごとに呼ばれ、yieldで行を返すことでUDTFの出力行を生成しています。end_partitionメソッドはパーティション終了時に呼ばれ、各シンボルの合計行を追加で出力しています(※OVER (PARTITION BY symbol)で呼び出された場合のみ有効)。HANDLER = 'StockSaleSum'は、このクラスがハンドラであることを指定しています。
process
メソッドは、必須メソッドでinit
,end_partition
メソッドはオプショナルのメソッドになります。
ベクトル化UDTFについ
なお、UDTFにはベクトル化UDTFというものを定義できる。
これはPandas DataFrameのインプット行を引数にして、PandasDataFrame,Pandasリスト若しくはPandas Seriesを返却するUDTFである。下記のように定義することができ、snowflake側のサーバ上で定義できるものである。
from _snowflake import vectorized
import pandas
class handler:
def __init__(self):
# initialize a state
@vectorized(input=pandas.DataFrame)
def end_partition(self, df):
# process the DataFrame
return result_df
Snowpark Pythonでの定義と実行
Snowpark Pythonでは、PythonクラスをそのままUDTFハンドラとして登録し、データフレームやセッションから利用できます
SnowparkでのUDTF定義と登録
UDTFはPythonクラスとして実装し、Snowparkのudtf(...)関数またはデコレータを使って登録します。クラスには少なくともprocessメソッド(各入力行に対する処理)が必要で、必要に応じてend_partitionやinitialize(init)を定義することができます(Optional)。例えば、単一整数を入力に取り0からn-1までの数値を返すUDTFをSnowparkで登録・実行するコードは以下の通りです
process()メソッドとend_partition()メソッドについては下記の通り。
項目 | 説明 |
---|---|
process() |
入力行ごとに自動呼び出しされ、yield で1行ずつ返す。必須。 |
end_partition() |
パーティション処理の終了時に1回呼ばれる。任意。 |
from snowflake.snowpark.types import IntegerType, StructType, StructField
from snowflake.snowpark.functions import udtf, lit
class GeneratorUDTF:
def process(self, n: int):
for i in range(n):
yield (i,)
# UDTFを登録(一時的なセッションUDFとして)
generator_udtf = udtf(
GeneratorUDTF,
output_schema=StructType([StructField("number", IntegerType())]),
input_types=[IntegerType()]
)
# UDTFを呼び出して結果を収集
session.table_function(generator_udtf(lit(3))).collect()
# 結果: [Row(NUMBER=0), Row(NUMBER=1), Row(NUMBER=2)]:contentReference[oaicite:37]{index=37}
まずGeneratorUDTFクラスを定義し、udtf()関数でそれをSnowflakeに登録しています。output_schemaには返り値のスキーマ(列名と型のリストかStructType)を指定し、input_typesには入力引数の型を指定します(Pythonの型ヒントをつけていれば省略可)。この例ではセッションローカルなUDTFとして登録しており、generator_udtfオブジェクト`を通じてSnowpark上で呼び出せます。session.table_function(generator_udtf(lit(3)))により、Snowflake側でUDTFが実行され、結果がDataFrame経由で取得されています
名前付きUDTFと永続化
UDFと同様、UDTFもname引数を指定して名前付きで登録できます。またis_permanent=Trueとstage_locationを指定すれば、Snowflake上に永続的オブジェクトとして作成されます。例えば、3つの引数を取り計算を行うUDTFをデコレータで永続登録する場合、次のように記述できます
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, FloatType
from snowflake.snowpark.functions import udtf
schema = StructType([
StructField("symbol", StringType()),
StructField("cost", IntegerType()),
])
@udtf(output_schema=schema, input_types=[StringType(), IntegerType(), FloatType()],
stage_location="@my_stage", is_permanent=True, name="test_udtf", replace=True)
class StockSale:
def process(self, symbol: str, quantity: int, price: float):
cost = quantity * price
yield (symbol, cost)
このコードでは、StockSaleクラス
が@udtfデコレータによって永続UDTF test_udtfとして登録されます。
以降は他のセッションからもSQLで TABLE(test_udtf(...))
として上記の永続UDTFを呼び出すことができます。
ここでsnopark上から定義したUDTFを呼び出す方法を見ていこう
上記で、snowparkのフレームワーク経由でUDTFを定義できましたが、これをsnowparkから呼び出すサンプルを見てみましょう。
from snowflake.snowpark.functions import table_function, col
1 セッション経由での呼び出し
# UDTF名から table_function ラッパーを作成
test_udtf_fn = table_function("test_udtf")
# 引数(symbol, quantity, price)を指定して呼び出す
result_df = session.table_function(
test_udtf_fn(
lit("AAPL"),
lit(10),
lit(155.0)
)
)
# 結果を確認
result_df.show()
2 DataFrame から呼び出してみる
# UDTF名から table_function ラッパーを作成
test_udtf_fn = table_function("test_udtf")
# 入力となるデータフレーム(複数レコード)
df = session.create_dataframe([
("AAPL", 10, 155.0),
("GOOG", 5, 200.0)
], schema=["symbol", "quantity", "price"])
# join_table_function + over(partition_by=...) で UDTF 呼び出し(行ごとに UDTF 適用)
result_df = df.join_table_function(
test_udtf_fn(df["symbol"], df["quantity"], df["price"])
)
共通で必要になるのは、table_function("UDTF")
になります。これは「snowflake上で定義したUDTF」をPython オブジェクトとしてラップする役割を担っています。
Snowparkから既存UDTFを呼び出す
既にSnowflake上に存在するUDTFをSnowparkから使うには、Session.table_function()メソッドやDataFrame.join_table_function()メソッドを利用します。Snowparkでは、snowflake.snowpark.functions.table_function("名前")を使ってUDTFの呼び出しオブジェクトを取得できます。例えばSnowflake上にmy_udtfが登録済みであれば:
from snowflake.snowpark.functions import table_function
my_udtf_fn = table_function("my_udtf")
# 別のテーブルと結合せず単独で結果を得る場合:
session.table_function(my_udtf_fn(lit(42))).collect()
# 他のDataFrameと横展開(LATERAL JOIN)する場合:
df.join_table_function(my_udtf_fn(df["id"]).over(partition_by="category"))
前者ではsession.table_functionで直接UDTFの結果を取得し、後者ではDataFrame.join_table_functionにより入力DFに対してレコードを展開しています。
ここでjoin_table_function(...)
を見ていきたいと思います。
Snowpark では df.join_table_function(...) がこの LATERAL JOIN と同じ動きをします。
(LATERAL JOIN は動的なテーブル関数を「行ごと」に実行するということをしています。)
df.join_table_function(udtf_func(df["value"]))
これにより、DataFrame の各行ごとにUDTFが呼ばれ、その戻り結果を横に結合する形になります。
元テーブル
id | val |
---|---|
1 | A |
2 | B |
↓ join_table_function(udtf(val))
展開後
id | val | result |
---|---|---|
1 | A | A |
1 | A | A |
2 | B | B |
2 | B | B |
※Snowpark 1.5以降、session.table_function(udtf_name, arg1, arg2, ...)のように名前と引数を直接指定して呼び出すことも可能です
ストアドプロシージャ (Stored Procedure, SP)
SQLでの実行方法
ストアドプロシージャは、CALL 文によって呼び出します。たとえば、引数を1つ取り値を返すプロシージャyour_proc_nameを実行するには次のように書きます:
CALL your_proc_name(1);
CALL文は手続き型の処理を実行し、必要に応じて戻り値を返します。戻り値がある場合はSELECT CALL proc(...);のようにして取得することもできますが、通常SnowflakeではCALLは単独で実行し、戻り値はプロシージャの結果として表示されます(スカラー値の場合は結果グリッドに1行1列で表示)。 Snowflakeのストアドプロシージャは副作用のある処理(データベースの更新や条件分岐を含むロジック)をカプセル化するのに使われ、UDF/UDTFとは異なり内部でSnowflakeへのクエリ実行が可能です。
※実行権限に関しては、デフォルトではEXECUTE AS OWNER(プロシージャ所有者の権限で実行)であり、必要に応じてEXECUTE AS CALLERとすることもできます
SQLでの作成方法 (Pythonストアドプロシージャ)
Pythonでハンドラが書かれたストアドプロシージャをSnowflake上で作成するには、CREATE PROCEDURE文を使用し、言語をPYTHONに設定します。手続き名・引数・戻り型を宣言し、HANDLER句で呼び出すPython関数名を指定して、コード本体を$$ ... $$内に記述します。 以下は、Snowparkの並列処理機能を使って0~9の平方根計算を並列実行し、その結果リストを文字列で返すプロシージャを作成するSQL例です
CREATE OR REPLACE PROCEDURE joblib_multiprocessing_proc(i INT)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'joblib_multiprocessing'
PACKAGES = ('snowflake-snowpark-python', 'joblib')
AS $$
import joblib
from math import sqrt
def joblib_multiprocessing(session, i):
result = joblib.Parallel(n_jobs=-1)(
joblib.delayed(sqrt)(i ** 2) for i in range(10)
)
return str(result)
$$;
ポイントとして、第一引数には必ずsessionオブジェクトを受け取るように関数を定義します(Snowflakeが内部でこの引数に現在のセッションを渡します)。上記HANDLER = 'joblib_multiprocessing'は、コード内の関数joblib_multiprocessingをエントリーポイントとする指定です。
PACKAGESにはプロシージャ内で使用するサードパーティパッケージ(この例ではSnowparkとjoblib)をリストしています。 ストアドプロシージャはこのようにして作成後、
CALL joblib_multiprocessing_proc(123);
のようにSQLで呼び出せます。複雑な処理や、トランザクション制御・一時テーブル操作なども、このPythonストアドプロシージャ内でsession.sql("...")を用いて行えます。
Snowpark Pythonでの定義と実行
Snowpark Pythonライブラリから、直接Python関数をSnowflakeのストアドプロシージャとして登録・実行することも可能です。
SnowparkでのSP定義と登録
UDF/UDTFと同様、@sprocデコレータやsession.sproc.registerメソッドを用いてPython関数を登録できます。ストアドプロシージャの場合、関数の第一引数にsession: snowflake.snowpark.Sessionを必ず取る点が異なります。また、packages=["snowflake-snowpark-python"]
のパッケージの指定は必須になるので注意する。
例えば、単一整数を入力しそれに1を足して返すストアドプロシージャをSnowparkで登録する場合、以下のように記述できます
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.types import IntegerType
# ラムダ関数を使って一時的なプロシージャを登録
add_one_sp = sproc(lambda session, x: session.sql(f"SELECT {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()],
packages=["snowflake-snowpark-python"])
# 名前付きで登録(replace=Trueで上書き可能に)
add_one_sp = sproc(lambda session, x: session.sql(f"SELECT {x} + 1").collect()[0][0],
return_type=IntegerType(), input_types=[IntegerType()],
name="my_sproc", replace=True, packages=["snowflake-snowpark-python"])
2つ目の例では、Snowflake上にmy_sprocという名前の一時ストアドプロシージャが作成されます。
永続的に保存する場合
永続的に保存したい場合はis_permanent=True
とstage_locationを指定し、コードをステージにアップロードできるようにします。また、packages引数でSnowflake上で必要となるパッケージ(Snowpark APIを利用するなら"snowflake-snowpark-python"など)を列挙します。
is_permanent is True
として永続的に保存する場合は、stage location
は必須になるので指定する。この際に一時ステージや外部ステージなどいずれのステージで問題ない。
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark.functions import sproc
add_one_sp = sproc(
lambda session, x: session.sql(f"SELECT {x} + 1").collect()[0][0],
return_type=IntegerType(),
input_types=[IntegerType()],
name="my_permanent_sproc",
is_permanent=True,
stage_location="@my_stage",
replace=True,
packages=["snowflake-snowpark-python"]
)
Snowflake側でプロシージャを作成する際に、指定したライブラリ群が自動で利用可能となります。 デコレータを使えば、関数定義と同時に登録も可能です。例えば、引数の整数から1を引くプロシージャを永続オブジェクトとして登録するには次のように書けます
@sproc(name="minus_one", is_permanent=True, stage_location="@my_stage",
replace=True, packages=["snowflake-snowpark-python"])
def minus_one(session: snowflake.snowpark.Session, x: int) -> int:
return session.sql(f"select {x} - 1").collect()[0][0]
Snowpark上でこのように登録すると、SnowflakeにPythonストアドプロシージャがデプロイされます(内部ではCREATE PROCEDURE文が実行されています)。以降、その名前で通常のSQL CALL文やSnowparkからの呼び出しが可能です。
Snowparkから既存SPを呼び出す
Snowpark Pythonセッションから、Snowflake上のストアドプロシージャを直接呼び出すこともできます。Session.call(proc_name, *args)メソッドを使用すると、指定したプロシージャを実行し、その戻り値をPythonオブジェクトとして取得できます。例えば、上で登録したmy_sprocをSnowparkから呼ぶには:
result = session.call("my_sproc", 5)
print(result) # 6 (5+1 の結果)
このときSnowflake上では実際にCALL my_sproc(5)が実行されており、その戻り値がresultに格納されます
docs.snowflake.com。戻り値が無い(RETURN VOID相当)のプロシージャでは、session.callはNoneを返します。
以上、UDF・UDTF・ストアドプロシージャそれぞれについて、SQL上での実行方法、SQLによる定義方法、およびSnowpark Pythonを用いた定義から実行までの流れを説明しました