0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Lakeのコンカレンシーコントロール

Last updated at Posted at 2022-03-07

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Delta Lakeは読み書きにおけるACIDトランザクション保証を提供します。これは以下のことを意味します。

  • 複数クラスターにまたがる複数のwrtierは、同時にテーブルのパーティションを変更でき、テーブルに対する一貫性のあるスナップショットビューを参照でき、これらの書き込みの連続した順序が存在します。
  • Databricksジョブがスタートし、ジョブの過程でテーブルが変更された際でも、readerは一貫性のあるテーブルのスナップショットビューを見続けることになります。

楽観的コンカレンシーコントロール

Delta Lakeは、書き込み間でトランザクション保証を提供する楽観的コンカレンシーコントロールを使用します。このメカニズムのもと、書き込みは3つのステージでオペレーションを行います。

  1. 読み込み: どのファイルを修正すべきかを特定するために、テーブルの最新バージョンを(必要であれば)読み込みます。
  2. 書き込み: 新規データファイルを書き込むことで全ての変更をステージングします。
  3. 検証及びコミット: 変更をコミットする前に、スナップショットが読み込まれた以降に、提案された変更が同時にコミットされた他の変更と競合するかどうかをチェックします。競合がない場合には、ステージングされた全ての変更は新規バージョンのスナップショットとしてコミットされ、書き込みオペレーションが成功します。しかし、競合がある場合、Parquetテーブルに対する書き込みオペレーションが行われるので、テーブルを破壊するのではなく同時変更の例外で書き込みオペレーションは失敗します。

テーブルのアイソレーションレベルは、どのトランザクションを同時オペレーションによってなされる変更から分離すべきなのかの度合いを定義します。DatabricksのDelta Lakeによってサポートされるアイソレーションレベルに関してはアイソレーションレベルを参照ください。

書き込みの競合

以下のテーブルでは、書き込みオペレーションのどの組み合わせが、それぞれのアイソレーションレベルで競合しうるのかを説明しています。

INSERT UPDATE、DELETE、MERGE INTO OPTIMIZE
INSERT 競合しません。
UPDATE、DELETE、MERGE INTO Serializableでは競合し得ます。WriteSerializableでは競合しません。 SerializableとWriteSerializableで競合し得ます。
OPTIMIZE 競合しません。 SerializableとWriteSerializableで競合し得ます。 SerializableとWriteSerializableで競合し得ます。

パーティショニング、分割コマンド条件を用いることによる競合の回避

「競合し得る」とマークされている全てのケースにおいて、2つのオペレーションが競合するかどうかは同じファイルセットに対してオペレーションしているかどうかに依存します。オペレーションの条件で使用される同じカラムを用いてテーブルをパーティショニングすることで、ファイルセットを分割することができます。例えば、2つのコマンドUPDATE table WHERE date > '2010-01-01' ...DELETE table WHERE date < '2010-01-01'は、日付でテーブルがパーティショニングされていない場合は、両方が同じファイルセットを更新しようとするので、競合することになります。dateでテーブルをパーティショニングすることで、競合を回避することができます。すなわち、コマンドで一般的に用いられる条件に沿ってテーブルをパーティショニングすることで、競合を劇的に減らすことができます。しかし、高いカーディナリティを持つカラムでテーブルをパーティションすると、大量にサブディレクトリが生成されることで他のパフォーマンスの問題を引き起こす場合があります。

競合の例外

トランザクションの競合が起きると、以下の例外のいずれかを見ることになります。

ConcurrentAppendException

この例外は、オペレーションで読み込む同じパーティション(あるいはパーティショニングされていないテーブルのどこか)において、オペレーションが同時にファイルを追加した際に発生します。ファイルの追加はINSERTDELETEUPDATEMERGEによって引き起こされます。

WriteSerializableのデフォルトのアイソレーションレベルでは、ブラインドINSERTオペレーション(すなわち、いかなるデータを読み込むことなしに盲目的にデータを追加するオペレーション)は、同じパーティション(あるいはパーティショニングされていないテーブルのどこか)を操作していたとしても他のいかなるオペレーションと競合しません。アイソレーションレベルがSerializableに設定されると、ブラインドな追加処理は競合するかもしれません。

この例外は多くの場合、DELETEUPDATEMERGEオペレーションの同時実行でスローされます。同時オペレーションは物理的には異なるパーティションディレクトリを更新するかもしれませんが、それらの一つが別のオペレーションが同時更新している同じパーティションを読み込むかもしれず、この場合には競合が起きます。オペレーション条件で明示的に分離することで、これを回避することができます。以下の例を考えてみましょう。

Scala
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

異なる日付や国に対して上記のコードを同時に実行するものとします。それぞれのジョブはターゲットのDeltaテーブルで独立したパーティションを処理するので、競合は起こり得ません。しかし、条件が十分に明示的ではなく、テーブル全体をスキャンすると、他の任意のパーティションを更新する同時オペレーションと競合する場合があります。代わりに、以下のようにマージ条件に特定の日付、国を追加するように文を書き直すことができます。

Scala
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
    source.as("s"),
    "s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()

これによって、異なる日付と国に対してどう実行するので、安全なオペレーションになります。

ConcurrentDeleteReadException

この例外は、オペレーションが読み込んでいるファイルが同時オペレーションで削除された際に起こります。一般的な原因として、ファイルを再書き込みするDELETEUPDATEMERGEが挙げられます。

ConcurrentDeleteDeleteException

この例外は、オペレーションが削除しているファイルが同時オペレーションで削除された際に起こります。これは、同じファイルを再書き込みする2つの同時コンパクションオペレーションによって引き起こされます。

MetadataChangedException

この例外は、Deltaテーブルのメタデータを更新する同時トランザクションで引き起こされます。一般的なケースは、テーブルスキーマを更新するDeltaテーブルに書き込みを行うALTER TABLEとなります。

ConcurrentTransactionException

同じ場所のチェックポイントを用いるストリーミングクエリーが同時に複数回起動し、同時にDeltaテーブルに書き込みを行おうとした際に発生します。同じチェックポイントを用いる2つのストリーミングクエリーを同時に使うべきではありません。

ProtocolChangedException

以下のケースで例外が発生します。

  • Deltaテーブルが新規バージョンにアップグレードされたとき。最新のオペレーションを成功させるためには、Delta Lakeのバージョンをアップグレードする必要があるかもしれません。
  • 複数のwriterが同時にテーブルを作成する際、あるいは、テーブルを置換する際。
  • 複数のwriterが同時に空のパスに対して書き込みを行う際。

詳細はDelta Lakeのテーブルバージョン管理をご覧ください。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?