1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksにおけるUpdate文の効率的な利用:多数のUpdate文実行によるパフォーマンス問題とその解決策

Last updated at Posted at 2024-02-23

概要

Databricks (Spark) における Update 文の動作仕様を確認した上で、 Update 文を多用した際のパフォーマンスの懸念事項をその対処方法を記述します。

Databricks では、述語(Where 句)を指定しない場合に Update 文を実行すると、Overwrite (Turncate + Load)の動作となり全件データの再書き込みが実施されれます。

RDB のように部分的なデータ更新が行われるわけではないため、多数の Update 文を実行するとパフォーマンスの問題が発生します

その事象に対しては、次のような対応方法があります。

  1. Update 文を 1 つにまとめて実行
  2. データフレーム操作として書き込みを実行

それぞれの方法の実行時間を下記表にまとめました。提示した対応方法の実行時間という観点で改善されていることを確認できます。なお、この実行時間自体が、Databricks の性能を示してはいないことに注意してください。

# 実施方法 実施時間の平均 1回目 2回目 3回目
1 多数の Update 文を実行 1276 1265.104 1250.705 1312.276
2 Update 文を 1 つにまとめて実行 84 83.05164 85.75425 83.97393
3 データフレーム操作として書き込みを実行 84 86.87745 84.12714 81.81258

本記事では、事前準備として、Update の動作確認を行います。その後、多数の Update 文を実行した際のパフォーマンス問題を確認し、パフォーマンスを改善する方法を紹介します。

事前準備

検証で利用するカタログやテーブルを作成します。サンプルデータとして、databricks-datasets上のファイルを利用し、29,999,795 件のレコードがあります。

tgt_tbl_name = "update_test_01.schema_01.lineitem"
spark.sql("CREATE CATALOG IF NOT EXISTS update_test_01")
spark.sql("CREATE SCHEMA IF NOT EXISTS update_test_01.schema_01;")

image.png

spark.sql(f"DROP TABLE IF EXISTS {tgt_tbl_name};")
spark.sql(
    f"""
    CREATE TABLE {tgt_tbl_name} (
    L_ORDERKEY STRING,
    L_PARTKEY STRING,
    L_SUPPKEY STRING,
    L_LINENUMBER STRING,
    L_QUANTITY STRING,
    L_EXTENDEDPRICE STRING,
    L_DISCOUNT STRING,
    L_TAX STRING,
    L_RETURNFLAG STRING,
    L_LINESTATUS STRING,
    L_SHIPDATE STRING,
    L_COMMITDATE STRING,
    L_RECEIPTDATE STRING,
    L_SHIPINSTRUCT STRING,
    L_SHIPMODE STRING,
    L_COMMENT STRING
    );
    """
)

image.png

filepath = "dbfs:/databricks-datasets/tpch/data-001/lineitem/lineitem.tbl"

schema = """
  L_ORDERKEY    STRING ,
  L_PARTKEY     STRING ,
  L_SUPPKEY     STRING ,
  L_LINENUMBER  STRING ,
  L_QUANTITY    STRING ,
  L_EXTENDEDPRICE  STRING ,
  L_DISCOUNT    STRING ,
  L_TAX         STRING ,
  L_RETURNFLAG  STRING ,
  L_LINESTATUS  STRING ,
  L_SHIPDATE    STRING ,
  L_COMMITDATE  STRING ,
  L_RECEIPTDATE STRING ,
  L_SHIPINSTRUCT STRING ,
  L_SHIPMODE     STRING ,
  L_COMMENT      STRING
"""

df = spark.read.format("csv").schema(schema).option("sep", "|").load(filepath)

df.write.mode("overwrite").saveAsTable(tgt_tbl_name)
df = spark.table(tgt_tbl_name)
print(f"count: {df.count()}")
df.display()

image.png

Update の動作確認

Update の実行

Update 文を実行して、Delta lake の Transaction Log の表示します。

# Upadte を実行
sql = f"""
UPDATE {tgt_tbl_name}
SET L_ORDERKEY = TRIM(L_ORDERKEY)
"""
spark.sql(sql)
# Delta Log を表示
spark.sql(f"desc history {tgt_tbl_name}").display()

image.png

Update の実行結果を確認

Delta LogのoperationMetricsの内容は以下の通りです。Version 1のoperationMetricsには、numOutputBytesのサイズが 1245802377 バイトと記録されています。これは、Version 2のnumRemovedBytesと同等のサイズです。また、numAddedBytesはほぼ同等のサイズで、numUpdatedRowsでは全件が処理されていることが確認できます。これらの情報から、初回の書き込みで全データが削除され、その後で全件が再度書き込まれたことがわかります。

# Version 1 の結果
{
  "numFiles": "29",
  "numOutputRows": "29999795",
  "numOutputBytes": "1245802377"
}

# Version 2 の結果
{
  "numRemovedFiles": "29",
  "numRemovedBytes": "1245802377",
  "numCopiedRows": "0",
  "numDeletionVectorsAdded": "0",
  "numDeletionVectorsRemoved": "0",
  "numAddedChangeFiles": "0",
  "executionTimeMs": "79448",
  "scanTimeMs": "31",
  "numAddedFiles": "10",
  "numUpdatedRows": "29999795",
  "numAddedBytes": "1237449311",
  "rewriteTimeMs": "79417"
}

