1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks にて monotonically_increasing_id 利用時の結合後の書き込み結果が想定外となる事象への暫定対応方法

Last updated at Posted at 2025-04-10

概要

特定の条件下で、display 結果とテーブルへの書き込み結果に差異が生じる事象が確認されたため、暫定的な対応方法を共有します。

今回、monotonically_increasing_id 関数から生成される値を Pandas の Index に相当する列として扱い、2 つに分割したデータフレームを最終的に結合してテーブルへ書き込む処理を実施しました。

しかし、テーブルへの書き込み時に想定通りの結果になりませんでした。書き込みを行うデータフレームと、実際に書き込まれたテーブルの値を照合したところ、データフレーム上では delete_flg 列に値が入っているのに、テーブル上では NULL になっていました。

image.png

検証を進めた結果、以下の条件をすべて満たす場合に想定外の動作が起きる可能性があるようです。推測ですが、monotonically_increasing_id 列の値が、データフレームを display したときと、テーブルへ書き込むときで異なることが原因と考えられます。

  1. monotonically_increasing_id 関数の出力を基に 2 つのデータフレームを結合している
  2. ソースからのフィルタリング条件が前回の処理と同じ
  3. テーブルへの書き込みを実施している

対応方針

monotonically_increasing_id が生成する値を固定化させるため、一時的な領域にデータフレームを書き出して再読み込みする方法をとっています。

# monotonically_increasing_id の値を実体化
tgt_df.write.format("delta").mode("overwrite").option("mergeSchema", True).save(volume_dir)
tgt_df = spark.read.format("delta").load(volume_dir)

image.png

データフレームに対してキャッシュを利用する方法もあるようですが、Serverless ではキャッシュ機能を利用できないためその方法を選択しませんでした。また、標準クラスター上でキャッシュを試した場合も、私のプログラムでは想定通り動作しなかったため、採用する際には事前の検証が必要だと考えられます。

image.png

出所:サーバレスコンピュートの制限 | Databricks Documentation

Databricks Runtime 14.1 以降を利用できる場合には、行追跡(row tracking)機能の_metadata.row_id列を利用することも選択できます。

image.png

出所:Delta テーブルの行追跡を使用する | Databricks Documentation

想定外の動作の再現方法

事前準備

%sql
CREATE CATALOG IF NOT EXISTS meget_test;
%sql
CREATE SCHEMA IF NOT EXISTS manabian_split_test;

image.png

エラーの再現方法

テーブルの作成

src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
spark.sql(f"DROP TABLE IF EXISTS {src_table};")
spark.sql(f"""CREATE TABLE {src_table} (
    COL_1 string,
    ingest_date date,
    delete_flg integer,
    rescued_data string
);""")

spark.sql(f"DROP TABLE IF EXISTS {tgt_table};")
spark.sql(f"""CREATE TABLE {tgt_table} (
    COL_1 string,
    ingest_date date,
    delete_flg integer,
    rescued_data string
);""")

image.png

write_to_table ノートブックの作成と書き込み用コードの定義

src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"

from pyspark.sql.functions import expr, monotonically_increasing_id

tgt_df = spark.table(src_table)

# ROW_ID 列を追加
tmp_rowid_col_name = "tmp_rowid_col_name"
tgt_df = tgt_df.withColumn(
    tmp_rowid_col_name,
    monotonically_increasing_id(),
)

# 異なるデータフレームとして定義を実施して、 orignal_df の方のカラムを DROP
orignal_df = tgt_df
orignal_df = orignal_df.drop(
    "delete_flg",
)

# ROW ID 相当の列結合後に DROP
joined_df = orignal_df.alias("orignal_df").join(
    tgt_df.alias("dq_checked_df"),
    orignal_df[tmp_rowid_col_name] == tgt_df[tmp_rowid_col_name],
    "left",
)
joined_df = joined_df.select(
    "orignal_df.*",
    f"dq_checked_df.delete_flg",
)
joined_df = joined_df.drop(tmp_rowid_col_name)

# テーブルに書き込み
output_tbl_name = joined_df
joined_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tgt_table)
# 書き込み後のデータフレームを display
print("out_df")
joined_df.display()
# 書き込み後のテーブルの確認
df = spark.table(tgt_table)
df.display()

image.png

1 回目のデータ準備

import datetime

schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
    {
        "COL_1": "P0",
        "ingest_date": datetime.date(2020, 1, 2),
        "delete_flg": 0,
        "rescued_data": "",
    }
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("overwrite").saveAsTable(src_table)

image.png

1 回目の書き込み処理

dbutils.notebook.run("./write_to_table", 0)

image.png

display の結果とテーブルの比較 (1 回目)

想定通りの結果となります。

image.png

2 回目のデータ準備

