LoginSignup
1
0

More than 1 year has passed since last update.

AWS Glueでデータレイクフレームワーク(Hudi)を試してみた

Last updated at Posted at 2023-03-26

背景・目的

Glueでいくつかのデータレイクフレームワークが利用できますが、それぞれどのような特徴かわからなかったので
整理しつつ、簡単に触ってみたいと思います。ここでは、Hudiを扱います。

まとめ

  • Glueでは、Hudiのバージョンは0.12.1または、0.10.1がサポートされています。
  • Hudiには下記の2つのテーブルタイプをサポートしており、ワークロードにより選択します。
    • Copy On Write Table
    • Merge On Read

概要

AWS Glue での Hudi フレームワークの使用

Glueバージョン Hudiバージョン
4.0 0.12.1
3.0 0.10.1

データレイクフレームワーク

AWS Glueのドキュメントによると、下記の特徴があると記載されています。

  • データレイクに保存するファイルの増分データ処理を簡素化できる
  • S3に保存するデータに対する読み書きが一貫性のあるトランザクションにより実行できる

また、Glue3.0以降では下記のOSSデータレイクフレームワークがサポートされているとのことです。

  • Apache Hudi
  • Linux Foundation Delta Lake
  • Apache Iceberg

以降で、それぞれの特徴を整理します。

Apache Hudiの概要

Apache Hudiのドキュメントを元に特徴を整理します。

  • 発音は、hooodieとのこと。
  • ストリーミングデータレイクプラットフォーム
  • Hudi は、下記を提供しながらデータをオープン ソース ファイル形式に保つ。
    • テーブル
    • トランザクション
    • 効率的な upsert/delete
    • 高度なインデックス
    • ストリーミング インジェスト サービス
    • データ クラスタリング/圧縮の最適化
    • 同時実行性
  • ストリーミングだけではなく増分バッチパイプラインもサポート

Core Concepts to Learn

Timeline

Hudiは、テーブルに対して実行された全てのアクションのタイムラインを維持し、テーブルの瞬間的なビューを提供するのに役立つ。
Hudi instantは下記のコンポーネントを提供する。

  • Instant action
    • テーブルで実行されるアクションのタイプ
  • Instant time
    • タイムスタンプ
  • state
    • インスタントの現在の状況

タイムラインで実行されるアクションが瞬時に基づいてアトミックタイムラインの一貫性があることを保証している。

  • COMMITS
    • バッチをテーブルにアトミックに書き込む
  • CLEANS
    • テーブル内の古いバージョンのファイルを削除するバックグラウンド
  • DELTA_COMMIT
    • レコードのバッチをMergeOnReadタイプのテーブルにアトミックに書き込むを指定し、デルタログに書き込む
  • COMPACTION
    • 差分データ構造を調整するためのバックグラウンドアクティビティ。
    • 更新の行ベースのログファイルを列形式に移動する。
  • ROLLBACK
    • Commit/DELTA_COMMITが失敗してロールバックされ、そのような書き込み中に生成された部分ファイルが削除されたことを示します
  • SAVEPOINT
    • 特定のファイル グループを「保存済み」としてマークし、クリーナーがそれらを削除しないようにする。
    • 災害/データ復旧シナリオの場合に、タイムライン上のポイントにテーブルを復元するのに役立つ。

インスタントのStateは下記のものがあります。

  • REQUESTED
    • アクションがスケジュールされているが、開始されていないことを示します
  • INFLIGHT
    • アクションが現在実行中であることを示します
  • COMPLETED
    • タイムライン上のアクションの完了を示します

File Layouts

Hoodieのファイルレイアウト構造は、下記のとおりです。

  • ベースパスの下のディレクトリ構造にデータテーブルを配置する。
  • テーブルはパーティションに分割される。
  • パーティション毎に、ファイルグループ単位にファイルは編成される。また、ファイルIDによって一意に特定される。
  • 各ファイルグループには、複数のファイルスライスが含まれる。
  • 各スライスには、特定のCommit/Compactionの瞬間に生成されたParquet(これをベースファイルという)と、ベースファイルへのInsert/Updateを含む一連のログファイル(.log)が含まれる。

image.png
※ 出典:Apache HudiのFile Layouts

Table & Query Types

Hudiのテーブルタイプは下記を定義する。

  • DFSのデータのインデックス作成方法
  • レイアウト
  • データの書き込み方法

