Isolation levels and write conflicts on Databricks | Databricks on AWS [2022/9/22時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
テーブルのアイソレーションレベルは、トランザクションを同時実行オペレーションによってなされる変更からどの程度分離するのかを定義します。Databricksにおける書き込みの競合はアイソレーションレベルに依存します。
Delta Lakeは読み込みと書き込みの間のACIDトランザクションを保証します。これは以下のことを意味します:
- 複数のクラスターに存在する複数のライター(writer)は、同時にテーブルのパーティションを変更することができます。ライターは一貫性のあるテーブルのスナップショットビューを参照し、書き込みは連続した順序で発生します。
- リーダー(reader)は、ジョブの過程でテーブルが変更されたとしても、Databricksのジョブが開始した時点の一貫性のあるテーブルのスナップショットビューを参照し続けます。
What are ACID guarantees on Databricks?をご覧ください。
注意
Databricksではデフォルトで全てのテーブルにDelta Lakeを使用します。本書ではDatabricksにおけるDelta Lakeの挙動を説明します。
Databricksにおける書き込みの競合
以下のテーブルでは、それぞれのアイソレーションレベルでどの書き込みオペレーションの組み合わせで競合が起きるのかを説明しています。
INSERT | UPDATE, DELETE, MERGE INTO | OPTIMIZE | |
---|---|---|---|
INSERT | 競合しません | ||
UPDATE, DELETE, MERGE INTO | Serializableでは競合することがあります。WriteSerializableで最初に読み込まずにテーブルに書き込む場合には競合は起きません。 | SerializableとWriteSerializableで競合することがあります。 | |
OPTIMIZE | 競合しません | SerializableとWriteSerializableで競合することがあります。 | SerializableとWriteSerializableで競合することがあります。 |
Write serializableアイソレーションレベル 対 serializableアイソレーションレベル
テーブルのアイソレーションレベルは、トランザクションを同時実行オペレーションによってなされる変更からどの程度分離するのかを定義します。DatabricksのDelta Lakeでは、2つのアイソレーションレベルSerializableとWriteSerializableをサポートしています。
-
Serializable: 最も強いアイソレーションレベルです。コミットされた書き込みオペレーションと全ての読み込みがSerializableであることを保証します。複数のオペレーションは一度に一回実行する際に順序が存在されており、テーブルに同じ結果を生じさせる場合にのみ許可されます。書き込みオペレーションにおいては、実行順序と同じ内容がテーブルの履歴に現れます。
-
WriteSerializable(デフォルト): Serializableより弱いアイソレーションレベルです。書き込みオペレーションのみ(すなわち読み込みは除きます)がserializableであることが保証されます。しかし、これはスナップショットアイソレーションよりは強いものです。データの一貫性と多くの一般的なオペレーションにおける可用性のバランスがすぐれているので、WriteSerializableがデフォルトのアイソレーションレベルとなっています。
このモードでは、Deltaテーブルのコンテンツはテーブルの履歴から予想されるオペレーションと順序と異なる場合があります。これは、このモードでは特定の同時実行の書き込みのペア(例えばオペレーションXとY)において、履歴ではXの後にYがコミットされている(すなわち、二者間がシリアライズ可能)ことを示していたとしても、Xの前にYが実行される結果を許容するためです。この様な並び替えを許容しない場合には、これらのトランザクションが失敗するようにテーブルのアイソレーションレベルをSerializableに設定します。
読み込みのオペレーションは常にスナップショットのアイソレーションです。書き込みのアイソレーションレベルは、履歴によると「存在しない」テーブルのスナップショットをリーダーが参照する場合があるかどうかを規定します。
Serializableレベルにおいては、リーダーは常に履歴に準拠したテーブルのみを参照します。WriteSerializableレベルでは、リーダーはDeltaログに存在しないテーブルを参照する場合があります。
例えば、長時間実行される削除処理のtxn1と、txn1で削除されたデータをインサートするtxn2を考えてみます。txn2とtxn1が完了し、その順序で履歴に記録されます。履歴によると、txn2によってインサートされたデータはテーブルに存在してはなりません。Serializableレベルでは、リーダーはtxn2によってインサートされたデータは参照することはありません。しかし、WriteSerializableレベルの場合、ある時点でリーダーはtxn2によってインサートされたデータを参照する場合があります。
それぞれのアイソレーションレベルでどのタイプのオペレーションが互いに競合を起こし得るのか、起こり得るエラーに関しては、パーティショニングとコマンド条件の分離による競合の回避をご覧ください。
アイソレーションレベルの設定
ALTER TABLE
コマンドでアイソレーションレベルを設定します。
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = <level-name>)
<level-name>
はSerializable
かWriteSerializable
となります。
例えば、アイソレーションレベルをデフォルトのWriteSerializable
からWriteSerializable
に変更するには以下を実行します。
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')
パーティショニングとコマンド条件の分離による競合の回避
「競合する場合があります」と記述された全てのケースにおいて、2つのオペレーションが競合するかどうかは、それらが同じファイルセットを操作するかどうかに依存します。オペレーションの条件で同じカラムに用いるのと同じ様に、テーブルをパーティショニングすることで2つのファイルセットを分離することができます。例えば、2つのコマンドUPDATE table WHERE date > '2010-01-01' ...
とDELETE table WHERE date < '2010-01-01'
は、テーブルが日付でパーティショニングされていない場合には、両方が同じファイルセットを変更しようとするため競合します。date
によるテーブルのパーティショニングによって競合を回避することができます。このため、コマンドで頻繁んい用いられる条件に基づいてテーブルをパーティショニングすることで、劇的に競合を削減することができます。しかし、高いカーディナリティを持つカラムによるテーブルのパーティショニングは、大量のサブディレクトリを作成することで別のパフォーマンスの課題を引き起こすことがあります。
競合による例外
トランザクションの競合が起きた際、以下の例外のいずれかに遭遇することになります。
ConcurrentAppendException
この例外は、同時実行オペレーションが読み込みを行なっている同じパーティションにファイルを追加する際に発生します。ファイルの追加はINSERT
、DELETE
、UPDATE
、MERGE
オペレーションで起こり得ます。
デフォルトのアイソレーションレベルであるWriteSerializable
では、盲目的なINSERT
オペレーション(すなわち、いかなるデータを読み取ることなしに盲目的にデータを追加するオペレーション)は、同じパーティション(あるいは、パーティショニングされていないテーブルのどこか)を触っていたとしてもいかなるオペレーションとも競合は起きません。アイソレーションレベルがSerializable
に設定されると、盲目的な追加でも競合する場合があります。
この例外は、多くの場合DELETE
、UPDATE
、MERGE
の同時実行によって引き起こされます。同時実行オペレーションは物理的には異なるパーティションディレクトリをアップデートし、他の同時実行処理がアップデートしている同じパーティションを読み込む場合があり、競合を引き起こします。オペレーションの条件に明示的に分離を行うことでこれを回避することができます。以下のサンプルを考えてみます。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
このペレーションは異なる日付、国に対して実行されるので安全となります。
ConcurrentDeleteReadException
この例外は、同時実行オペレーションがあなたのオペレーションが読み取っているファイルを削除した際に発生します。ファイルを再度書き込むDELETE
、UPDATE
、MERGE
オペレーションで多くの場合発生します。
ConcurrentDeleteDeleteException
この例外は、あなたのオペレーションが削除しているファイルを別の同時実行オペレーションが削除した際に発生します。同じファイルに再度書き込みを行う同時実行のコンパクションオペレーションで発生することがあります。
MetadataChangedException
この例外は、Deltaテーブルのメタデータを同時実行のトランザクションがアップデートする際に発生します。よくある原因としては、ALTER TABLE
オペレーションやテーブルのスキーマを更新するDeltaテーブルに書き込みを行うというものです。
ConcurrentTransactionException
同じチェックポイントの場所を用いたストリーミングクエリーが複数回同時に起動し、同時にDeltaテーブルに書き込みを試行した際に起こります。2つのストリーミングクエリーが同じチェックポイントの場所を使用し、同時に実行すべきではありません。
ProtocolChangedException
この例外は以下の原因で発生します。
- お使いのDeltaテーブルが新しいプロトコルバージョンにアップグレードされた際。将来のオペレーションを成功させるためには、Databricksランタイムをアップグレードする必要があるかもしれません。
- 複数のライターが同時にテーブルを作成、テーブルの置換を行おうとした際。
- 複数のライターが同時に空のパスに書き込みをした際。
詳細はDelta Lakeのテーブルプロトコルのバージョン管理をご覧ください。