LoginSignup
0

More than 1 year has passed since last update.

Deltaスタンドアローンの偏在性:Java、Scala、Hive、Presto、Trino、Power BIなどどこでも使えます!

The Ubiquity of the Delta Standalone Project for Delta Lake - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Deltaスタンドアローン 0.3.0はお使いのデータエンジニアリングフレームワークでACIDトランザクションを保証します。

Deltaテーブルへの書き込みをサポートするDelta Connectors 0.3.0のリリースを大変嬉しく思っています。このリリースにおけるキーフィーチャーは以下の通りです。

Deltaスタンドアローン

  • 書き込み機能 : このリリースでは、Apache Spark™なしにDeltaテーブルの作成、書き込みをサポートする新規APIが導入されました。外部の処理エンジンはParquetデータファイルに書き込みを行うことができ、Deltaテーブルに対してファイルを原子的にコミットするためにAPIを使用することができます。この実装はDeltaトランザクションログプロトコルに従い、複数のライターの書き込みを管理するために楽観的同時性制御を用い、自動でチェックポイントファイルを作成し、プロトコルに従ってログとチェックポイントのクリーンアップを管理します。mainのJavaクラスはOptimisticTransactionであり、DeltaLog.startTransaction()を用いてアクセスします。
    • OptimisticTransaction.markFilesAsRead(readPredicates)は、トランザクションの間で全てのメタデータ(DeltaLogではありません)を読む際に使用する必要があります。これは、同時更新を検知し、以前に解決されたトランザクションのコミットと、今回のトランザクションの間に論理的競合があるかどうかを判断するのに用いられます。
    • OptimisticTransaction.commit(actions, operation, engineInfo)はテーブルに対する変更のコミットに使用されます。競合するトランザクションが先にコミットされた場合(上述)は例外がスローされ、そうでない場合にはコミットされたテーブルのバージョンが返却されます。
    • 冪等性のある書き込みは、同一アプリケーションのコミットによって増加するバージョンをチェックするOptimisticTransaction.txnVersion(appId)を用いて実装されます。
    • それぞれのコミットにおいては、トランザクションによって実行されるオペレーションを指定する必要があります。
    • Microsoft AzureとAmazon S3に対する同時書き込みにおけるトランザクションが保証されます。このリリースでは、必要な原子性、頑健性の保証を持たないAzure、S3ストレージシステムへの同時書き込みをサポートするカスタム拡張が含まれています。単一のクラスターからS3への同時書き込みに対してのみ、このトランザクション保証が提供されることに注意してください。
  • スナップショットのファイルを読み込むためのメモリ最適化イテレータ実装 : DeltaScanは、スナップショットにおけるAddFilesを読み込むために、パーティションプルーニング(刈り込み)のサポートを持つイテレータ実装を導入します。Snapshot.scan()やSnapshot.scan(predicate)でアクセスすることができ、後者ではpredicate(述語)、ファイルメタデータのパーティションカラムに基づいてファイルをフィルタリングすることができます。このAPIによって、スナップショットからファイルを読み込む際、そして、DeltaLog(内部の使用量によって)のインスタンスを作成する際のメモリーフットプリントを劇的に削減することができます。
  • メタデータ読み込みにおけるパーティションフィルタリング、書き込み時の競合検知 : このリリースでは、メタデータのクエリーにおけるパーティションプルーニングのためのシンプルな表現フレームワークが導入されました。スナップショットのファイルを読み込む際、Snapshot.scan(predicate)にpredicateを引き渡すことで、返却されるパーティションカラムのAddFilesをフィルタリングすることができます。トランザクションの間にテーブルを更新する際、論理的競合を検知し、可能であればトランザクション競合を回避するために、OptimisticTransaction.markFilesAsRead(readPredicate)にreadPredicateを引き渡すことで、どのパーティションを読み取るのかを指定します。
  • その他のアップデート :
    • DeltaLog.getChanges()は、インクリメンタルなメタデータ変更APIを提供します。VersionLogはバージョン番号と当該バージョンのアクションをラップします。
    • ParquetSchemaConverterはStructTypeのスキーマをParquetスキーマに変換します。
    • RowRecordに対する#197のフィックスで、パーティションカラムの値を読めるようになりました。
    • その他のバグフィックス。

