概要
Databricks に作成したDelta Lake 形式のデータを Snowflake 管理 Apache Iceberg テーブルの仕様(マイクロパーティションに近い仕様)に合わせる方法の検討結果を共有します。本記事では、成功した方法と失敗し方法を紹介します。
Snowflake 管理 Apache Iceberg テーブルのSTORAGE_SERIALIZATION_POLICY
オプションをOPTIMIZED
に指定した場合には下記の設定がされていることを確認できました。
- 圧縮形式は非圧縮(uncompressed)
- 1 つの Parquet ファイルのサイズが 16 ~ 17 MB 程度
その仕様に近い Parquet ファイルを出力するために、ファインサイズを最適化する関数による方法を確立しました。厳密には 16 MB ではないですが、微調整すれば 16 ~ 17 MB 程度に近づけることができます。
成功した方法
マイクロパーティション仕様に最適化する関数による方法
利用する関数
spark.sql.parquet.compression.codec
を指定していることで、2025年3月10日時点で Databricks Notebook Serverless にてエラーが発生するため、通常のクラスターで実行してください。
import math
def optimize_for_micro_partitions(
table_name: str,
filter_cond: str = None,
desired_partition_size_mb: int = 16,
set_uncompressed: bool = True,
overhead_factor: float = 1.1,
) -> None:
"""
Deltaテーブルに対して、指定されたフィルタ条件に基づくレコードのパーティションを
希望する1パーティションあたりのサイズ(MB単位、デフォルトは16MB)に最適化します。
Args:
table_name (str): 対象テーブル名(例: "db.schema.table")
filter_cond (str, optional): 書き出し対象のフィルタ条件(例: "l_quantity = 3")。Noneの場合、全件対象となる。
desired_partition_size_mb (int): 希望する1パーティションのサイズ (MB単位)
set_uncompressed (bool): Trueの場合、spark.sql.parquet.compression.codecを"uncompressed"に設定する
overhead_factor (float): 書き込み時のオーバーヘッドを見込むための係数(例: 1.1)
"""
# オプション設定: spark.conf設定を実施
if set_uncompressed:
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
print("spark.sql.parquet.compression.codec を 'uncompressed' に設定しました。")
# 1. DESCRIBE DETAILでテーブル全体のサイズ(バイト)を取得する
detail_df = spark.sql(f"DESCRIBE DETAIL {table_name}")
table_detail = detail_df.collect()[0]
table_size_bytes = table_detail["sizeInBytes"]
print("テーブル全体のサイズ (bytes):", table_size_bytes)
# 2. テーブル全体の総レコード数を取得
full_df = spark.read.table(table_name)
total_count = full_df.count()
print("テーブル全体の総レコード数:", total_count)
# 3. フィルタ条件に基づき、フィルタ後のDataFrameを作成
if filter_cond:
filtered_df = full_df.filter(filter_cond)
filtered_count = filtered_df.count()
print("フィルタ後のレコード数:", filtered_count)
else:
filtered_df = full_df
filtered_count = total_count
print("filter_cond が空の場合、全件のレコード数:", filtered_count)
# 4. フィルタ後DataFrameの概算サイズ(bytes)を計算
# テーブル全体サイズに対するフィルタ件数の比率で概算サイズを求める
estimated_filtered_size = table_size_bytes * (filtered_count / total_count)
print("フィルタ後DataFrameの概算サイズ (bytes):", estimated_filtered_size)
# 5. 希望する1パーティションのサイズをバイト単位に変換
desired_partition_size = desired_partition_size_mb * 1024 * 1024
# 6. オーバーヘッドを考慮した概算サイズから必要なパーティション数を計算
num_partitions = math.ceil((estimated_filtered_size * overhead_factor) / desired_partition_size)
print("再分割するパーティション数:", num_partitions)
# 7. 推定1パーティションあたりのサイズ (MB)
estimated_partition_mb = (estimated_filtered_size * overhead_factor) / num_partitions / (1024 * 1024)
print(f"推定1パーティションあたりのサイズ: {estimated_partition_mb:.2f} MB")
# 8. DataFrame の再パーティション化と書き込み
filtered_df_repartitioned = filtered_df.repartition(num_partitions)
writer = filtered_df_repartitioned.write.format("delta").mode("overwrite")
if filter_cond:
print("Selective overwrite 開始")
writer = writer.option("replaceWhere", filter_cond)
writer.saveAsTable(table_name)
print("Selective overwrite 終了")
else:
print("Overwrite 開始")
writer.saveAsTable(table_name)
print("Overwrite 終了")
検証結果
検証用のデータベースを作成
%sql
CREATE CATALOG iceberg_snowflake_test;
テーブルのソースとなる一時ビューを作成します。
%sql
-- 1. CSV ファイルを読み込むテーブル(または一時ビュー)の作成
CREATE OR REPLACE TEMP VIEW _temp_customer_csv (
c_custkey long,
c_name string,
c_address string,
c_nationkey long,
c_phone string,
c_acctbal decimal(12, 2),
c_mktsegment string,
c_comment string
)
USING CSV
OPTIONS (
path 'dbfs:/databricks-datasets/tpch/data-001/customer/customer.tbl',
header 'false',
delimiter '|'
);
ファイルサイズの比較をしやすいように、テーブルの DROP 後にソースの一時ビューから CTAS によりテーブルを作成します。
%sql
DROP TABLE IF EXISTS iceberg_snowflake_share_test.default.customer_delta;
-- 2. 上記データから Delta Lake テーブルを作成(書き出し時のターゲットファイルサイズを 16 MB に設定)
CREATE OR REPLACE TABLE iceberg_snowflake_share_test.default.customer_delta
USING DELTA
TBLPROPERTIES (
'delta.enableDeletionVectors' = false
)
AS
SELECT * FROM _temp_customer_csv;
関数の定義と実行を行います。
import math
def optimize_for_micro_partitions(
table_name: str,
filter_cond: str = None,
desired_partition_size_mb: int = 16,
set_uncompressed: bool = True,
overhead_factor: float = 1.1,
) -> None:
"""
Deltaテーブルに対して、指定されたフィルタ条件に基づくレコードのパーティションを
希望する1パーティションあたりのサイズ(MB単位、デフォルトは16MB)に最適化します。
Args:
table_name (str): 対象テーブル名(例: "db.schema.table")
filter_cond (str, optional): 書き出し対象のフィルタ条件(例: "l_quantity = 3")。Noneの場合、全件対象となる。
desired_partition_size_mb (int): 希望する1パーティションのサイズ (MB単位)
set_uncompressed (bool): Trueの場合、spark.sql.parquet.compression.codecを"uncompressed"に設定する
overhead_factor (float): 書き込み時のオーバーヘッドを見込むための係数(例: 1.1)
"""
# オプション設定: spark.conf設定を実施
if set_uncompressed:
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
print("spark.sql.parquet.compression.codec を 'uncompressed' に設定しました。")
# 1. DESCRIBE DETAILでテーブル全体のサイズ(バイト)を取得する
detail_df = spark.sql(f"DESCRIBE DETAIL {table_name}")
table_detail = detail_df.collect()[0]
table_size_bytes = table_detail["sizeInBytes"]
print("テーブル全体のサイズ (bytes):", table_size_bytes)
# 2. テーブル全体の総レコード数を取得
full_df = spark.read.table(table_name)
total_count = full_df.count()
print("テーブル全体の総レコード数:", total_count)
# 3. フィルタ条件に基づき、フィルタ後のDataFrameを作成
if filter_cond:
filtered_df = full_df.filter(filter_cond)
filtered_count = filtered_df.count()
print("フィルタ後のレコード数:", filtered_count)
else:
filtered_df = full_df
filtered_count = total_count
print("filter_cond が空の場合、全件のレコード数:", filtered_count)
# 4. フィルタ後DataFrameの概算サイズ(bytes)を計算
# テーブル全体サイズに対するフィルタ件数の比率で概算サイズを求める
estimated_filtered_size = table_size_bytes * (filtered_count / total_count)
print("フィルタ後DataFrameの概算サイズ (bytes):", estimated_filtered_size)
# 5. 希望する1パーティションのサイズをバイト単位に変換
desired_partition_size = desired_partition_size_mb * 1024 * 1024
# 6. オーバーヘッドを考慮した概算サイズから必要なパーティション数を計算
num_partitions = math.ceil((estimated_filtered_size * overhead_factor) / desired_partition_size)
print("再分割するパーティション数:", num_partitions)
# 7. 推定1パーティションあたりのサイズ (MB)
estimated_partition_mb = (estimated_filtered_size * overhead_factor) / num_partitions / (1024 * 1024)
print(f"推定1パーティションあたりのサイズ: {estimated_partition_mb:.2f} MB")
# 8. DataFrame の再パーティション化と書き込み
filtered_df_repartitioned = filtered_df.repartition(num_partitions)
writer = filtered_df_repartitioned.write.format("delta").mode("overwrite")
if filter_cond:
print("Selective overwrite 開始")
writer = writer.option("replaceWhere", filter_cond)
writer.saveAsTable(table_name)
print("Selective overwrite 終了")
else:
print("Overwrite 開始")
writer.saveAsTable(table_name)
print("Overwrite 終了")
# 使用例
optimize_for_micro_partitions(
table_name="iceberg_snowflake_share_test.default.customer_delta",
filter_cond=None,
)
下記の出力結果のlocation
列からストレージのディレクトリを確認
%sql
DESCRIBE DETAIL iceberg_snowflake_share_test.default.customer_delta;
30 MB だった parquet ファイルが 13 MB 程度になっていてことを確認します。
失敗した方法
delta.targetFileSize
プロパティを指定する方法
Delta Lake テーブルのdelta.targetFileSize
プロパティでファイルサイズを調整できないかと検討しましたが、ドキュメントにてベストエフォートと書かれてこともあり想定のファイルサイズにはなりませんでした。
指定されたサイズのファイルを生成するためのベストエフォートの試みが行われます。
出所:データ ファイル サイズを制御するように Delta Lake を構成する - Azure Databricks | Microsoft Learn