はじめに
Oracle Cloud Infrastructure (OCI) Data Flow で Delta Lake がサポートされました。
ということで、基本的な動作を試してみたいと思います。
OCI Data Flow のドキュメントは こちら
Delta Lake とは
Delta Lake のサイト では、「Lakehouse アーキテクチャを構築できるオープンソース」だと書いてありますが、これではちょっと分からない...。
Delta Lake を使うと、オブジェクトストレージにあるデータソースに対してトランザクショナルな更新処理を行うようなユースケースに対応できます。また、タイムトラベル機能によって更新処理毎に都度スナップショットが記録されるため、タイムスタンプやバージョン番号で任意の時点のスナップショットを取得できます。素の Spark だと、書き込み処理のモードは、"overwrite" か "append" かのどちらかですが、Delta Lake を使うと特定の行を挿入・更新・削除するなどもう少し賢い更新処理が可能です。さらに、更新処理は ACID に行われるので、複数の更新が同時に発生した場合でもデータの一貫性が保たれます。その他、Delta Lake にはスキーマを検証して乱雑な書き込みを防ぎデータの品質を保つ機能などが備わっています。
OCI Data Flow で Delta Lake を使う
OCI Data Flow の Spark 3.2.1 処理エンジンと統合されているため、アプリケーション側で追加の Spark 構成を設定する必要はありません。Sparkのバージョンは 3.2.1 を選択して下さい (2022年11月現在)。
Python (pyspark)
ローカルで実行する訳ではないので、Delta Lake のライブラリが必須ではないですが、IDE での開発効率を高めるために pip でインストールしておきます。
pip install delta-spark
SparkSession の作成はいつも通りのやり方で OK です。
spark = SparkSession.builder.appName("Delta Lake Test").getOrCreate()
Scala
build.sbt に Delta Lake のライブラリを追加してコンパイルできるようにします。
libraryDependencies += "io.delta" %% "delta-core" % "2.1.1" % "provided"
こちらも SparkSession の作成はいつも通りのやり方で OK です。
val spark = SparkSession.builder().appName("Delta Lake Test").getOrCreate()
動作確認をする
今回使ったソース、Python (pyspark)版 と Scala 版はこちらに。
ここでは、pyspark 版で動作確認を見ていきます。
まずはオブジェクト・ストレージのパス (oci://~で始まる文字列) を引数から受取って、Delta Lake のファイルを書き出します。
deltaTablePath = sys.argv[1] # URL of Object Storage - oci://~
print(f">>> deltaTablePath: {deltaTablePath}")
print(">>> Create table with id in range(0, 5)")
spark.range(0, 5).withColumn("data", lit("create") ) \
.write.format("delta").mode("overwrite").save(deltaTablePath)
spark.read.format("delta").load(deltaTablePath).show()
出力
>>> Create table with id in range(0, 5)
+---+------+
| id| data|
+---+------+
| 2|create|
| 3|create|
| 4|create|
| 0|create|
| 1|create|
+---+------+
さらに 5行分データを追加します。
print(">>> Append rows with id in range(5, 10)")
spark.range(5, 10).withColumn("data", lit("append")) \
.write.format("delta").mode("append").save(deltaTablePath)
spark.read.format("delta").load(deltaTablePath).show()
出力
>>> Append rows with id in range(5, 10)
+---+------+
| id| data|
+---+------+
| 2|create|
| 3|create|
| 4|create|
| 7|append|
| 8|append|
| 9|append|
| 0|create|
| 1|create|
| 5|append|
| 6|append|
+---+------+
id が偶数の行のデータを更新してみます。
deltaTable = DeltaTable.forPath(spark, deltaTablePath)
print(">>> Update rows where id is even")
deltaTable.update(condition = expr("id % 2 == 0"), set = { "data" : lit("update") })
deltaTable.toDF().show()
出力
>>> Update rows where id is even
+---+------+
| id| data|
+---+------+
| 2|update|
| 3|create|
| 4|update|
| 7|append|
| 8|update|
| 9|append|
| 0|update|
| 1|create|
| 5|append|
| 6|update|
+---+------+
さらに、id が 3 の倍数の行を削除します。
print(">>> Delete rows having id which remainder divided by 3 is 0")
deltaTable.delete(condition = expr("id % 3 == 0"))
deltaTable.toDF().show()
出力
>>> Delete rows having id which remainder divided by 3 is 0
+---+------+
| id| data|
+---+------+
| 2|update|
| 4|update|
| 7|append|
| 8|update|
| 1|create|
| 5|append|
+---+------+
新しい DataFrame を作って、マージします。
print(">>> Upsert (merge) new data with id in range(0, 20)")
newData = spark.range(0, 20).withColumn("data", lit("upsert") )
deltaTable.alias("oldData") \
.merge(newData.alias("newData"), "oldData.id = newData.id") \
.whenMatchedUpdate(set = { "data" : concat(col("oldData.data"), lit("/merge")) }) \
.whenNotMatchedInsert(values = { "id" : newData.id, "data" : lit("/merge") }) \
.execute()
deltaTable.toDF().show()
出力
>>> Upsert (merge) new data with id in range(0, 20)
+---+------------+
| id| data|
+---+------------+
| 0| /merge|
| 1|create/merge|
| 2|update/merge|
| 3| /merge|
| 4|update/merge|
| 5|append/merge|
| 6| /merge|
| 7|append/merge|
| 8|update/merge|
| 9| /merge|
| 10| /merge|
| 11| /merge|
| 12| /merge|
| 13| /merge|
| 14| /merge|
| 15| /merge|
| 16| /merge|
| 17| /merge|
| 18| /merge|
| 19| /merge|
+---+------------+
全件削除してみます。
print(">>> Delete all rows")
deltaTable.delete(col("id") >= 0)
deltaTable.toDF().show()
出力
>>> Delete all rows
+---+----+
| id|data|
+---+----+
+---+----+
ここまでの更新履歴を確認します。
print(">>> Show history")
deltaTable.history().show()
出力
>>> Show history
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|userMetadata| engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
| 5|2022-10-28 02:33:08| null| null| DELETE|{predicate -> ["(...|null| null| null| 4| Serializable| false|{numRemovedFiles ...| null|Apache-Spark/3.2....|
| 4|2022-10-28 02:32:58| null| null| MERGE|{predicate -> (ol...|null| null| null| 3| Serializable| false|{numTargetRowsCop...| null|Apache-Spark/3.2....|
| 3|2022-10-28 02:32:49| null| null| DELETE|{predicate -> ["(...|null| null| null| 2| Serializable| false|{numRemovedFiles ...| null|Apache-Spark/3.2....|
| 2|2022-10-28 02:32:39| null| null| UPDATE|{predicate -> ((i...|null| null| null| 1| Serializable| false|{numRemovedFiles ...| null|Apache-Spark/3.2....|
| 1|2022-10-28 02:32:22| null| null| WRITE|{mode -> Append, ...|null| null| null| 0| Serializable| true|{numFiles -> 2, n...| null|Apache-Spark/3.2....|
| 0|2022-10-28 02:32:06| null| null| WRITE|{mode -> Overwrit...|null| null| null| null| Serializable| false|{numFiles -> 2, n...| null|Apache-Spark/3.2....|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
バージョン 4 のデータを取り出します。
print(">>> Show version 4")
spark.read.format("delta").option("versionAsOf", 4).load(deltaTablePath).show()
出力
>>> Show version 4
+---+------------+
| id| data|
+---+------------+
| 0| /merge|
| 1|create/merge|
| 2|update/merge|
| 3| /merge|
| 4|update/merge|
| 5|append/merge|
| 6| /merge|
| 7|append/merge|
| 8|update/merge|
| 9| /merge|
| 10| /merge|
| 11| /merge|
| 12| /merge|
| 13| /merge|
| 14| /merge|
| 15| /merge|
| 16| /merge|
| 17| /merge|
| 18| /merge|
| 19| /merge|
+---+------------+
最後に、この状態でオブジェクト・ストレージがどうなっているか見てみましょう。
_delta_log
ディレクトリ内の json ファイルでメタデータが管理されています。
まとめ
OCI Data Flow で Delta Lake を試してみました。アプリケーション側に特別な設定等は必要なく直ぐに Delta Lake の機能を利用できます。従来単純にオブジェクト・ストレージから読み込んだり逆に書き出したりするだけでしたが、Delta Lake により更新履歴付きの更新処理が可能になったことで、ぐんとアプリケーションの幅が広がると思います。
補足
今回は OCI Data Flow から Delta Lake を使ってみましたが、もちろん普通に 素の Spark から使うこともできます。OCI のオブジェクト・ストレージを使って Delta Lake のファイルを操作するための設定方法も Delta Lake のドキュメントの一部として提供されています。