Delta Connectors

  • Hive ConnectorでのHive 3サポート
  • Microsoft PowerBIコネクターにおけるDeltaテーブル読み込みのネイティブサポート : 稼働中のSparkクラスターがなくてもサポートされているストレージシステムから直接PowerBIでDeltaテーブルを読み込みます。PowerBIサービスにおけるオンライン/スケジュール更新、Delta Lakeのタイムトラベルのサポート、Deltaテーブルのパーティションスキーマを用いたパーティションの消去などの機能が含まれています。詳細はREADME.mdを参照ください。

Delta Standaloneとは?

以前はDelta Standalone Reader (DSR)であった、Delta connectorsにおけるDeltaスタンドアローンプロジェクトは、Delta Lakeテーブルを読み書きするために用いられるJVMライブラリです。Delta Lake Coreとは異なり、このプロジェクトはテーブルの読み書きにSparkを使用せず、わずかな依存関係のみを有しています。Sparkクラスターを使用できないあらゆるアプリケーションで使用することができます(How to Natively Query Your Delta Lake with Scala, Java, and Pythonもご覧ください)。

このプロジェクトによって、開発者はマニフェストファイルを使用することなしに、Deltaプロトコルに従って、外部処理エンジンに対するDeltaコネクターを構築することができます。リーダーコンポーネントを用いることで、開発者は指定されたバージョンのDeltaテーブルに関連付けられた一連のParquetファイルを読み取ることができます。Deltaスタンドアローン 0.3.0の一部として、リーダーにはDeltaScan.getFilesに対するメモリ最適化遅延イテレータ実装(PR #194)が含まれています。以下のサンプルコードでは、パーティションプルーニングと最適化内部イテレータ実装をサポートするSnapshot::scan(filter)::getFilesを含むDeltaスタンドアローン (0.3.0時点)を用いて、分散処理でParquetファイルを読み込みます。

Python
import io.delta.standalone.Snapshot;

DeltaLog log = DeltaLog.forTable(new Configuration(), "$TABLE_PATH$");
Snapshot latestSnapshot = log.update();
StructType schema = latestSnapshot.getMetadata().getSchema();
DeltaScan scan = latestSnapshot.scan(
    new And(
        new And(
            new EqualTo(schema.column("year"), Literal.of(2021)),
            new EqualTo(schema.column("month"), Literal.of(11))),
        new EqualTo(schema.column("customer"), Literal.of("XYZ"))
    )
);

CloseableIterator iter = scan.getFiles();

try {
    while (iter.hasNext()) {
        AddFile addFile = iter.next();

        // Zappy engine to handle reading data in `addFile.getPath()` and apply any `scan.getResidualPredicate()`
    }
} finally {
    iter.close();
}

また、Deltaスタンドアローン 0.3.0では、新たにライターコンポーネントが導入され、開発者は自身でParquetファイルを生成し、冪等性のある書き込みのサポートによって、原子的にDeltaテーブルへファイルを追加することができます(詳細はDelta Standalone Writer design documentを参照下さい)。以下のコードスニペットでは、新規ファイルを追加するトランザクションログのコミット、ストレージへのParquetファイルの書き込み後に古くて不正なファイルを削除する方法を説明しています。

Scala
import io.delta.standalone.Operation;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
import io.delta.standalone.types.StructType;

List removeOldFiles = existingFiles.stream()
    .map(path -> addFileMap.get(path).remove())
    .collect(Collectors.toList());

List addNewFiles = newDataFiles.getNewFiles()
    .map(file ->
        new AddFile(
            file.getPath(),
            file.getPartitionValues(),
            file.getSize(),
            System.currentTimeMillis(),
            true, // isDataChange
            null, // stats
            null  // tags
        );
    ).collect(Collectors.toList());

List totalCommitFiles = new ArrayList<>();
totalCommitFiles.addAll(removeOldFiles);
totalCommitFiles.addAll(addNewFiles);

// Zippy is in reference to a generic engine

try {
    txn.commit(totalCommitFiles, new Operation(Operation.Name.UPDATE), "Zippy/1.0.0");
} catch (DeltaConcurrentModificationException e) {
    // handle exception here
}

Hive 3におけるDeltaスタンドアローンの活用

Deltaスタンドアローン 0.3.0では、Hive 2とHive 3をサポートしており、HiveでネイティブにDeltaテーブルを読み込むことができます。お使いのDeltaテーブルにアクセスするHive外部テーブルの作成例を以下に示します。

SQL
CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION '/delta/table/path'

Hiveのセットアップ方法の詳細については、Delta Connectors > Hive Connectorを参照ください。このコネクターはApache Hiveのみをサポートしていることに注意してください。Apache SparkやPrestoをサポートしていません。

PrestoDBからのDelta Lakeの読み込み

PrestoCon 2021Delta Lake Connector for Prestoセッションでデモした通り、最近マージされたPresto/Delta connectorは、マニフェストファイルなしにDeltaトランザクションログをネイティブで読み込むためにDeltaスタンドアローンプロジェクトを活用しています。Deltaスタンドアローン 0.3.0に含まれるメモリ最適化遅延イテレータによって、PrestoDBは効率的にDeltaトランザクションログのメタデータに対してイテレーションを行うことができ、大規模Deltaテーブルを読み込む際のOOMを回避することができます。

Presto/Delta connectorを用いることで、PrestoでネイティブにDeltaテーブルをクエリーすることに加え、タイムトラベルに対するクエリーを実行し、Deltaテーブルの以前のバージョンタイムスタンプを指定するために@文法を使用することができます。以下のサンプルコードはバージョン番号を指定してNYCTaxi 2019データセットの以前のバージョンのデータをクエリーします。

SQL
# Version 1 of s3:///nyctaxi_2019_part table 
WITH nyctaxi_2019_part AS (
  SELECT * FROM deltas3."$path$"."s3://…/nyctaxi_2019_part@v1)
SELECT COUNT(1) FROM nyctaxi_2019_part;

# output
59354546


# Version 5 of s3://…/nyctaxi_2019_part table
WITH nyctaxi_2019_part AS (
  SELECT * FROM deltas3."$path$"."s3:///nyctaxi_2019_part@v5)
SELECT COUNT(1) FROM nyctaxi_2019_part;

# output
78959576

このコネクターによって、お使いのメタストアのテーブルの指定、deltas3."$path$"."s3://…シンタックスによるDeltaテーブルに対する直接のクエリーが可能となります。

PrestoDB/Deltaコネクターの詳細については以下を参照ください。

現在、Delta Lakeとのネイティブな接続性を提供するために、Trino(こちらがTrino 359 Delta Lakeリーダーを含む現状のブランチです)とAthenaと共に取り組んでいることにご注意ください。

Power BIからのDelta Lakeのネイティブ読み込み

我々は、Delta Lakeに対するPower BIの接続性を改善し続けている、Gerhard Brueckl(github: gbrueckl)に近道を提供したいとも考えました。Delta Connectors 0.3.0の一部のPower BIコネクターでは、PowerBIサービスにおけるオンライン/スケジュール更新、Delta Lakeのタイムトラベル、Deltaテーブルのパーティションスキーマを用いたパーティション削除がサポートされています。


ソース: Reading Delta Lake Tables natively in PowerBI

詳細に関しては、Reading Delta Lake Tables natively in PowerBIあるいはコードベースを確認ください。

ディスカッション

データエンジニアリング、データサイエンスコミュニティにおける急速なDelta Lakeの普及に、本当に我々は驚いています。DeltaスタンドアローンやこれらのDeltaコネクターの詳細に興味があるのであれば、以下のリソースをチェックしてみてください。

クレジット

Delta Standalone 0.3.0のアップデート、ドキュメント更新、貢献をしてくれたAlex、Allison Portis、Denny Lee、Gerhard Brueckl、Pawel Kubit、Scott Sandre、Shixiong Zhu、Wang Wei、Yann Byron、Yuhong Chen、gurunathに感謝の意を表します。

Databricks 無料トライアル

Databricks 無料トライアル

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
0