Databricksの標準データ形式のDeltaフォーマットのデータをGoogle Colaboratory上で扱ってみたいと思います。
Azure EventhubsのデータについてはADLS gen2上のDeltaフォーマットで出力できるので、そちらで試しています。
1: pysparkのインストール
!pip install pyspark
SparkSessionの設定
io.delta.sql.DeltaSparkSessionExtension
とパッケージを設定
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("Delta on ADLS Gen2")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0,org.apache.hadoop:hadoop-azure:3.3.4,org.apache.hadoop:hadoop-azure-datalake:3.3.4,com.microsoft.azure:azure-storage:7.0.1")
.getOrCreate()
)
3: ADLS gen2用のアカウントキーを設定
キーの部分は変更してください。
spark.conf.set(
"fs.azure.account.key.eventhubsteststoragegen2.dfs.core.windows.net",
"AbCdEfGhiJkLmNoPqRsT/uVwXyZaBcDe+FgH1a2b3c="
)
4: Deltaフォーマットで読み込み
ADLS gen2のコンテナ名とテーブル名は適宜指定してください。
df = spark.read.format("delta").load(
"abfss://eventhubs-container-name@eventhubsteststoragegen2.dfs.core.windows.net/test-eventhub-gen2"
)
5: データフレーム表示
display(df)
以下の感じで表示されます。
+---+---------------------+-----------+--------------------+
|aaa|EventProcessedUtcTime|PartitionId|EventEnqueuedUtcTime|
+---+---------------------+-----------+--------------------+
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
|ccc| 2024-09-21 13:03:...| 0|2024-09-21 13:03:...|
+---+---------------------+-----------+--------------------+
まとめ
Colaboratory上で pysparkを使用してDeltaフォーマットのデータを読み取る方法を記載しました。