Getting Started With ETL Ingestion into Delta Lake - The Databricks Blogの翻訳です。
データの取り込みは、Kafkaのように常に稼働し続けるストリーミングプラットフォームを使う必要があったり、どのファイルがまだ取り込まれていないのかを追跡できる必要があったりして、複雑かつ困難なものとなります。この記事では、データレイク上のフォルダからDelta Lakeテーブルにデータを取り込む二つの方法であるオートローダーとCOPY INTOに関して議論します。これら2つの機能は、Kafkaのような分散リアルタイムストリーミングデータシステムを使うことなしに、べき等性を保持しつつ、インクリメンタルにデータレイクのフォルダから直接データを取り込めるので、特にデータエンジニアにとって有用なものとなります。インクリメンタルETLプロセスを劇的に簡素化することに加えて、既存のデータを再処理するのではなく、新規データのみを取り込むので、データの取り込みが非常的に効率的となります。
ここまでで2つのコンセプトに触れました。べき等性とインクリメンタルETLです。これらが何を意味するのかを見ていきましょう。
- **べき等性(Idempotent)**は、同じデータを処理する際には、常に同じ結果を生み出すことを意味します。例えば、エレベーターのボタンはべき等性があると言えます。あなたが11階のボタンを押し、その後にエレベーターに入ってきた他の人も同じ階のボタンを押すことができます。これら全ての11階ボタンのプッシュは同じデータですので、処理は一度のみとなります。しかし、他の誰かが3階のボタンを押したら、これは新たなデータとなるので、それに応じた処理が行われます。 誰が押したのか、どの階に到着するのかに関係なく、特定の階のボタンを押し、対応する階に移動するということは、常に同じ結果をもたらします。
- インクリメンタルETL - べき等性はインクリメンタルETLの基礎となります。新たなデータのみがインクリメンタルに処理されるので、インクリメンタルETLは非常に効率的です。インクリメンタルETLはべき等性のあるデータ取り込みからスタートし、BIと機械学習で利用するゴールドテーブルに至るまで、この特性を複数のステージングテーブルと変換処理に適用していきます。
インクリメンタルETLアーキテクチャを以下に示します。この記事では、図の左に示すように外部ソースからテーブルにデータを取り込む方法にフォーカスします。
継続的かつインクリメンタルにデータを取り込むこともできますし、スケジュールジョブで取り込むこともできます。COPY INTOとオートローダーは両方をカバーしています。
COPY INTO
COPY INTOは、フォルダからDelta LakeテーブルにデータをロードするSQLコマンドです。以下のコードスニペットでは、いかに簡単にソースのingestLandingZone
から、コピー先のDelta LakeテーブルingestCopyIntoTablePath
にJSONファイルをコピーできるのかを示しています。
いくつか注意すべきことを列挙します。
- データソースに少数(数千)のファイルが存在する場合には、COPY INTOコマンドは、スケジュール処理、アドホックの取り込みにおいては完璧なものとなります。
- サポートしているファイルフォーマットは、JSON、CSV、AVRO、ORC、PARQUET、TEXT、バイナリーファイルです。
- 上の例のように、取り込み先にはデータベース上の既存Delta Lakeテーブルや、Delta Lakeテーブルの位置を指定することができます。
- ノートブックでCOPY INTOを使えるだけでなく、Databricks SQLでデータを取り込むのもベストな方法です。
オートローダー
Auto Loaderは、ディレクトリ一覧やファイルの通知を用いることで、フォルダの新規データをDelta Lakeテーブルに取り込むPython、Scalaのメソッドを提供します。オートローダーは、Apache Spark™の構造化ストリーミングのソースですが、継続的に動作し続ける必要はありません。以下で議論するように、処理をジョブに切り替えて処理後にオフにするtrigger onceオプションを使用することもできます。ディレクトリ一覧メソッドは、ディレクトリないのファイルをモニタリングし、新規ファイルあるいは前回新規データが処理されてから更新されたファイルを特定します。このメソッドはデフォルトであり、フォルダ内のファイルの数が少ない場合には望ましい選択肢となります。他のシナリオにおいては、ファイル通知メソッドが、新規ファイルの出現、更新の際にクラウドサービスを利用して通知を送信します。
チェックポイントは、ETLが停止された際に状態を保存します。チェックポイントを活用することで、オートローダーは継続的に稼働でき、定期的、スケジュールジョブの一部に組み込むことができます。オートローダーが停止され、再起動された際には、最新の状態に復帰するためにチェックポイントを利用し、既に処理されたファイルを再度処理することはしません。以下の例では、オートローダージョブを制御する他の方法として、trigger onceオプションが設定されています。一回だけジョブを実行するので、ジョブの開始後、初回ジョブが実行された際の新規データ全てを処理した後に停止することを意味します。
以下の例では、オートローダーに対して、どれだけ簡単に新規データを取り込み、Delta Lakeテーブルに書き込むように設定できるのかを示しています。
この一つのオートローダー文によって以下の設定が行われています。
- cloudFilesストリームの設定
- 期待されるファイルフォーマットの特定
- スキーマ情報の場所の指定
- 新規データをチェックする場所の指定
- 指定されたフォーマットでファイルを書き出し
- このオートローダーは一度のみの実行
- 新規データを格納するテーブルの指定
デフォルトではスキーマは推定されますが、上の例のように明示的にスキーマを設定することも可能です。本シリーズにおける次の記事では、スキーマ推定と、スキーマ進化、データのレスキューに関して詳細を説明します。
まとめ
Databricksでは、不可能を可能に、困難を容易にするための努力を日々続けています。COPY INTOとオートローダーは、スケジュールETL、継続的ETLにおけるインクリメンタルなデータ取り込みを容易かつシンプルにします。さあ、これであなたはCOPY INTOとオートローダーを利用する準備ができました。これを使って何を作るのかを楽しみにしています!