クエリの種類は、基となるデータがクエリに公開される方法を定義する。
テーブルタイプと、サポートされるクエリタイプは下記のとおりです。

Table Type Supported Query Types
Copy On Write Snapshot Queries + Incremental Queries
Merge On Read Snapshot Queries + Incremental Queries + Read Optimized Queries

Table Type

下記のテーブルタイプがあります。

  • Copy On Write
    • カラムナー形式のみ使用する。
    • 更新時には、同期マージを実行することによりファイルを単純にバージョン管理及び再書き込みする。
  • Merge On Read
    • カラムナーベースとRowベースの組み合わせを使用して保存する。
    • 更新はdelta fileに記録される。その後圧縮され新しいバージョンのカラムナーファイルが同期or非同期で生成される。

トレードオフは下記のとおりです。

Trade-off Copy On Write Merge On Read
データレイテンシー 高い 低い
クエリレイテンシー 低い 高い
Update Cost(I/O) 高い
(Parquetへの再書き込み)
低い
(deltaログへの書き込み)
Parquet File Size 小さい
(更新コストは高い)
大きい
(更新コストは低い)
書き込みの増幅幅(Write Amplification) ※1 高い 低い(コンパクション戦略による)

※1 対象のサイズより書き込みサイズが大きくなること?

Query types

下記のクエリタイプがあります。

  • Snapshot Queries
    • 特定のCommit/Compactionの時点のテーブルの最新スナップショットを参照する。
    • Copy on Writeの場合には、既存のParquetのテーブルの置換を提供すると同時に更新と削除を提供する。
    • Merge on Readの場合には、最新のファイルスライスのベースファイルと、delta ファイルを On the flyでマージすることでニアリアルタイムのデータを公開する。
  • Incremental Query
    • 特定のCommit/Compactionの以降のテーブルに書き込まれた新しいデータのみを参照する。これにより効果的に変更ストリームが提供され増分データパイプラインが有効になる。
  • Read Optimized Queries
    • 特定のCommit/Compactionの時点のテーブルの最新スナップショットを参照する。
    • 最新のファイルスライス内のベース/カラムナーテーブルと比較して、同じカラムナークエリパフォーマンスを保証する。

トレードオフは下記のとおりです。

Trade-off Snapshot Read Optimized
データレイテンシー 低い 高い
クエリレイテンシー 高い
(マージベース/カラムファイルと行ベースのdelta / log file)
低い
(Rowベース/カラムファイルのパフォーマンス)

Copy On Write Table

Copy On Write Tableのファイル スライスにはベース ファイルとカラムナ ファイルのみが含まれ、各コミットはベース ファイルの新しいバージョンを生成する。
列方向のデータのみが存在するように、すべてのコミットで暗黙的に圧縮する。 その結果、書き込み増幅 (受信データの 1 バイトに対して書き込まれるバイト数) ははるかに大きくなり、読み取り増幅はゼロになる。 主に読み取り負荷が高い分析ワークロードにとって望ましい特性。

下記は、Copy On Write Tableのテーブルに書き込まれて、2つのクエリが実行されている場合に概念的にどのように機能するか示したもの。

image.png
※ 出典:Apache Hudiの Table & Query Types

Merge On Read Table

Merge On Read Tableは、Copy On Write Tableのスーパーセット。
このテーブルタイプは、読み取りと書き込みをバランスさせて、ほぼリアルタイムのデータを提供しようとする。
compactorが重要な役割を担う。compactorはどのdelta logをカラムナーベースのベースファイルにマージするか選択してクエリのパフォーマンスを抑制する。

下記は、Merge On Read Tableがどのように機能しているか。スナップショットクエリと、Read Optimized Queriesの2種類を示したもの。

image.png
※ 出典:Apache Hudiの Table & Query Types

  • スナップショットクエリでは、マージ前のデータは見えない。

Indexing

