User-defined functions - Python | Databricks on AWS [2021/3/17時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
UDFとして関数を登録する
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
オプションとしてUDFの戻り値の型を設定することができます。デフォルトの戻り値はStringType
となります。
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQLでUDFを呼び出す
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
データフレームでUDFを使う
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
あるいは、アノテーション構文を用いて同じUDFを宣言することができます。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
評価順序とnullチェック
Spark SQL(SQL、データフレーム、データセットAPIを含む)は、サブ表現の評価順序を保証しません。特に、オペレーターや関数の入力は必ずしも左から右、あるいは固定の順序で評価されません。例えば、論理演算のAND
やOR
表現には、左から右の「短絡」のセマンティクスはありません。
そのため、WHERE
やHAVING
句のようなブール演算の評価順序はクエリー最適化やクエリー計画の間で並び替えられる可能性があるためこの順序や副作用に依存するのは危険です。特にUDFがnullチェックで短絡セマンティクスに依存している場合、以下のようにUDFを呼び出す前にnullチェックが行われることは保証されません。
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
このWHERE
句はnullを除外する前にstrlen
UDFが呼び出されることを保証しません。
適切にnullチェックを行うためには、以下のいずれかを行うことをお勧めします。
- UDF自身でnullに対応するようにし、UDF内でnullチェックを行うようにします。
- nullチェックに
IF
やCASE WHEN
表現を使用し、条件分岐でUDFを呼び出します。
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok