概要
Apache Iceberg で Spark を利用するときに、rewrite_table_path
プロシージャを試した結果を共有します。これは、DR 対策の一環として別のディレクトリに Apache Iceberg のデータを複製したい場合などに便利です。rewrite_table_path
プロシージャは Apache Iceberg 1.8.0 でリリースされました。
出所:Procedures - Apache Iceberg™
出所:1.8.0 release - Apache Iceberg™
Apache Iceberg ではメタデータファイルに絶対パスが記載されているため、ディレクトリごとデータをコピーしただけではテーブルとして認識されません。バックアップを別ストレージに移動するといったケースでは不便な仕様でした。なお、メタデータファイルの内容を確認する方法は下記の記事で紹介しています。
出所:Getting started with PyIceberg と Iceberg で作成されるファイル郡の確認を Google Colab でやってみた #Python - Qiita
rewrite_table_path
プロシージャを使うと、メタデータディレクトリ配下にパスを書き換えたメタデータファイルを生成し、あわせて移動元と移動先のファイルリストを作成します。その後、このファイルリストに基づいてコピーや移動を行うことで、Apache Iceberg テーブルの複製を可能にします。
検証コードと結果
検証内容
Google Colab 上で複製元の test_table
テーブルを作成し、rewrite_table_path
プロシージャを実行します。プロシージャが出力するファイルリストに従ってファイルを移動し、テーブルを複製できることを確認します。
本記事で紹介する方法は、あくまで動作確認が目的です。
Spark のバージョンを確認
!pyspark --version
SparkSession の定義
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("IcebergExample")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "/content/iceberg")
.getOrCreate()
)
spark
複製元のテーブルを作成
spark.sql("DROP TABLE IF EXISTS original.test_table")
spark.sql(f"""
CREATE TABLE original.test_table (
id BIGINT,
data STRING
)
USING iceberg
""")
spark.sql("INSERT INTO original.test_table VALUES (1, 'a'), (2, 'b')")
spark.sql("SELECT * FROM original.test_table").show()
spark.sql("DESC EXTENDED original.test_table").show(truncate=False)
rewrite_table_path プロシージャの実行
# rewrite_table_path プロシージャの実行
return_df = spark.sql(f"""
CALL spark_catalog.system.rewrite_table_path(
table => 'original.test_table',
source_prefix => "/content/iceberg/original/test_table",
target_prefix => "/content/iceberg/cloning/rewrite_table"
)
""")
return_df.show(truncate=False)
metadata
ディレクトリ配下に、copy-table-staging
で始まるディレクトリと、複製元テーブルのメタデータファイルと同名のファイルが作成されています。
また、file-list
ディレクトリ配下には CSV ファイルが生成されているのを確認できます。この CSV ファイル(ヘッダーなし)には、移動元ディレクトリと移動先ディレクトリのパスがペアで記載されています。このファイルをもとに実際のデータを移動させることで、Iceberg テーブルを複製できます。
/content/iceberg/original/test_table/data/00000-0-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet | /content/iceberg/cloning/rewrite_table/data/00000-0-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet |
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/v1.metadata.json | /content/iceberg/cloning/rewrite_table/metadata/v1.metadata.json |
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/v2.metadata.json | /content/iceberg/cloning/rewrite_table/metadata/v2.metadata.json |
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/snap-8317511617499303009-1-cb7dc205-bc3d-4744-8dbd-dda7685fac32.avro | /content/iceberg/cloning/rewrite_table/metadata/snap-8317511617499303009-1-cb7dc205-bc3d-4744-8dbd-dda7685fac32.avro |
/content/iceberg/original/test_table/metadata/copy-table-staging-334b1c32-0808-4c3a-8021-b27676cf4971/cb7dc205-bc3d-4744-8dbd-dda7685fac32-m0.avro | /content/iceberg/cloning/rewrite_table/metadata/cb7dc205-bc3d-4744-8dbd-dda7685fac32-m0.avro |
/content/iceberg/original/test_table/data/00001-1-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet | /content/iceberg/cloning/rewrite_table/data/00001-1-e6e91fc2-1546-476e-8b14-ab56d497ed10-0-00001.parquet |
ファイルリストのデータを取得
file_list = return_df.collect()[0].asDict()
file_location_df = spark.read.csv(file_list["file_list_location"], header=False)
print("-- original")
file_location_df.show(truncate=False)
print("-- file_location_df_renamed")
file_location_df_renamed = file_location_df.withColumnsRenamed({"_c0":"src", "_c1":"dist"})
file_location_df_renamed.show(truncate=False)
ファイルリストのデータに基づきファイルを移動
import os
import shutil
move_list = [row.asDict() for row in file_location_df_renamed.collect()]
for item in move_list:
src = item['src']
dist = item['dist']
# 宛先ディレクトリがなければ作成
os.makedirs(os.path.dirname(dist), exist_ok=True)
try:
shutil.copy2(src, dist)
print(f"Copied {src} -> {dist}")
except Exception as e:
print(f"Error copying {src} -> {dist}: {e}")
複製先のテーブルを確認
spark.sql("SELECT * FROM cloning.rewrite_table").show()
spark.sql("DESC EXTENDED cloning.rewrite_table").show(truncate=False)
メタデータファイル内の location
を確認すると、移動先ディレクトリが正しく設定されていることがわかります。
!cat /content/iceberg/cloning/rewrite_table/metadata/v2.metadata.json