Hudiは、インデックスメカニズムを使用して、効果的にUpsertを提供している。

  • インデックスメカニズムは、Hudi key(レコードキー+パーティションパス)をファイルIDにマッピングしている。
  • レコードキーとファイルグループ/ファイルIDのマッピングは、レコードの最初のバージョンがファイルに書き込まれると変更されることはない。(すべてのバージョンが含まれる。)
  • Copy-On-Write tablesの場合、これにより書き換えるファイルを決定するために、データセット全体に対して結合する必要がないので、Upsert/Deleteが高速
  • Merge-On-Read tablesの場合、特定のベースファイルをマージするレコード量を制限できる。
    • 特定のベースファイルは、そのベースファイルの一部であるレコードの更新に対してマージする必要がある。
    • インデックス作成コンポーネントを使用しない設計では、すべての受信、削除レコードに対して全てのベースファイルをマージする必要がある。
      image.png
      ※ 出典:Apache Hudiの Indexing

Index Types in Hudi

Hudiがサポートしているインデックス作成オプションは、下記の通りです。

  • Bloom Index(fefault)
    • レコードキーから構築されたBloom filterを使用してオプションでレコードキー範囲を使用して候補ファイルをプルーニングする
  • Simple index
    • ストレージ上のテーブルから抽出されたキーに対して、Update/Deleteのlean 結合する。(ちょっとわからない。)
  • HBase Index
    • HBaseのテーブルでインデックスをマッピングする
  • Bring your own implementation
    • パブリックAPIを拡張してカスタムインデックスを作成できる

Writeはhoodie.index.typeの構成オプションを使用して、これらのオプションのいずれかを選択する。
さらに、hoodie.index.classを使用して、SparkHoodieIndexサブクラスを提供し、カスタムインデックスの実装を採用することもできる。(Apache Spark Write用)

Indexには、Global IndexとNon-Global Indexがある。

  • Global index
    • テーブルの全てのパーティションに渡りキーの一意性を強制する。特定のレコードキーに対してテーブル1つにレコードが存在することを保証する。そのため、Update/Deleteのコストは、サイズとともに増加する。
  • Non-Global Index
    • 規定のインデックスの実装では、特定のパーティション内のみ適用
    • インデックスルックアップは、更新と削除されたレコードの数になり優れたパフォーマンスを提供する。

なお、BloomとSimpleの両方に、下記のオプションがある。

  • hoodie.index.type=GLOBAL_BLOOM
  • hoodie.index.type=GLOBAL_SIMPLE
    ※ HBase Indexは、本質的にGlobal Index一択。

Indexing Strategies

データはさまざまな量、速度、さまざまなアクセス パターンで受信されるため、ワークロード タイプごとにさまざまなインデックスを使用できる。
いくつかの典型的なワークロード タイプについて下記で説明する。

Workload 1: Late arriving updates to fact tables

大量のトランザクション データを NoSQL データ ストアに格納している場合。例)ライドシェアの場合のトリップテーブル、株式の売買、電子商取引サイトでの注文等が考えられる。
これらのテーブルは通常、最新のデータがランダムに更新され、トランザクションが後で決済されるかデータが修正されたため、古いデータにロングテールの更新が行われ、常に拡大していく。
つまり、ほとんどの更新は最新のパーティションに送られ、古いパーティションに送られる更新はほとんどない、

image.png
※ 出典:Apache Hudiの Indexing

このようなワークロードでは、BLOOM インデックスが適切に機能するとのこと。
これは、インデックス ルックアップが適切なサイズのブルーム フィルターに基づいて多くのデータ ファイルをプルーニングするため。 さらに、特定の順序を持つようにキーを構成できる場合は、比較するファイルの数は、範囲の枝刈りによってさらに削減される。 Hudi は、すべてのファイル キー範囲を使用してインターバル ツリーを構築し、更新/削除されたレコードのどのキー範囲にも一致しないファイルを効率的に除外する。

Workload 2: De-Duplication in event tables

イベントストリーミングの場合。Apache Kafka または同様のメッセージ バスからのイベントは、通常、ファクト テーブルの 10 ~ 100 倍のサイズであり、多くの場合、「時間」(イベントの到着時間/処理時間) を第一候補として扱う。たとえば、IoT イベント ストリーム、クリック ストリーム データ、広告インプレッションなどが考えられる。
ほとんどの場合、追加のみのデータであるため、挿入と更新は最後のいくつかのパーティションにのみ適用される。 重複イベントはend-end pipelineのどこにでも入るため、データ レイクに保存する前に重複を排除することが一般的な要件となる。

image.png
※ 出典:Apache Hudiの Indexing

