概要
Google Colab の Spark にて Delta Lake の動作検証をした結果を共有します。 Valinlla Spark にて Delta Lake を動作させたかっため本検証を実施しました。
検証結果
Spark のバージョンを確認
!pyspark --version
SparkSession を定義
Spark のバージョンに応じてspark.jars.packages
で指定するライブラリのバージョンを変更する必要あり。
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("DeltaLakeExample")
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.1")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.warehouse.dir", "/delta")
.getOrCreate()
)
spark
テーブルを作成
spark.sql("""
CREATE OR REPLACE TABLE first_table (
id INT,
name STRING
)
USING delta
""")
テーブルにデータを書き込み
schema = """
id INT,
name STRING
"""
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, schema)
display(df.toPandas())
# Delta テーブルに上書き書き込み
df.write.mode("overwrite").format("delta").saveAsTable("first_table")
テーブルのデータを確認
display(spark.table("first_table").toPandas())
テーブルの履歴の確認
display(spark.sql("DESCRIBE HISTORY first_table").orderBy("version").toPandas())