概要
Google Colab の Spark を Iceberg クライアントとして Databricks の UniForm が有効な Delta テーブルを読み取る方法を紹介します。
手順
Databricks にてカタログとテーブルを構築
カタログ作成、テーブル作成、及び、データ挿入を実施します。テーブル作成時には、 Iceberg の
%sql
CREATE CATALOG delta_uniform;
%sql
CREATE TABLE delta_uniform.default.nation_iceberg (
n_nationkey integer
,n_name string
,n_regionkey integer
,n_comment string
)
TBLPROPERTIES(
'delta.columnMapping.mode' = 'name',
'delta.enableIcebergCompatV2' = 'true',
'delta.universalFormat.enabledFormats' = 'iceberg'
);
%sql
INSERT OVERWRITE delta_uniform.default.nation_iceberg
SELECT
n_nationkey
,n_name
,n_regionkey
,n_comment
FROM
samples.tpch.nation
25 レコードあることを確認します。
%sql
SELECT * FROM delta_uniform.default.nation_iceberg;
Databricks にて Token を取得
表示された値を控えます。
%pip install databricks-sdk --upgrade --q
dbutils.library.restartPython()
from databricks.sdk import WorkspaceClient
client = WorkspaceClient()
token_response = client.tokens.create(
lifetime_seconds=3600,
comment="PAT token for API access",
)
pat_token = token_response.token_value
print(pat_token)
Google Colab の Spark から Iceberg クライアントとしてアクセス
Google Colab のノートブックを作成し、 Spark のバージョンを確認します。
!pyspark --version
Databricks に関する情報をセットします。token には前の手順で取得した token を設定します。
# Databricks に関する情報をセット
catalog_name = "delta_uniform"
workspace_url = "adb-XXXX.XX.azuredatabricks.net"
token = "dkeaxxxx-3"
SparkSession を定義します。本手順は Azure Storage を想定しています。
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("iceberg_lab")
.config(
"spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,"
"org.apache.iceberg:iceberg-azure-bundle:1.8.1",
)
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",)
.config(f"spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
.config(f"spark.sql.catalog.spark_catalog.type", "rest")
.config(f"spark.sql.catalog.spark_catalog.uri",f"https://{workspace_url}/api/2.1/unity-catalog/iceberg")
.config(f"spark.sql.catalog.spark_catalog.token", token)
.config(f"spark.sql.catalog.spark_catalog.warehouse", catalog_name)
.getOrCreate()
)
spark
利用するクラウドベンダーごとに異なる Apache Iceberg の iceberg バンドルを指定する必要があります。
出所:Iceberg クライアントから Databricks テーブルを読み取る | Databricks Documentation
データを表示できることを確認します。
df = spark.table("default.nation_iceberg")
df.show(truncate=False)