一般に、低コストで解決するのが非常に困難な問題。 キー値ストアを使用して HBase Indexでこの重複除外を実行することもできるが、インデックス ストレージのコストはイベント数に比例して増加するため、非常に高価になる可能性がある。
実際、範囲刈り込みを使用した Bloom Indexは、ここでの最適なソリューション。 多くの場合、時間は第一候補であるという事実を利用して、挿入されたレコードが単調に増加するキーを持つように event_ts + event_id などのキーを構築できます。 これにより、最新のテーブル パーティション内であっても大量のファイルが整理され、大きな利益が得られる。

Workload 3: Random updates/deletes to a dimension table

これらのタイプのテーブルには、通常、高次元のデータが含まれており、ユーザー プロファイルやmerchant情報などの参照データが保持される。 これらは忠実度の高いテーブルであり、多くの場合、更新は小規模だが、データセットの古いものから新しいものまで、多数のパーティションとデータ ファイルにまたがる。 多くの場合、これらのテーブルもパーティション化されていない。これは、これらのテーブルをパーティション化する適切な方法がないため。

image.png
※ 出典:Apache Hudiの Indexing

範囲/フィルターを比較して十分な数のファイルを除外できない場合、Bloom インデックスはメリットをもたらさない可能性がある。
このようなランダムな書き込みワークロードでは、更新がテーブル内のほとんどのファイルに影響を与えるため、通常、Bloom Filterは、何らかの着信更新に基づいてすべてのファイルに対して真陽性を示す。
その結果、範囲/フィルターを比較することになり、最終的にすべてのファイルに対して着信更新をチェックするだけになる。 Simple Indexは、事前のプルーニング ベースを行わず、すべてのデータ ファイルから関連するフィールドと直接結合するため、より適している。 運用上のオーバーヘッドが許容範囲内で、これらのテーブルのルックアップ時間が大幅に短縮される場合は、HBase Index を使用できます。

グローバル インデックスを使用する場合、ユーザーは hoodie.bloom.index.update.partition.path=true または hoodie.simple.index.update.partition.path=true を設定して、パーティション パスの値が変更される可能性がある場合に対処することも検討する必要がある。(たとえば、自宅が都市でパーティション化された users テーブルなどの更新の場合、 ユーザーが別の都市に移動するケースがある。。 これらのテーブルは、Merge-On-Read テーブル タイプの優れた候補でもある。)

Metadata Table

Motivation for a Metadata Table

Apache Hudi メタデータ テーブルを使用すると、クエリの読み取り/書き込みパフォーマンスを大幅に向上させることができる。 メタデータ テーブルの主な目的は、「ファイルのリスト」操作の要件をなくすこと。

データの読み取りおよび書き込み時には、ファイルの一覧表示操作が実行され、ファイル システムの現在のビューが取得される。
データ セットが大きい場合、すべてのファイルを一覧表示することはパフォーマンスのボトルネックになる可能性があるが、クラウドストレージ システムの場合は、多数のファイル一覧表示要求により、特定の要求制限が原因でスロットリングが発生することがある。 代わりに、メタデータ テーブルは積極的にファイルのリストを維持し、再帰的なファイル リスト操作の必要性を取り除く。

Some numbers from a study

TPCDS ベンチマークを実行すると、単一フォルダーの p50 リストのレイテンシーは、ファイル/オブジェクトの量に比例して変化する。
一方、メタデータ テーブルからのリストは、ファイル/オブジェクト数に比例してスケーリングされず、非常に大きなテーブルであっても読み取りごとに約 100 ~ 500 ミリ秒かかる。 さらに、タイムラインサーバーはメタデータの一部をキャッシュし (現在はライターのみ)、リストに対して最大 10 ミリ秒のパフォーマンスを提供するとのこと。

Supporting Multi-Modal Index

マルチモーダル インデックスを使用すると、ファイル インデックスでのルックアップ パフォーマンスと、データ スキップによるクエリのレイテンシを大幅に改善できる。
ファイル レベルのブルーム フィルターを含むBloom Filter Indexは、キーの検索とファイルのプルーニングを容易にする。 すべての列の統計を含む列統計インデックスにより、たとえば Spark でのクエリ計画において、ライターとリーダーの両方でキーと列値の範囲に基づいてファイルのプルーニングが改善される。 マルチモーダル インデックスは、メタデータ テーブルのインデックスを含む独立したパーティションとして実装される。

Enable Hudi Metadata Table and Multi-Modal Index

