概要
特定の条件下で、display
結果とテーブルへの書き込み結果に差異が生じる事象が確認されたため、暫定的な対応方法を共有します。
今回、monotonically_increasing_id
関数から生成される値を Pandas の Index に相当する列として扱い、2 つに分割したデータフレームを最終的に結合してテーブルへ書き込む処理を実施しました。
しかし、テーブルへの書き込み時に想定通りの結果になりませんでした。書き込みを行うデータフレームと、実際に書き込まれたテーブルの値を照合したところ、データフレーム上では delete_flg
列に値が入っているのに、テーブル上では NULL
になっていました。
検証を進めた結果、以下の条件をすべて満たす場合に想定外の動作が起きる可能性があるようです。推測ですが、monotonically_increasing_id
列の値が、データフレームを display
したときと、テーブルへ書き込むときで異なることが原因と考えられます。
-
monotonically_increasing_id
関数の出力を基に 2 つのデータフレームを結合している - ソースからのフィルタリング条件が前回の処理と同じ
- テーブルへの書き込みを実施している
対応方針
monotonically_increasing_id
が生成する値を固定化させるため、一時的な領域にデータフレームを書き出して再読み込みする方法をとっています。
# monotonically_increasing_id の値を実体化
tgt_df.write.format("delta").mode("overwrite").option("mergeSchema", True).save(volume_dir)
tgt_df = spark.read.format("delta").load(volume_dir)
データフレームに対してキャッシュを利用する方法もあるようですが、Serverless ではキャッシュ機能を利用できないためその方法を選択しませんでした。また、標準クラスター上でキャッシュを試した場合も、私のプログラムでは想定通り動作しなかったため、採用する際には事前の検証が必要だと考えられます。
出所:サーバレスコンピュートの制限 | Databricks Documentation
Databricks Runtime 14.1 以降を利用できる場合には、行追跡(row tracking)機能の_metadata.row_id
列を利用することも選択できます。
出所:Delta テーブルの行追跡を使用する | Databricks Documentation
想定外の動作の再現方法
事前準備
%sql
CREATE CATALOG IF NOT EXISTS meget_test;
%sql
CREATE SCHEMA IF NOT EXISTS manabian_split_test;
エラーの再現方法
テーブルの作成
src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
spark.sql(f"DROP TABLE IF EXISTS {src_table};")
spark.sql(f"""CREATE TABLE {src_table} (
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
);""")
spark.sql(f"DROP TABLE IF EXISTS {tgt_table};")
spark.sql(f"""CREATE TABLE {tgt_table} (
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
);""")
write_to_table
ノートブックの作成と書き込み用コードの定義
src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
from pyspark.sql.functions import expr, monotonically_increasing_id
tgt_df = spark.table(src_table)
# ROW_ID 列を追加
tmp_rowid_col_name = "tmp_rowid_col_name"
tgt_df = tgt_df.withColumn(
tmp_rowid_col_name,
monotonically_increasing_id(),
)
# 異なるデータフレームとして定義を実施して、 orignal_df の方のカラムを DROP
orignal_df = tgt_df
orignal_df = orignal_df.drop(
"delete_flg",
)
# ROW ID 相当の列結合後に DROP
joined_df = orignal_df.alias("orignal_df").join(
tgt_df.alias("dq_checked_df"),
orignal_df[tmp_rowid_col_name] == tgt_df[tmp_rowid_col_name],
"left",
)
joined_df = joined_df.select(
"orignal_df.*",
f"dq_checked_df.delete_flg",
)
joined_df = joined_df.drop(tmp_rowid_col_name)
# テーブルに書き込み
output_tbl_name = joined_df
joined_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tgt_table)
# 書き込み後のデータフレームを display
print("out_df")
joined_df.display()
# 書き込み後のテーブルの確認
df = spark.table(tgt_table)
df.display()
1 回目のデータ準備
import datetime
schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
{
"COL_1": "P0",
"ingest_date": datetime.date(2020, 1, 2),
"delete_flg": 0,
"rescued_data": "",
}
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("overwrite").saveAsTable(src_table)
1 回目の書き込み処理
dbutils.notebook.run("./write_to_table", 0)
display の結果とテーブルの比較 (1 回目)
想定通りの結果となります。
2 回目のデータ準備
import datetime
schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
{
"COL_1": "P1",
"ingest_date": datetime.date(2020, 1, 2),
"delete_flg": 0,
"rescued_data": "",
}
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("append").saveAsTable(src_table)
2 回目の書き込み処理
dbutils.notebook.run("./write_to_table", 0)
display の結果とテーブルの比較 (2 回目)
想定外の結果となりました。
暫定対応方法
事前準備
%sql
CREATE CATALOG IF NOT EXISTS manabian_split_test;
%sql
CREATE SCHEMA IF NOT EXISTS manabian_split_test;
エラーの再現方法
テーブルの再作成
src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
spark.sql(f"DROP TABLE IF EXISTS {src_table};")
spark.sql(f"""CREATE TABLE {src_table} (
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
);""")
spark.sql(f"DROP TABLE IF EXISTS {tgt_table};")
spark.sql(f"""CREATE TABLE {tgt_table} (
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
);""")
Volume の作成
volume_name = "meget_test.manabian_split_test.volume_01"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name};")
dbutils.fs.rm("/Volumes/meget_test/manabian_split_test/volume_01")
write_to_table__treated
ノートブックの作成と書き込み用コードの定義
src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
volume_dir = "/Volumes/meget_test/manabian_split_test/volume_01"
from pyspark.sql.functions import expr, monotonically_increasing_id
tgt_df = (
spark.table(src_table)
# Hint 前回と異なる抽出条件の際には想定通りの動作となる
# .where("COL_1 = 'P1'")
)
# ROW_ID 列を追加
tmp_rowid_col_name = "tmp_rowid_col_name"
tgt_df = tgt_df.withColumn(
tmp_rowid_col_name,
monotonically_increasing_id(),
)
# monotonically_increasing_id の値を実体化
tgt_df.write.format("delta").mode("overwrite").option("mergeSchema", True).save(volume_dir)
tgt_df = spark.read.format("delta").load(volume_dir)
# ROW_ID 列を追加
test_col = "test_col"
tgt_df = tgt_df.withColumn(
test_col,
expr("cast(tmp_rowid_col_name AS STRING)"),
)
# ROW_ID 列を追加
tmp_rowid_col_name = "tmp_rowid_col_name"
tgt_df = tgt_df.withColumn(
tmp_rowid_col_name,
monotonically_increasing_id(),
)
# 異なるデータフレームとして定義を実施して、 orignal_df の方のカラムを DROP
orignal_df = tgt_df
orignal_df = orignal_df.drop(
"delete_flg",
)
# ROW ID 相当の列結合後に DROP
joined_df = orignal_df.alias("orignal_df").join(
tgt_df.alias("dq_checked_df"),
orignal_df[tmp_rowid_col_name] == tgt_df[tmp_rowid_col_name],
"left",
)
joined_df = joined_df.select(
"orignal_df.*",
f"dq_checked_df.delete_flg",
)
joined_df = joined_df.drop(tmp_rowid_col_name)
# テーブルに書き込み
output_tbl_name = joined_df
joined_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tgt_table)
# 書き込み後のデータフレームを display
print("out_df")
joined_df.display()
# 書き込み後のテーブルの確認
df = spark.table(tgt_table)
df.display()
1 回目のデータ準備
import datetime
schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
{
"COL_1": "P0",
"ingest_date": datetime.date(2020, 1, 2),
"delete_flg": 0,
"rescued_data": "",
}
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("overwrite").saveAsTable(src_table)
1 回目の書き込み処理
dbutils.notebook.run("./write_to_table__treated", 0)
display の結果とテーブルの比較 (1 回目)
想定通りの結果になりました。
2 回目のデータ準備
import datetime
schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
{
"COL_1": "P1",
"ingest_date": datetime.date(2020, 1, 2),
"delete_flg": 0,
"rescued_data": "",
}
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("append").saveAsTable(src_table)
2 回目の書き込み処理
dbutils.notebook.run("./write_to_table", 0)
display の結果とテーブルの比較 (2 回目)
想定通りの結果が得られました。