この記事は、Distributed computing (Apache Spark, Hadoop, Kafka, ...) Advent Calendar 2021 の12/25のエントリです。
はじめに
ビッグデータにおける分散型ストレージと聞いて、皆さんは何を思い浮かべますか。例として、Apache Hadoop HDFSがあげられるでしょう。HDFSなどの分散型ストレージの誕生によって、大量のデータを保存・活用ができるようになったわけです。これらの分散型ストレージはスケーラビリティや耐故障性の面で優れています。一方、データ処理や分析のデータストアとして比較に挙げられがちなのはRDBMSですが、RDBMSと比べると特徴が一部失われていることもあります。例えば、並列分散化に加えてある程度の塊で書き込む工夫のトレードオフとして読み込み・書き込みを低レイテンシで行うことが難しくなった、書き込みスループット増大を目的としたためACIDトランザクションが実装されていない、などという点が挙げられます。
近年、ビッグデータ活用のユースケースの多様化に伴い、分散型ストレージにさらなる特徴を追加したいというニーズがあります。HDFSのような分散型ストレージ、そしてApache Sparkのような分散処理フレームワークをそのまま利用でき、その上で分散型ストレージに新たな機能を追加するOSSストレージレイヤSWがここ数年でいくつか誕生してきました。その中でも分散型ストレージにACIDトランザクションの機能を与えることでデータセットの信頼性の向上を図るDelta Lakeについてこの記事で取り上げます。Delta Lakeの概要から、ソースコードレベルでの実装の確認といったDeepな内容まで交えて、Delta LakeのACIDトランザクションについて探っていきます。
Delta Lakeとは?
Delta Lakeとは、2019年にDatabricks社よりOSSとして発表されたストレージレイヤソフトウェアです。Delta Lakeによって、HDFS, Amazon S3などの分散型ストレージにACIDトランザクションを実現することで、データセットの信頼性を保つことができます。
Delta Lakeを導入する方法はシンプルで、Sparkのアプリケーション実装時にMavenやSBTでパッケージをインストールするだけです。
(以下はMavenの例)
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-core_2.12</artifactId>
<version>1.1.0</version>
</dependency>
シンプルなデータの読み書きだけなら、Spark SQLアプリケーションの変更はごくわずかで済みます
// Write
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
// Read
val df = spark.read.format("delta").load("/tmp/delta-table")
ご覧の通り、formatが "delta"
に変更になっただけで、他はSpark SQLのRead/Writeと同じです。これだけで、これまで通りParquetのデータ形式でデータの読み書きが実施できるのに加えて、読み書きのACIDトランザクションが担保されます。
加えて、これまでの分散型ストレージでは扱いが難しかったデータのUpsert (SQL用語でMergeとも言います) もDelta Lakeに依ってシンプルに実装できるようになりました。
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // 新しく書き込み・更新するデータ
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId") // 新しいデータと既存データで、同じIDのレコードがあるか否かで条件分岐
.whenMatched // 既に同じIDのデータが存在する場合は
.updateExpr( // 上書き
Map("data" -> "updates.data"))
.whenNotMatched // 同じIDのデータが無い場合は
.insertExpr( // 追加
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
HDFSなどのストレージでは仕様を割り切ることで高いスケーラビリティを達成しました。結果として、データを更新したい場合は、以下のような操作をする必要がありました。
- 更新したいデータセットを全て読み取る
- 読み込んだデータセットに対して、データの追加・更新・削除を行う
- 新しいデータセットを書き込む
- 古いデータセットを消す
Delta Lakeによって、これらが一つの操作で完了できるようになりました。
分散型ストレージのACIDトランザクションについて
まず、ACIDトランザクションとは何かをそれぞれ確認します。
- Atomicity: トランザクションの中に含まれている操作群について、「全て結果が反映される」「一つも反映されない」のどちらかが保証される
- Consistency: トランザクションの前後で、データの整合性が担保されること。
- Isolation: 各トランザクションの処理が、他のトランザクションに影響を与えないこと。
- Durability: トランザクションが完了したデータは、システム障害などが発生しても失われないこと。
多数のマシンを使って構成する分散型のストレージでは、いずれかのマシンが壊れてしまうこともしばしば生じることから、標準でDorability (耐障害性) をもつよう実装されているものもあります。例えばHDFSでは、デフォルトで3つのレプリカが作成され、別のノードに跨って保存されることで、Durabilityを実現しています。
残りの3つについては、分散型のストレージでは担保されていないケースが多いです。HDFSを例として、一つずつ確認していきます。
Atomicityについては、先ほどのUpsertでの処理で考えていきます。(再掲)
- 更新したいデータセットを全て読み取る
- 読み込んだデータセットに対して、データの追加・更新・削除を行う
- 新しいデータセットを書き込む
- 古いデータセットを消す
ここで、3が完了した時点でアプリケーションが異常終了していまったと仮定します。この場合、古いデータセットも新しいデータセットもどちらもHDFSに残ってしまいます。すぐに気づけば何の問題も生じないかもしれませんが、放置されてしまうと他のアプリケーションから古いデータセットが利用される、気づいたらデータセットがたくさんあってどれが最新なのか分からなくなる、どいう問題が生じる場合があります。
Consistencyに関しては、データセットのスキーマの問題が考えられます。HDFSでは、書き込まれたデータの中身を精査することがないため、例えばParquetの書き込みでカラムが増えている、jsonで階層が増えている、といったことが起こりえます。スキーマの不整合が多いとデータセットが成立しなくなりデータ処理や分析に移れなくなるケースがあります。
IsolationもAtomicityと同じケースで考えて見ます。(再掲)
- 更新したいデータセットを全て読み取る
- 読み込んだデータセットに対して、データの追加・更新・削除を行う
- 新しいデータセットを書き込む
- 古いデータセットを消す
HDFSでは、Isolation Levelは最も低いRead Uncommited、即ちトランザクションの同時実行制御がされていません。この場合、例えば1から3の間に別のトランザクションによってデータセットの更新が入り、データの整合性が取れないというようなことが生じる可能性が有ります。
Delta Lakeのトランザクションについて
Delta Lakeでは、HDFSを始めとするDurabilityを持った分散型ストレージをデータストアとして利用することを前提としています。そのため、Durabilityに関しては分散型ストレージにて担保される形になります。加えて、Atomicity, Consistency, IsolationがDelta Lakeを利用することで担保されます。
「Delta Lakeとは」の章で説明した、
ご覧の通り、formatが
"delta"
に変更になっただけで、他はSpark SQLのRead/Writeと同じです。これだけで、これまで通りParquetのデータ形式でデータの読み書きが実施できるのに加えて、読み書きのACIDトランザクションが担保されます。
の話をもう少し詳しく説明します。formatを"delta"
に変更することで、Writeは以下のように実行されます
- データの実体であるParquetファイルを分散ストレージに書き込む
- 1のデータのメタデータである "Delta Log" を分散ストレージに書き込む (=コミット)
Readは以下のように実行されます。
- これまでの"Delta Log"を読み込むことで、必要となるParquetファイルを特定する
- 特定したParquetファイルを読み込む
以下はDelta Logの一例です。
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"ac72d2ec-7bdc-42cd-88cc-72da634f0873","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1640262316180}}
{"add":{"path":"part-00000-1c9604e7-6b21-4392-acf1-1fb45e31744b-c000.snappy.parquet","partitionValues":{},"size":296,"modificationTime":1640262319000,"dataChange":true}}
ただログを書き込んだだけでは?、という感じがしますが、実はこのDelta Logというものの存在によって、Atomicity, Consistency, Isolationが担保されます。ここからは一つずつ見ていきます。
まずはAtomicityで、こちらはシンプルです。Delta Logを書き込むことそのものがコミットなので、Delta Logがあれば全更新が反映されている、Delta Logがなければ更新は何も反映されていない、という状態を作り出すことができます。どのDelta Logの中にも記載のないParquetファイルはReadで読み込まれることがないので、トランサクションの一部の処理だけがParqeutファイルとして分散型ストレージに書き込まれてしまった場合でも問題ありません。
続いてConsistencyです。先程のスキーマに関する課題でいえば、Delta Logに掛かれているMetadataに矛盾がないかどうかを確かめることで実現されます。Metadataに抱えられているSchemaと合致しない場合は更新が拒否される (=スキーマバリデーション)、という形でデータセットの整合性を保つ仕組みが働きます。また、矛盾が発生しない範囲であれば、スキーマ内のカラムを増やしたり (=スキーマエボリューション)、パーティションのカラムを増やしたりといったことも可能になります。
(スキーマエボリューションの具体的な挙動は https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution )
最後にIsolationです。Delta Lakeでは、Multi-Version Concurrency Control (MVCC) と楽観的排他制御が利用されています。Delta Logには項番が振られており、その番号がデータセットのバージョンを意味します。データの読み込みではバージョン番号をSparkのほうで保持しておくことで、他のトランザクションによる更新が生じても同じバージョンのデータセット (=スナップショット) を読み続けることができます (Read時にアプリ側から特定のバージョンを指定することもできます)。書き込みに関しては、トランザクションの開始時にデータセットの最新バージョンが何番か控えておき、コミットする際に (最新バージョン + 1) に既にDelta Logがあれば、他のトランザクションとのデータの競合が発生するかしないかを確認するという方法を取ります。
複数のトランサクションの同時実行制御がどのように行われているかソースから見てみる (DeepDive)
Delta Lakeのソースコード (https://github.com/delta-io/delta) から、Delta Lakeの楽観的排他制御の仕組みを見ていきます。なお、ここではUpsertを対象に楽観的排他制御の仕組みを見ていきます。(再掲)
// Upsert
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... // 新しく書き込み・更新するデータ
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId") // 新しいデータと既存データで、同じIDのレコードがあるか否かで条件分岐
.whenMatched // 既に同じIDのデータが存在する場合は
.updateExpr( // 上書き
Map("data" -> "updates.data"))
.whenNotMatched // 同じIDのデータが無い場合は
.insertExpr( // 追加
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
Delta Lakeには、Delta Log間のコンフリクトを検出するためのクラスConflictChecker (https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala#L87) があります。そして、この中にあるcheckConflicts()
メソッドがDelta Log間のコンフリクトを探すメインのメソッドです。
private[delta] class ConflictChecker(
spark: SparkSession,
initialCurrentTransactionInfo: CurrentTransactionInfo,
winningCommitVersion: Long,
isolationLevel: IsolationLevel) extends DeltaLogging {
def checkConflicts(): CurrentTransactionInfo = {
checkProtocolCompatibility()
checkNoMetadataUpdates() # 今回の対象
checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn() # 今回の対象
checkForDeletedFilesAgainstCurrentTxnReadFiles() # 今回の対象
checkForDeletedFilesAgainstCurrentTxnDeletedFiles()
checkForUpdatedApplicationTransactionIdsThatCurrentTxnDependsOn()
reportMetrics()
currentTransactionInfo
}
}
このメソッドが、自分のトランザクションより先にコミットされたDelta Logの数だけ実施されます。一回でもConflictがあると判断された場合は、例外スローとなり処理が受け付けられません。
checkConflicts()
メソッドには6つのcheckメソッドがありますが、InsertとUpsertで重要となる3つを今回取り上げます。そのうち、最初の1つはConsistencyに関わるもので、残り2つがisolationに関わるものになります。
まずはcheckNoMetadataUpdates()
メソッドを見ていきます。
protected def checkNoMetadataUpdates(): Unit = {
// Fail if the metadata is different than what the txn read.
if (winningCommitSummary.metadataUpdates.nonEmpty) {
throw DeltaErrors.metadataChangedException(winningCommitSummary.commitInfo)
}
}
ここでは、先行したトランザクション (winningCommitSummary
という変数にDelta Logの情報が入っている) でMedadataの更新がある場合、後続のトランザクションは例外スローでコミットされないという結果になります。
{"metaData":{"id":"ac72d2ec-7bdc-42cd-88cc-72da634f0873","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1640262316180}}
Metadataには、パーティションやスキーマなどの情報が入っています。こららの変更が先行したトランザクションによって更新されていた場合、即ちスキーマエボリューションやパーティションカラムの追加が実施されていた場合、後続のトランザクションを受け入れてしまうとデータの整合性が保てなくなる可能性があるので、コミットを受け付けないという処理になります。
続いて、2つ目のcheckForAddedFilesThatShouldHaveBeenReadByCurrentTxn()
メソッドを見ていきます。このメソッドの役割は、データの更新をしようとしたパーティション内において、先行のトランザクションがデータを追加・更新している場合、「Serializableな書き込みであれば後続のトランザクションの更新処理に含まれなければならないデータが、更新されずに残っている」という状況が発生することを防ぐことにあります。Insertの場合はただデータを追加するだけなので問題有りませんが、Upsertの場合は非常に重要です。以下、ソースコードの一部抜粋です。
protected def checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(): Unit = {
...
// 自トランザクションの更新対象パーティション内に、先行トランザクションのデータがaddされていないか確認
val predicatesMatchingAddedFiles = ExpressionSet(
currentTransactionInfo.readPredicates).iterator.flatMap { p =>
val conflictingFile = DeltaLog.filterFileList(
currentTransactionInfo.metadata.partitionSchema,
addedFilesToCheckForConflicts.toDF(), p :: Nil).as[AddFile].take(1)
conflictingFile.headOption.map(f => getPrettyPartitionMessage(f.partitionValues))
}.take(1).toArray
// 一件でもあればコミットさせない
if (predicatesMatchingAddedFiles.nonEmpty) {
...
throw DeltaErrors.concurrentAppendException(
winningCommitSummary.commitInfo,
predicatesMatchingAddedFiles.head,
retryMsg)
}
}
}
もう少し重要なところを分解して考えると、
-
ExpressionSet(currentTransactionInfo.readPredicates)
で、自トランザクションが対象とするパーティション (readPredicates
)を一覧化 -
.iterator.flatMap
で、各パーティションについて以下を実施。- 先行トランザクションで追加されたファイル群
addedFilesToCheckForConflicts.toDF()
がそのパーティションに入ってしまっていないか確認する。
- 先行トランザクションで追加されたファイル群
- 自トランザクションの更新範囲で追加されたファイルがある (
predicatesMatchingAddedFiles
がEmptyではない) 場合は、例外スローでコミットさせない。
という処理が行われています。
最後に、checkForDeletedFilesAgainstCurrentTxnReadFiles()
メソッドを見てきます。このメソッドは、更新しようと思ったデータが先行のトランザクションで既に消されていた場合に、後続の自トランザクションの処理が受け付けられてしまうと、消されたデータが復活した上に更新までされるということが生じてしまう (=ほぼデータセットが壊れてしまう) ので、それを防ぐため役割をします。以下、ソースコードです。
protected def checkForDeletedFilesAgainstCurrentTxnReadFiles(): Unit = {
recordTime("checked-deletes") {
// Fail if files have been deleted that the txn read.
// 自トランザクションが一度でも見たParqeutファイルのリスト
val readFilePaths = currentTransactionInfo.readFiles.map(
f => f.path -> f.partitionValues).toMap
// 先行トランザクションが削除したParquetファイルのリストに、上記のファイルが含まれているか確かめる
val deleteReadOverlap = winningCommitSummary.removedFiles
.find(r => readFilePaths.contains(r.path))
// 含まれていたらOUT。例外スローでコミット受け付けない。
if (deleteReadOverlap.nonEmpty) {
val filePath = deleteReadOverlap.get.path
val partition = getPrettyPartitionMessage(readFilePaths(filePath))
throw DeltaErrors.concurrentDeleteReadException(
winningCommitSummary.commitInfo, s"$filePath in $partition")
}
// 先行トランザクションがデータセットのフルスキャンをしていた場合もOUT。
if (winningCommitSummary.removedFiles.nonEmpty && currentTransactionInfo.readWholeTable) {
val filePath = winningCommitSummary.removedFiles.head.path
throw DeltaErrors.concurrentDeleteReadException(
winningCommitSummary.commitInfo, s"$filePath")
}
}
}
後半2つの排他制御のまとめると、以下のような仕組みによってDelta LakeのSeriarizableなIsolationが実現されています。
- 全てのトランザクションに、追加したファイル (
addedfile
) と不要になったファイル (removedFile
) が記載されている - 自トランザクションが参照したパーティション (
readPredicates
) とParquetファイル (readFiles
)を、本当に処理の対象になったかに関わらず、トランザクションのコミットが完了するまで変数として持ち続けている。 - 上記2つを照らし合わせることで、Seriarizableとならないトランザクションはコミットさせないようにしている。
考察: 書き込みの競合判定範囲とSerializableの定義 (Deep Dive)
checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn()
メソッドでは、先行トランザクションで追加されたファイルが、自トランザクションが対象とした "パーティション" に含まれていたら競合と判断し例外スローを返すという処理が行われています。自トランザクションの処理が結果として先行トランザクションの処理に影響を与えなくても、同じパーティションにファイルが追加されていればコミットは拒否されます。簡単な例で確認します。
- テーブル情報
- スキーマはid (int)と value (int) の2つのカラムからなる
- パーティションキーはidで、10のレンジ
- keyが1から10
- keyが11から20
- ...
- トランザクション
- T1: Insert
- (key = 1, value = 10) のデータを入れる
- T2: Upsert
- key = 2 のデータはvalueに100を足す
- T1: Insert
T1とT2が同じバージョンのデータセットに対して実行され、T1が先に完了したとします。この場合、T1はkey = 2のデータを追加していないので、T2はT1に影響を与えることはないですが、id = 1と id = 2 のデータは同じパーティションに入るため、T2はコミットが拒否されます。パーティションの中にはParquetファイル群が含まれ、それらはソートされた状態でならんでいるわけではないため、パーティション内の全てのデータを調べるとそのコストが大きくなるため、このような実装になっていると考えらえれます。
ちなみに、Upsertをこれまでこの記事で紹介した通りに記述すると、Upsertの対象パーティションは全て、即ちテーブルをフルスキャンしてしまいます。先行のトランザクションが何かデータを追加していた場合、その時点でコミットされないということが確定となります。対処方法としては、以下のように対象としたいパーティションを明示的に指定するというやり方があります。
// Upsert
import io.delta.tables._
import org.apache.spark.sql.functions._
val updatesDF = ... /
DeltaTable.forPath(spark, "/data/events/")
.as("events")
.merge(
updatesDF.as("updates"),
"events.eventId = updates.eventId AND events.eventId in (1, 2)") // ここで具体的に範囲指定を行う。
.whenMatched
.updateExpr(
Map("data" -> "updates.data"))
.whenNotMatched
.insertExpr(
Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"))
.execute()
なお、Databricks社が提供するDatabricks Delta では、このInsertからUpsertの順番でもUpsertのコミットを受け付けられるようにIsolation Levelの変更が可能となっている模様です (https://docs.databricks.com/delta/optimizations/isolation-level.html)。Delta Lakeと同じレベルが "Serializable"、Databricks DeltaでのInsert->Upsertを許可するレベルが "WriteSerializable" と定義されています。(ちなみに、OSSのDelta LakeにもWriteSerializableの定義は存在してはいますが、現時点の実装では利用されず、利用するようなconfigurationも出来ないです。)
Databricks Deltaのほうでは動作確認は出来ていないのですが、具体的な処理で考えてみます。
- テーブル情報
- スキーマはid (int)と value (int) の2つのカラムからなる
- パーティションキーはidで、10のレンジ
- keyが1から10
- (id = 5, value = 1) というデータが入っている
- keyが11から20
- ...
- keyが1から10
- トランザクション
- T1: Insert
- (id = 5, value = 50) のデータを入れる
- T2: Upsert
- key = 5 のデータはvalueに100を足す
- T1: Insert
- 結果
- 同じバージョンに対して同時にスタートした場合:
- (id = 5, value = 101), (id = 5, value = 50) のデータが含まれる
- T1からT2の順番で逐次実行:
- (id = 5, value = 101), (id = 5, value = 150) のデータが含まれる
- T2からT1の順番で逐次実行:
- (id = 5, value = 101), (id = 5, value = 50) のデータが含まれる
- 同じバージョンに対して同時にスタートした場合:
並列実行とT2->T1の逐次実行の結果が同じになります。Databricks Delta では、これをデータの矛盾が生じていないという判断で、InsertとUpsertの同時実行で後続のUpsertのコミットを許可するIsolation Levelが定義されているものと考えられます。
なお、自分のデータベースの知識不足もあり、Seriarizableの定義を調べてみたところ
- 複数の並行に動作するトランザクションそれぞれの結果が、「いかなる場合でも」、それらのトランザクションを時間的重なりなく逐次実行した場合と同じ結果となる。(ANSI/ISO SQL標準, https://ja.wikipedia.org/wiki/%E3%83%88%E3%83%A9%E3%83%B3%E3%82%B6%E3%82%AF%E3%82%B7%E3%83%A7%E3%83%B3%E5%88%86%E9%9B%A2%E3%83%AC%E3%83%99%E3%83%AB)
- トランザクション T1, …, Tn を並行処理したときの実行結果が,それらを「何らかの順序で」逐次処理したときの実行結果と一致すること (https://www.db.is.i.nagoya-u.ac.jp/~ishikawa/lectures/db19/12.pdf)
と、定義が一つに定まっていない様子が見られました。Delta LakeおよびDatabricks Deltaの場合だと、前者がSerializable、後者がWrite Serializableと言えそうです。
まとめ
長くなりましたが、Delta LakeのACIDトランザクションについてDeepDiveも含めて記載しました。データベース、特にIsolation周りの知識不足で、内容に誤りがありましたら、マサカリを投げる代わりにコメント欄で指摘していただけると幸いです。
[全体を通じての引用]
- Delta Lake 公式ドキュメント (https://docs.delta.io/latest/index.html)
- Delta Lake GitHub (https://github.com/delta-io/delta)