多数の Update 文を実行した際のパフォーマンス問題の確認

カラムをTRIM する処理を行う処理を行う Update 文をカラムごとに実行します。

tgt_columns = [
    "L_ORDERKEY",
    "L_PARTKEY",
    "L_SUPPKEY",
    "L_LINENUMBER",
    "L_QUANTITY",
    "L_EXTENDEDPRICE",
    "L_DISCOUNT",
    "L_TAX",
    "L_RETURNFLAG",
    "L_LINESTATUS",
    "L_SHIPDATE",
    "L_COMMITDATE",
    "L_RECEIPTDATE",
    "L_SHIPINSTRUCT",
    "L_SHIPMODE",
    "L_COMMENT",
]

tgt_sqls = []

for tgt_col in tgt_columns:
    sql = f"""
    UPDATE {tgt_tbl_name}
    SET {tgt_col} = TRIM({tgt_col})
    """
    tgt_sqls.append(sql)

print(len(tgt_sqls))
import time

start_time = time.time()
for tgt_sql in tgt_sqls:
    spark.sql(tgt_sql)
end_time = time.time()
time_difference = end_time - start_time

print(f"The time difference in seconds is: {time_difference}")

image.png

image.png

パフォーマンス改善

1. Update 文を 1 つにまとめて実行

tgt_sqls = []

tgt_columns = [
    "L_ORDERKEY",
    "L_PARTKEY",
    "L_SUPPKEY",
    "L_LINENUMBER",
    "L_QUANTITY",
    "L_EXTENDEDPRICE",
    "L_DISCOUNT",
    "L_TAX",
    "L_RETURNFLAG",
    "L_LINESTATUS",
    "L_SHIPDATE",
    "L_COMMITDATE",
    "L_RECEIPTDATE",
    "L_SHIPINSTRUCT",
    "L_SHIPMODE",
    "L_COMMENT",
]

cols = ',\n    '.join([f"{col} = TRIM({col})" for col in tgt_columns])
sql = f"""
UPDATE {tgt_tbl_name}
  SET 
    {cols}
;
"""
print(sql)
tgt_sqls.append(sql)
print(len(tgt_sqls))

以下の SQL を実行する想定です。

UPDATE update_test_01.schema_01.lineitem
  SET 
    L_ORDERKEY = TRIM(L_ORDERKEY),
    L_PARTKEY = TRIM(L_PARTKEY),
    L_SUPPKEY = TRIM(L_SUPPKEY),
    L_LINENUMBER = TRIM(L_LINENUMBER),
    L_QUANTITY = TRIM(L_QUANTITY),
    L_EXTENDEDPRICE = TRIM(L_EXTENDEDPRICE),
    L_DISCOUNT = TRIM(L_DISCOUNT),
    L_TAX = TRIM(L_TAX),
    L_RETURNFLAG = TRIM(L_RETURNFLAG),
    L_LINESTATUS = TRIM(L_LINESTATUS),
    L_SHIPDATE = TRIM(L_SHIPDATE),
    L_COMMITDATE = TRIM(L_COMMITDATE),
    L_RECEIPTDATE = TRIM(L_RECEIPTDATE),
    L_SHIPINSTRUCT = TRIM(L_SHIPINSTRUCT),
    L_SHIPMODE = TRIM(L_SHIPMODE),
    L_COMMENT = TRIM(L_COMMENT)
;

image.png

import time

start_time = time.time()
for tgt_sql in tgt_sqls:
    spark.sql(tgt_sql)
end_time = time.time()
time_difference = end_time - start_time

print(f"The time difference in seconds is: {time_difference}")

image.png

2. データフレーム操作として書き込みを実行

from pyspark.sql.functions import expr

tgt_columns = [
    "L_ORDERKEY",
    "L_PARTKEY",
    "L_SUPPKEY",
    "L_LINENUMBER",
    "L_QUANTITY",
    "L_EXTENDEDPRICE",
    "L_DISCOUNT",
    "L_TAX",
    "L_RETURNFLAG",
    "L_LINESTATUS",
    "L_SHIPDATE",
    "L_COMMITDATE",
    "L_RECEIPTDATE",
    "L_SHIPINSTRUCT",
    "L_SHIPMODE",
    "L_COMMENT",
]

columns_conf = {}
for tgt_col in tgt_columns:
    columns_conf[tgt_col] = expr(f"TRIM({tgt_col})")

print(columns_conf)

image.png

import time

start_time = time.time()

df = spark.table(tgt_tbl_name)

df = df.withColumns(columns_conf)

df.write.mode("overwrite").saveAsTable(tgt_tbl_name)

end_time = time.time()
time_difference = end_time - start_time

print(f"The time difference in seconds is: {time_difference}")

image.png

事後処理

カタログを削除

spark.sql("DROP CATALOG IF EXISTS update_test_01 CASCADE")

image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?