0.11.0 以降、同期更新を伴うメタデータ テーブルとメタデータ テーブル ベースのファイル リストがデフォルトで有効になっている。

Deployment considerations

メタデータ テーブルが最新の状態に保たれるようにするには、同じ Hudi テーブルに対するすべての書き込み操作で、さまざまな展開モデルで上記以外に追加の構成が必要。 メタデータ テーブルを有効にする前に、同じテーブルのすべてのライターを停止する必要がある。

いくつかのデプロイモデルがあるが、詳細はドキュメントを参照してください。

Write Operations

Operation Types

UPSERT

これは、インデックスを検索することによって、入力レコードが最初に挿入または更新としてタグ付けされるデフォルトの操作。 レコードは、ヒューリスティックが実行されて、ファイル サイズなどを最適化するためにストレージにパックする最適な方法を決定した後に、最終的に書き込まれる。 この操作は、入力にほぼ確実に更新が含まれるデータベース変更キャプチャなどのユースケースに推奨される。 ターゲット テーブルに重複が表示されることはない。

INSERT

インデックス ルックアップ ステップを完全にスキップmする。 したがって、ログの重複排除などのユースケースでは、アップサートよりもはるかに高速になる可能性がある。(以下で説明する重複をフィルター処理するオプションと組み合わせて使用する)。 これは、テーブルが重複を許容できるユースケースにも適しているが、Hudi のトランザクション書き込み/増分Pull/ストレージ管理機能のみが必要。

BULK_INSERT

upsert 操作と insert 操作の両方が、入力レコードをメモリに保持して、(とりわけ) ストレージ ヒューリスティック計算を高速化するため、最初に Hudi テーブルを最初にロード/ブートストラップするのが面倒になる可能性がある。 Bulk Insertは、Insertと同じセマンティクスを提供すると同時に、ソートベースのデータ書き込みアルゴリズムを実装する。これは、数百 TB の初期ロードに対して非常に適切にスケーリングできる。 ただし、これは、挿入/アップサートのようにファイルサイズを保証するのではなく、ファイルのサイジングで最善の努力をするだけとのこと。

DELETE

Hudi は、ユーザーが別のレコード ペイロードの実装を指定できるようにすることで、Hudi テーブルに格納されたデータに対する 2 種類の削除の実装をサポートする。

  • Soft Deletes
    • レコード キーを保持し、他のすべてのフィールドの値を null にするだけ。
    • 適切なフィールドがテーブル スキーマで null 可能であることを確認し、これらのフィールドを null に設定した後にテーブルをアップサートするだけで実現できます。
  • Hard Deletes :
    • 物理削除
    • 下記の3つの異なる方法で実現する。
      • DataSource を使用して、OPERATION_OPT_KEY を DELETE_OPERATION_OPT_VAL に設定。これにより、送信されている DataSet 内のすべてのレコードが削除される
      • DataSource を使用して、PAYLOAD_CLASS_OPT_KEY を「org.apache.hudi.EmptyHoodieRecordPayload」に設定します。 これにより、送信されている DataSet 内のすべてのレコードが削除される。
      • DataSource または DeltaStreamer を使用して、_hoodie_is_deleted という名前の列を DataSet に追加します。 この列の値は、削除するすべてのレコードに対して true に設定する必要があり、Upsertするレコードに対して false または null のままにする必要がある。

Writing path

以下は、Hudi の書き込みパスと、書き込み中に発生する一連のイベントの内部を示している。

  1. Deduping
    • 入力レコードが同じバッチ内に重複キーを持つ可能性があり、重複をキーで結合または削減する必要がある。
  2. Index Lookup
    • インデックス ルックアップが実行され、入力レコードがどのファイル グループに属しているかを識別する。
  3. File Sizing
    • 次に、以前のコミットの平均サイズに基づいて、Hudi は小さなファイルに十分なレコードを追加して、構成された最大制限に近づける計画を立てる。
  4. Partitioning
    • 特定の更新と挿入をどのファイル グループに配置するか、または新しいファイル グループを作成するかを決定するパーティショニングに到達する
  5. Write I/O
    • ここで、新しいベース ファイルの作成、ログ ファイルへの追加、または既存のベース ファイルのバージョン管理のいずれかである書き込み操作を実際に実行する。
  6. Update Index
    • 書き込みが実行されたので、インデックスを更新する。
  7. Commit
    • 最後に、これらすべての変更をアトミックにコミットする。 (コールバック通知が公開される)
  8. Clean (if needed)
    • コミットに続いて、必要に応じてクリーニングが呼び出される。
  9. Compaction
    • MOR テーブルを使用している場合、圧縮はインラインで実行されるか、非同期でスケジュールされる
  10. Archive
    • 最後に、古いタイムライン アイテムをアーカイブ フォルダーに移動するアーカイブ ステップを実行する。

