概要
Databricks にて 空白の値、あるいは、スペースの値となっているカラムを NULL に置換する Python関数を共有します。
from pyspark.sql.functions import expr
def replace_blank_or_space_to_null(
tgt_df,
tgt_cols,
value_after_replacement="NULL",
):
for tgt_col in tgt_cols:
case_state = f"""
CASE WHEN TRIM({tgt_col}) != ''
THEN {tgt_col}
ELSE {value_after_replacement}
END
"""
tgt_df = tgt_df.withColumn(tgt_col, expr(case_state))
return tgt_df
Spark 3.3 以降であれば、下記の関数を利用してください。
from pyspark.sql.functions import expr
def replace_blank_or_space_to_null(
tgt_df,
tgt_cols,
value_after_replacement="NULL",
):
with_conds = {}
for tgt_col in tgt_cols:
case_state = f"""
CASE WHEN TRIM({tgt_col}) != ''
THEN {tgt_col}
ELSE {value_after_replacement}
END
"""
with_conds[tgt_col] = expr(case_state)
tgt_df = tgt_df.withColumns(with_conds)
return tgt_df
動作確認
1. 関数を定義
from pyspark.sql.functions import expr
def replace_blank_or_space_to_null(
tgt_df,
tgt_cols,
value_after_replacement="NULL",
):
for tgt_col in tgt_cols:
case_state = f"""
CASE WHEN TRIM({tgt_col}) != ''
THEN {tgt_col}
ELSE {value_after_replacement}
END
"""
tgt_df = tgt_df.withColumn(tgt_col, expr(case_state))
return tgt_df
Spark 3.3 以降の場合
from pyspark.sql.functions import expr
def replace_blank_or_space_to_null(
tgt_df,
tgt_cols,
value_after_replacement="NULL",
):
with_conds = {}
for tgt_col in tgt_cols:
case_state = f"""
CASE WHEN TRIM({tgt_col}) != ''
THEN {tgt_col}
ELSE {value_after_replacement}
END
"""
with_conds[tgt_col] = expr(case_state)
tgt_df = tgt_df.withColumns(with_conds)
return tgt_df
2. データフレームを作成
data = [
{
"col_001": "1",
"col_002": "",
"col_003": " ",
"col_004": " ",
}
]
df = spark.createDataFrame(data)
df.display()
3. 関数の利用
空白、あるいは、スペースとなっているcol_002
、col_003
、及び、col_004
がNULL
となっていること確認
tgt_col_info = {
"col_001",
"col_002",
"col_003",
"col_004",
}
result_df = replace_blank_or_space_to_null(df, tgt_col_info)
result_df.display()