概要
Databricks (Spark) における Update 文の動作仕様を確認した上で、 Update 文を多用した際のパフォーマンスの懸念事項をその対処方法を記述します。
Databricks では、述語(Where 句)を指定しない場合に Update 文を実行すると、Overwrite (Turncate + Load)の動作となり全件データの再書き込みが実施されれます。
RDB のように部分的なデータ更新が行われるわけではないため、多数の Update 文を実行するとパフォーマンスの問題が発生します
その事象に対しては、次のような対応方法があります。
- Update 文を 1 つにまとめて実行
- データフレーム操作として書き込みを実行
それぞれの方法の実行時間を下記表にまとめました。提示した対応方法の実行時間という観点で改善されていることを確認できます。なお、この実行時間自体が、Databricks の性能を示してはいないことに注意してください。
# | 実施方法 | 実施時間の平均 | 1回目 | 2回目 | 3回目 |
---|---|---|---|---|---|
1 | 多数の Update 文を実行 | 1276 | 1265.104 | 1250.705 | 1312.276 |
2 | Update 文を 1 つにまとめて実行 | 84 | 83.05164 | 85.75425 | 83.97393 |
3 | データフレーム操作として書き込みを実行 | 84 | 86.87745 | 84.12714 | 81.81258 |
本記事では、事前準備として、Update の動作確認を行います。その後、多数の Update 文を実行した際のパフォーマンス問題を確認し、パフォーマンスを改善する方法を紹介します。
事前準備
検証で利用するカタログやテーブルを作成します。サンプルデータとして、databricks-datasets
上のファイルを利用し、29,999,795 件のレコードがあります。
tgt_tbl_name = "update_test_01.schema_01.lineitem"
spark.sql("CREATE CATALOG IF NOT EXISTS update_test_01")
spark.sql("CREATE SCHEMA IF NOT EXISTS update_test_01.schema_01;")
spark.sql(f"DROP TABLE IF EXISTS {tgt_tbl_name};")
spark.sql(
f"""
CREATE TABLE {tgt_tbl_name} (
L_ORDERKEY STRING,
L_PARTKEY STRING,
L_SUPPKEY STRING,
L_LINENUMBER STRING,
L_QUANTITY STRING,
L_EXTENDEDPRICE STRING,
L_DISCOUNT STRING,
L_TAX STRING,
L_RETURNFLAG STRING,
L_LINESTATUS STRING,
L_SHIPDATE STRING,
L_COMMITDATE STRING,
L_RECEIPTDATE STRING,
L_SHIPINSTRUCT STRING,
L_SHIPMODE STRING,
L_COMMENT STRING
);
"""
)
filepath = "dbfs:/databricks-datasets/tpch/data-001/lineitem/lineitem.tbl"
schema = """
L_ORDERKEY STRING ,
L_PARTKEY STRING ,
L_SUPPKEY STRING ,
L_LINENUMBER STRING ,
L_QUANTITY STRING ,
L_EXTENDEDPRICE STRING ,
L_DISCOUNT STRING ,
L_TAX STRING ,
L_RETURNFLAG STRING ,
L_LINESTATUS STRING ,
L_SHIPDATE STRING ,
L_COMMITDATE STRING ,
L_RECEIPTDATE STRING ,
L_SHIPINSTRUCT STRING ,
L_SHIPMODE STRING ,
L_COMMENT STRING
"""
df = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)
df.write.mode("overwrite").saveAsTable(tgt_tbl_name)
df = spark.table(tgt_tbl_name)
print(f"count: {df.count()}")
df.display()
Update の動作確認
Update の実行
Update 文を実行して、Delta lake の Transaction Log の表示します。
# Upadte を実行
sql = f"""
UPDATE {tgt_tbl_name}
SET L_ORDERKEY = TRIM(L_ORDERKEY)
"""
spark.sql(sql)
# Delta Log を表示
spark.sql(f"desc history {tgt_tbl_name}").display()
Update の実行結果を確認
Delta LogのoperationMetricsの内容は以下の通りです。Version 1のoperationMetrics
には、numOutputBytes
のサイズが 1245802377 バイトと記録されています。これは、Version 2のnumRemovedBytes
と同等のサイズです。また、numAddedBytes
はほぼ同等のサイズで、numUpdatedRows
では全件が処理されていることが確認できます。これらの情報から、初回の書き込みで全データが削除され、その後で全件が再度書き込まれたことがわかります。
# Version 1 の結果
{
"numFiles": "29",
"numOutputRows": "29999795",
"numOutputBytes": "1245802377"
}
# Version 2 の結果
{
"numRemovedFiles": "29",
"numRemovedBytes": "1245802377",
"numCopiedRows": "0",
"numDeletionVectorsAdded": "0",
"numDeletionVectorsRemoved": "0",
"numAddedChangeFiles": "0",
"executionTimeMs": "79448",
"scanTimeMs": "31",
"numAddedFiles": "10",
"numUpdatedRows": "29999795",
"numAddedBytes": "1237449311",
"rewriteTimeMs": "79417"
}
多数の Update 文を実行した際のパフォーマンス問題の確認
カラムをTRIM する処理を行う処理を行う Update 文をカラムごとに実行します。
tgt_columns = [
"L_ORDERKEY",
"L_PARTKEY",
"L_SUPPKEY",
"L_LINENUMBER",
"L_QUANTITY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_TAX",
"L_RETURNFLAG",
"L_LINESTATUS",
"L_SHIPDATE",
"L_COMMITDATE",
"L_RECEIPTDATE",
"L_SHIPINSTRUCT",
"L_SHIPMODE",
"L_COMMENT",
]
tgt_sqls = []
for tgt_col in tgt_columns:
sql = f"""
UPDATE {tgt_tbl_name}
SET {tgt_col} = TRIM({tgt_col})
"""
tgt_sqls.append(sql)
print(len(tgt_sqls))
import time
start_time = time.time()
for tgt_sql in tgt_sqls:
spark.sql(tgt_sql)
end_time = time.time()
time_difference = end_time - start_time
print(f"The time difference in seconds is: {time_difference}")
パフォーマンス改善
1. Update 文を 1 つにまとめて実行
tgt_sqls = []
tgt_columns = [
"L_ORDERKEY",
"L_PARTKEY",
"L_SUPPKEY",
"L_LINENUMBER",
"L_QUANTITY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_TAX",
"L_RETURNFLAG",
"L_LINESTATUS",
"L_SHIPDATE",
"L_COMMITDATE",
"L_RECEIPTDATE",
"L_SHIPINSTRUCT",
"L_SHIPMODE",
"L_COMMENT",
]
cols = ',\n '.join([f"{col} = TRIM({col})" for col in tgt_columns])
sql = f"""
UPDATE {tgt_tbl_name}
SET
{cols}
;
"""
print(sql)
tgt_sqls.append(sql)
print(len(tgt_sqls))
以下の SQL を実行する想定です。
UPDATE update_test_01.schema_01.lineitem
SET
L_ORDERKEY = TRIM(L_ORDERKEY),
L_PARTKEY = TRIM(L_PARTKEY),
L_SUPPKEY = TRIM(L_SUPPKEY),
L_LINENUMBER = TRIM(L_LINENUMBER),
L_QUANTITY = TRIM(L_QUANTITY),
L_EXTENDEDPRICE = TRIM(L_EXTENDEDPRICE),
L_DISCOUNT = TRIM(L_DISCOUNT),
L_TAX = TRIM(L_TAX),
L_RETURNFLAG = TRIM(L_RETURNFLAG),
L_LINESTATUS = TRIM(L_LINESTATUS),
L_SHIPDATE = TRIM(L_SHIPDATE),
L_COMMITDATE = TRIM(L_COMMITDATE),
L_RECEIPTDATE = TRIM(L_RECEIPTDATE),
L_SHIPINSTRUCT = TRIM(L_SHIPINSTRUCT),
L_SHIPMODE = TRIM(L_SHIPMODE),
L_COMMENT = TRIM(L_COMMENT)
;
import time
start_time = time.time()
for tgt_sql in tgt_sqls:
spark.sql(tgt_sql)
end_time = time.time()
time_difference = end_time - start_time
print(f"The time difference in seconds is: {time_difference}")
2. データフレーム操作として書き込みを実行
from pyspark.sql.functions import expr
tgt_columns = [
"L_ORDERKEY",
"L_PARTKEY",
"L_SUPPKEY",
"L_LINENUMBER",
"L_QUANTITY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_TAX",
"L_RETURNFLAG",
"L_LINESTATUS",
"L_SHIPDATE",
"L_COMMITDATE",
"L_RECEIPTDATE",
"L_SHIPINSTRUCT",
"L_SHIPMODE",
"L_COMMENT",
]
columns_conf = {}
for tgt_col in tgt_columns:
columns_conf[tgt_col] = expr(f"TRIM({tgt_col})")
print(columns_conf)
import time
start_time = time.time()
df = spark.table(tgt_tbl_name)
df = df.withColumns(columns_conf)
df.write.mode("overwrite").saveAsTable(tgt_tbl_name)
end_time = time.time()
time_difference = end_time - start_time
print(f"The time difference in seconds is: {time_difference}")
事後処理
カタログを削除
spark.sql("DROP CATALOG IF EXISTS update_test_01 CASCADE")