import datetime

schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
    {
        "COL_1": "P1",
        "ingest_date": datetime.date(2020, 1, 2),
        "delete_flg": 0,
        "rescued_data": "",
    }
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("append").saveAsTable(src_table)

image.png

2 回目の書き込み処理

dbutils.notebook.run("./write_to_table", 0)

image.png

display の結果とテーブルの比較 (2 回目)

想定外の結果となりました。

image.png

暫定対応方法

事前準備

%sql
CREATE CATALOG IF NOT EXISTS manabian_split_test;
%sql
CREATE SCHEMA IF NOT EXISTS manabian_split_test;

image.png

エラーの再現方法

テーブルの再作成

src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
spark.sql(f"DROP TABLE IF EXISTS {src_table};")
spark.sql(f"""CREATE TABLE {src_table} (
    COL_1 string,
    ingest_date date,
    delete_flg integer,
    rescued_data string
);""")

spark.sql(f"DROP TABLE IF EXISTS {tgt_table};")
spark.sql(f"""CREATE TABLE {tgt_table} (
    COL_1 string,
    ingest_date date,
    delete_flg integer,
    rescued_data string
);""")

image.png

Volume の作成

volume_name = "meget_test.manabian_split_test.volume_01"

spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name};")

dbutils.fs.rm("/Volumes/meget_test/manabian_split_test/volume_01")

image.png

write_to_table__treated ノートブックの作成と書き込み用コードの定義

src_table = "meget_test.manabian_split_test.src_table_01"
tgt_table = "meget_test.manabian_split_test.tgt_table_01"
volume_dir = "/Volumes/meget_test/manabian_split_test/volume_01"

from pyspark.sql.functions import expr, monotonically_increasing_id

tgt_df = (
    spark.table(src_table)
    # Hint 前回と異なる抽出条件の際には想定通りの動作となる
    # .where("COL_1 = 'P1'")
)

# ROW_ID 列を追加
tmp_rowid_col_name = "tmp_rowid_col_name"
tgt_df = tgt_df.withColumn(
    tmp_rowid_col_name,
    monotonically_increasing_id(),
)

# monotonically_increasing_id の値を実体化
tgt_df.write.format("delta").mode("overwrite").option("mergeSchema", True).save(volume_dir)
tgt_df = spark.read.format("delta").load(volume_dir)


# ROW_ID 列を追加
test_col = "test_col"
tgt_df = tgt_df.withColumn(
    test_col,
    expr("cast(tmp_rowid_col_name AS STRING)"),
)


# ROW_ID 列を追加
tmp_rowid_col_name = "tmp_rowid_col_name"
tgt_df = tgt_df.withColumn(
    tmp_rowid_col_name,
    monotonically_increasing_id(),
)

# 異なるデータフレームとして定義を実施して、 orignal_df の方のカラムを DROP
orignal_df = tgt_df
orignal_df = orignal_df.drop(
    "delete_flg",
)

# ROW ID 相当の列結合後に DROP
joined_df = orignal_df.alias("orignal_df").join(
    tgt_df.alias("dq_checked_df"),
    orignal_df[tmp_rowid_col_name] == tgt_df[tmp_rowid_col_name],
    "left",
)
joined_df = joined_df.select(
    "orignal_df.*",
    f"dq_checked_df.delete_flg",
)
joined_df = joined_df.drop(tmp_rowid_col_name)

# テーブルに書き込み
output_tbl_name = joined_df
joined_df.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(tgt_table)
# 書き込み後のデータフレームを display
print("out_df")
joined_df.display()
# 書き込み後のテーブルの確認
df = spark.table(tgt_table)
df.display()

image.png

1 回目のデータ準備

import datetime

schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
    {
        "COL_1": "P0",
        "ingest_date": datetime.date(2020, 1, 2),
        "delete_flg": 0,
        "rescued_data": "",
    }
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("overwrite").saveAsTable(src_table)

image.png

1 回目の書き込み処理

dbutils.notebook.run("./write_to_table__treated", 0)

image.png

display の結果とテーブルの比較 (1 回目)

想定通りの結果になりました。

image.png

2 回目のデータ準備

import datetime

schema_01 = """
COL_1 string,
ingest_date date,
delete_flg integer,
rescued_data string
"""
data_01 = [
    {
        "COL_1": "P1",
        "ingest_date": datetime.date(2020, 1, 2),
        "delete_flg": 0,
        "rescued_data": "",
    }
]
df = spark.createDataFrame(data_01, schema_01)
df.write.mode("append").saveAsTable(src_table)

image.png

2 回目の書き込み処理

dbutils.notebook.run("./write_to_table", 0)

image.png

display の結果とテーブルの比較 (2 回目)

想定通りの結果が得られました。

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?