Schema Evolution

スキーマの進化により、ユーザーは Hudi テーブルの現在のスキーマを簡単に変更して、時間とともに変化するデータに適応させることができる。 0.11.0 リリースの時点で、Spark SQL (Spark 3.1.x、3.2.1 以降) スキーマの進化のための DDL サポートが追加されており、実験的とのこと。

  • 列 (ネストされた列を含む) は、追加、削除、変更、および移動可能
  • パーティション列は展開できない。
  • 配列型のネストされた列に対して操作を追加、削除、または実行することはできない。

実践

AWS Glue での Hudi フレームワークの使用を元に試してみます。

事前準備

S3バケットとデータセットを用意

インプットとアウトプット用のS3バケットと、入力のデータセットを用意します。

$ cat *
{"id":1,"value":1}
{"id":2,"value":2}
{"id":3,"value":3}
{"id":4,"value":4}
{"id":5,"value":5}
{"id":6,"value":6}
{"id":7,"value":7}
{"id":8,"value":8}
{"id":9,"value":9}
{"id":10,"value":10}
{"id":11,"value":11}
{"id":12,"value":12}
{"id":13,"value":13}
{"id":14,"value":14}
{"id":15,"value":15}
{"id":16,"value":16}
{"id":17,"value":17}
{"id":18,"value":18}
{"id":19,"value":19}
{"id":20,"value":20}
{"id":21,"value":21}
{"id":22,"value":22}
{"id":23,"value":23}
{"id":24,"value":24}
{"id":25,"value":25}
{"id":26,"value":26}
{"id":27,"value":27}
{"id":28,"value":28}
{"id":29,"value":29}
{"id":30,"value":30}
{"id":31,"value":31}
{"id":32,"value":32}
{"id":33,"value":33}
{"id":34,"value":34}
{"id":35,"value":35}
{"id":36,"value":36}
{"id":37,"value":37}
{"id":38,"value":38}
{"id":39,"value":39}
{"id":40,"value":40}
$

$ aws s3 ls {入力用S3バケット}
2023-03-26 12:35:42        191 file1.json
2023-03-26 12:35:42        209 file2.json
2023-03-26 12:35:41        209 file3.json
2023-03-26 12:35:41        210 file4.json
$

Hudi の有効化

  • hudi を --datalake-formats のジョブパラメータの値として指定する詳細については、「AWS Glue ジョブのパラメータ」を参照してください。
  • AWS Glue ジョブ用に、--conf という名前でキーを作成し、それに次の値を設定する。または、スクリプトで SparkConf を使用して、次の構成を設定することも可能。これらの設定は、Apache Spark が Hudi テーブルを適切に処理するために使用する。

例: Hudi テーブルを Amazon S3 に書き込み、そのテーブルを AWS Glue データカタログに登録する

  1. Visual Editorでジョブの型を作成します。
    image.png

  2. ジョブパラメータを設定します。

    --datalake-formats hudi
    
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false
    

    image.png

  3. スクリプトを修正します。

    • オプションに各パラメータを設定します。
    • 書き込みは、DataFrameで行います。
    df = ApplyMapping_node2.toDF()
    
    additional_options={
        "hoodie.table.name": "hudi_table",
        "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "id",
        "hoodie.datasource.write.partitionpath.field": "id",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": "transaction_db",
        "hoodie.datasource.hive_sync.table": "hudi_table",
        "hoodie.datasource.hive_sync.partition_fields": "id",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
        "path": "s3://{出力用S3バケット名}/hudi/"
    }
    
    df.write.format("hudi") \
        .options(**additional_options) \
        .mode("overwrite") \
        .save()
    
  4. ジョブを実行します。
    image.png

  5. 出力先のデータを確認します。ファイルができていました。
    image.png

考察

今回、データレイクのフレームワークの一つであるHudiを試してみました。次回は書き込みと読み込みの性能について、複数のテーブルタイプで見てみたいと思います。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0