Easy Ingestion to Lakehouse With COPY INTO - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
データレイクとして知られる新たなデータ管理アーキテクチャは、膨大なデータに対する直接のAIやBIをサポートするために数多くの企業やユースケースで独立して発生しました。分析や機械学習でデータレイクハウスを活用する際のキーとなる成功要因の一つは、オンプレミスのストレージプラットフォーム(データウェアハウス、メインフレーム)、リアルタイムストリーミングデータ、膨大なデータ資産を含む様々なデータをクイックかつ容易に取り込む能力となります。
レイクハウスへのデータ取り込みはETLパイプラインをフィードする継続的なプロセスなので、様々なフォーマット、タイプ、レーテンシーのデータを取り込むための複数のオプションを必要とすることでしょう。AWS S3、Google Cloud Storage、Azure Data Lake Storageのようなクラウドオブジェクトストレージに格納されているデータに対して、Databricksはデータエンジニアがクラウドストレージから継続的に数百万のファイルを取り込めるネイティブの組み込み機能であるAuto Loaderを提供しています。その他のストリーミングケース(センサーデータやクリックストリームデータなど)においては、Databricksは低レーテンシーでApache KafkaやAzure Event Hubs、AWS Kinesisのような著名なメッセージキューからApache Sparkがクイックにデータを取り込めるネイティブなコネクターを提供しています。さらに、多くの企業ではエンタープライズアプリケーション、データベース、メインフレームなどからレイクハウスに容易にデータを取り込めるように、FivetranのようなDatabricksとインテグレーションされている人気の取り込みツールを活用することができます。最後になりますが、アナリストはすでにどのファイルが処理されたのかを追跡する必要なしに、自動でレイクハウスに新規データをプルするために、シンプルなCOPY INTO
コマンドを活用することができます。
本記事では、クラウドオブジェクトストアからDelta Lakeへのファイルのバッチ取り込みを実行できるシンプルですがパワフルなSQLコマンドであるCOPY INTO
にフォーカスします。冪等性があるので複数回実行してもexactly-onceセマンティクスを保証し、インクリメンタルな追加とシンプルな変換処理をサポートしています。アドホックに一度のみ実行することも、Databricksワークフローを通じてスケジュールすることもできます。最近のDatabricksランタイムのリリースにおいては、COPY INTO
にはデータプレビュー、妥当性検証、強化されたエラーハンドリング、そして、ユーザーがクイックに作業を開始できるようにするためのスキーマレスDelta LakeテーブルへのCOPY INTO
を行う新たな手段などの新機能が導入され、クラウドオブジェクトストアからデータを取り込むエンドツーエンドのユーザージャーニーを完結させます。それでは、よくあるCOPY INTO
のユースケースから見ていきましょう。
1. 初めてのデータ取り込み
COPY INTO
では、ターゲットのDeltaテーブルにデータを取り込むので、既存のテーブルが必要となります。しかし、データがどのようなものか分からない場合があります。この場合、最初に空のDeltaテーブルを作成します。
CREATE TABLE my_example_data;
データを書き出す前に、データが適正であることを確認するためにプレビューしたいと考えるかもしれません。COPY INTO
のValidateモードは、Databricksランタイム10.3以降の新機能であり、クラウドオブジェクトストレージから大量のファイルを取り込む前にソースデータをプレビュー、検証することができます。
- データをパースすることができるか。
- スキーマはターゲットテーブルとマッチしているか、スキーマの進化が必要か。
- テーブルのnull可能性とチェック制約は満たしているか。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleData'
FILEFORMAT = CSV
VALIDATE
COPY_OPTIONS ('mergeSchema' = 'true')
データ検証のデフォルトの挙動は、いかなる問題もないことを保証するためにソースディレクトリのすべてのデータをパースするというものですが、プレビューのために返却される行は限定されます。オプションとして、VALIDATE
の後にプレビュー行数を指定することができます。
COPY_OPTION
のmergeSchema
はターゲットテーブルのスキーマが進化しても問題がないことを指定します。スキーマ進化は新規カラムの追加のみを許可し、既存カラムのデータ型の変更はサポートしていません。他のユースケースにおいては、お使いのデータパイプラインが厳密なスキーマ要件を持っており常にスキーマを進化させたくないため、テーブルスキーマをより厳密に管理したい場合にはこのオプションを除外することができます。しかし、上述のサンプルのターゲットDeltaテーブルは空なので、この時点でテーブルにはカラムはありません。このため、ここではCOPY_OPTION
のmergeSchema
を指定しなくてはなりません。
図1: COPY INTOのVALIDATEモードの出力
2. COPY INTOの設定
VALIDATEの結果(図1)の結果を見ると、データが希望している形になっていないことに気づくかもしれません。最初にデータセットをプレビューしておいて良かったと思いませんか?最初に気づくのは、CSVのヘッダーで指定されているようにカラム名が設定されていないということです。さらに悪いことに、ヘッダーがデータの行として表示されています。FORMAT_OPTIONS
を指定することで、CSVパーサーを設定することができます。次でこれらを追加しましょう。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleData'
FILEFORMAT = CSV
VALIDATE
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
FORMAT_OPTIONS
を使う際、inferSchema
オプションを指定することでCOPY INTO
にCSVファイルのデータ型を推定させるように指示することができます。そうしない場合、デフォルトのデータ型はすべてSTRING
となります。一方、AVROやPARQUETのようなバイナリーファイルフォーマットでは、これらは自身でスキーマを定義しているのでこのオプションは不要です。別のオプションのmergeSchema
は、単一のCSVファイルではなく包括的なCSVファイルのサンプルからスキーマを推定すべきということを宣言しています。フォーマット固有のオプションの包括的な一覧はドキュメントで確認することができます。
図2ではヘッダーが適切にパースされた検証結果が表示されています。
図2: headerとinferSchemaが有効化されたCOPY INTOのVALIDATEモードの出力
3. Deltaテーブルへのデータの追加
これでプレビューは大丈夫に見えますので、VALIDATE
キーワードを削除しCOPY INTO
コマンドを実行することができます。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleData'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
COPY INTO
は取り込んだファイルの状態を追跡し続けます。INSERT INTO
のようなコマンドと違い、ユーザーはCOPY INTO
の冪等性を利用することができ、同じソースデータから複数回COPY INTO
を実行しても、ターゲットテーブルにデータの重複が生じないことを意味します。
COPY INTO
はアドホックに一度のみ実行することも、Databricksワークフローを通じてスケジュールすることもできます。COPY INTO
はネイティブでは低レーテンシーの取り込みをサポートしていませんが、Apache Airflowのようなおーけすトレータを通じてCOPY INTO
を起動することができます。
図3: タスクをスケジュールするためのDatabricksワークフローUI
4. COPY INTOによるセキュアなデータアクセス
COPY INTO
はいくつかの方法でセキュアなアクセスをサポートしています。このセクションでは、最近のリリースDatabricks SQLとノートブックの両方で利用できる新たな2つのオプションをハイライトします。
Unity Catalog
Databrick Unity Catalogの正式提供によって、COPY INTO
でサポートされているソースやデータフォーマットからUnity Catalogのマネージドテーブル、外部テーブルにデータを取り込むためにCOPY INTO
を使用することができます。また、Unity Catalogは生データに対するセキュアなアクセスを設定するための新たなオプションを追加しており、クラウドオブジェクトストレージのデータにアクセスするためにUnity Catalogの外部ロケーションやストレージ資格情報を活用することができます。詳細は、Unity CatalogによるデータロードでのCOPY INTOの活用をご覧ください。
一時資格情報
Unity Catalogやインスタンスプロファイルを設定していない場合にはどうなるでしょうか?信頼するサードパーティのバケットからのデータはどうでしょうか?アドホックなバルク取り込みのユースケースに対応するために、インラインの一時資格情報を用いたデータの取り込みを可能とする便利なCOPY INTO
の機能があります。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleDataPath' WITH (
CREDENTIAL (AWS_ACCESS_KEY = '...', AWS_SECRET_KEY = '...', AWS_SESSION_TOKEN = '...')
)
FILEFORMAT = CSV
5. 取り込みファイルのフィルタリング
ファイル名がパターンにマッチするファイルのサブセットの取り込みはどうでしょうか?ソースディレクトリからロードするファイルを特定するglobパターンを適用することができます。例えば、以下のようにファイル名にraw_data
を含むファイルをフィルタリングし取り込みましょう。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleDataPath'
FILEFORMAT = CSV
PATTERN = '*raw_data*.csv'
FORMAT_OPTIONS ('header' = 'true')
6. 期間に基づくファイルの取り込み
データエンジニアリングにおいては、特定のタイムスタンプの前後に編集されたファイルを取り込む必要があるというのはよくある話です。2つのタイムスタンプ間のデータも興味の対象となり得ます。COPY INTO
のフォーマットオプションmodifiedAfter
、modifiedBefore
を用いることで、選択した時間ウィンドウのデータをDeltaテーブルに取り込めるようになります。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleDataPath'
FILEFORMAT = CSV
PATTERN = '*raw_data_*.csv'
FORMAT_OPTIONS('header' = 'true', 'modifiedAfter' = '2022-09-12T10:53:11.000+0000')
7. forceオプションによるデータの修正
COPY INTO
はデフォルトでは冪等なので、同じソースファイルに対して同じクエリーを複数回実行しても、初回からターゲットテーブルに変化はありません。現実の環境においては、クラウドストレージのソースデータファイルが修正のために後で変更されることが起こりうるので、変更をターゲットテーブルに伝播させる必要があります。このようなケースでは、ソースから最新のデータを取り込む前に、最初にターゲットテーブルからデータを削除することは可能です。このオペレーションを行うには、コピーオプションのforce
をtrue
に設定するだけです。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleDataPath'
FILEFORMAT = CSV
PATTERN = '*raw_data_2022*.csv'
FORMAT_OPTIONS('header' = 'true')
COPY_OPTIONS ('force' = 'true')
8. シンプルな変換処理の適用
カラムの名称を変更したいとしたらどうでしょう?あるいは、ソースデータが変更され、以前のカラム名が別の何かに変更されたとしたらどうでしょうか。2つの別々のカラムとしてではなく、単一のカラムとしてデータを取り込みたいと考えることでしょう。COPY INTO
でSELECT
を活用することで、シンプルな変換処理を実行することができます。
COPY INTO demo.my_example_data
FROM ( SELECT concat(first_name, " ", last_name) as full_name,
* EXCEPT (first_name, last_name)
FROM 's3://my-bucket/exampleDataPath'
)
FILEFORMAT = CSV
PATTERN = '*.csv'
FORMAT_OPTIONS('header' = 'true')
COPY_OPTIONS ('force' = 'true')
9. COPY INTOによるエラーハンドリングと観測可能性
エラーハンドリング
破損しているデータの取り込みはどうしたらいいのでしょうか?ファイル破損のよくある例は:
- 不適切なファイルフォーマットのファイル
- 解凍の失敗
- 読み込めないファイル(不正なParquetなど)
COPY INTO
のフォーマットオプションであるignoreCorruptFiles
は、処理中にこれらのファイルをスキップする役に立ちます。COPY INTO
コマンドの結果、num_skipped_corrupt_files
カラムにスキップされたファイル数が返却されます。さらに、これらの破損ファイルはCOPY INTO
の取り込み状態によってトラックされないので、破損が修正されたら以降の実行でリロードすることができます。このオプションはDatabrikcsラインタイム11.0以降で利用できます。
VALIDATEモードのCOPY INTO
を実行することで、破損と検知されたファイルを確認することができます。
COPY INTO my_example_data
FROM 's3://my-bucket/exampleDataPath'
FILEFORMAT = CSV
VALIDATE ALL
FORMAT_OPTIONS('ignoreCorruptFiles' = 'true')
観測可能性
Databricksランタイム10.5ではファイルのメタデータカラムがサポートされ、ユーザーは_metadata
という隠しSTRUCTカラムをクエリーすることで、パス、名称、サイズ、変更時刻のような取り込みファイルの主要なプロパティを監視、取得することができます。ターゲットにこの情報を含めるには、COPY INTO
のクエリーで明示的に_metadata
カラムを参照する必要があります。
COPY INTO my_example_data
FROM (
SELECT *, _metadata source_metadata FROM 's3://my-bucket/exampleDataPath'
)
FILEFORMAT = CSV
Auto Loderと何が違いますか?
COPY INTO
は少数のファイル(数千ファイル以下)を含むソースディレクトリの場合、かつ、SQLを好む場合に使用できるシンプルかつパワフルなコマンドです。さらに、COPY INTO
は多くのインテグレーションパートナーによる一般的なパターン同様、Delta Lakeにデータをプッシュする際にJDBC経由で利用することができます。ストリーム、バッチの両方で膨大な数を取り込む際には、Auto Loaderを使うことをお勧めします。また、メダリオンアーキテクチャに基づくモダンデータパイプラインに関しては、宣言型アプローチによる自動エラーハンドリング、品質管理、データリネージ、エクスペクテーションを活用するDelta Live TablesパイプラインでAuto Loaderを使うことをお勧めします。
どうすれば使い始められますか?
使い始めるには、Databricks SQLのクエリーエディタに移動し、お使いのクラウドオブジェクトストアからデータ取り込むためのサンプルSQLコマンドを更新して実行します。Databricks SQLでデータに対するセキュアなアクセスを確立し、クエリーを行うにはNo.4のオプションを確認してください。Databricks SQLでCOPY INTO
になれるには、こちらのクイックスタートチュートリアルを試すこともできます。
あるいは、この記事で説明したCOPY INTO
の機能の大部分を学ぶために、Data Science & EngineeringあるいはMachine Learningワークスペースで、ソースデータとターゲットDeltaテーブルがDBFSに生成されるノートブックを使用することができます。
この他のCOPY INTO
のチュートリアルにはこちらからアクセスすることができます。