概要
Spark から Hadoop Catalog の Apache Iceberg で処理する際に生成されるversion-hint.text
の必要性に疑問があったため検証しました。 Spark においては、version-hint.text
の内容に基づかずに、最新のバージョンのデータを取得するような動作となりました。検証コードと結果を共有します。
事前準備
Google Colab 上にノートブックを作成
Spark のバージョンを確認
!pyspark --version
SparkSession を定義
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("IcebergExample")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "/iceberg")
.getOrCreate()
)
spark
Namespace を作成
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberf_gc ;")
テーブルの作成とデータ挿入
spark.sql("""
CREATE OR REPLACE TABLE iceberf_gc.first_table (
id INT,
name STRING
)
USING iceberg
;""")
spark.sql("""
INSERT INTO iceberf_gc.first_table
SELECT 1, 'a'
;""")
spark.sql("""
INSERT INTO iceberf_gc.first_table
SELECT 2, 'b'
;""")
spark.sql("""
INSERT INTO iceberf_gc.first_table
SELECT 3, 'c'
;""")
テーブルのデータの確認
display(
spark.read
.table("iceberf_gc.first_table")
.orderBy("id")
.toPandas()
)
テーブルのmetadata
ディレクトリを確認
!ls /iceberg/iceberf_gc/first_table/metadata
version-hint.text の内容を確認
!cat /iceberg/iceberf_gc/first_table/metadata/version-hint.text
version-hint.text の検証
version-hint.text の内容を2
に修正
version-hint.text
ファイルの内容を2
に上書きしてクエリを実行したところ、最新のテーブルのデータを取得したことを確認。
!echo "2" > /iceberg/iceberf_gc/first_table/metadata/version-hint.text
!cat /iceberg/iceberf_gc/first_table/metadata/version-hint.text
from pyspark.sql import SparkSession
if spark:
spark.stop()
print("SparkSession を停止しました。")
spark = (SparkSession.builder
.appName("IcebergExample")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "/iceberg")
.getOrCreate()
)
spark
display(
spark.read
.table("iceberf_gc.first_table")
.orderBy("id")
.toPandas()
)
version-hint.text を削除
version-hint.text
ファイルを削除後にクエリを実行したところ、最新のテーブルのデータを取得したことを確認。
!rm /iceberg/iceberf_gc/first_table/metadata/version-hint.text
!ls /iceberg/iceberf_gc/first_table/metadata
from pyspark.sql import SparkSession
if spark:
spark.stop()
print("SparkSession を停止しました。")
spark = (SparkSession.builder
.appName("IcebergExample")
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.0")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hadoop")
.config("spark.sql.catalog.spark_catalog.warehouse", "/iceberg")
.getOrCreate()
)
spark
display(
spark.read
.table("iceberf_gc.first_table")
.orderBy("id")
.toPandas()
)