0
0

More than 1 year has passed since last update.

COPY INTOでレイクハウスへのデータ取り込みを簡単に

Last updated at Posted at 2023-01-18

Easy Ingestion to Lakehouse With COPY INTO - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

データレイクとして知られる新たなデータ管理アーキテクチャは、膨大なデータに対する直接のAIやBIをサポートするために数多くの企業やユースケースで独立して発生しました。分析や機械学習でデータレイクハウスを活用する際のキーとなる成功要因の一つは、オンプレミスのストレージプラットフォーム(データウェアハウス、メインフレーム)、リアルタイムストリーミングデータ、膨大なデータ資産を含む様々なデータをクイックかつ容易に取り込む能力となります。

レイクハウスへのデータ取り込みはETLパイプラインをフィードする継続的なプロセスなので、様々なフォーマット、タイプ、レーテンシーのデータを取り込むための複数のオプションを必要とすることでしょう。AWS S3、Google Cloud Storage、Azure Data Lake Storageのようなクラウドオブジェクトストレージに格納されているデータに対して、Databricksはデータエンジニアがクラウドストレージから継続的に数百万のファイルを取り込めるネイティブの組み込み機能であるAuto Loaderを提供しています。その他のストリーミングケース(センサーデータやクリックストリームデータなど)においては、Databricksは低レーテンシーでApache KafkaやAzure Event Hubs、AWS Kinesisのような著名なメッセージキューからApache Sparkがクイックにデータを取り込めるネイティブなコネクターを提供しています。さらに、多くの企業ではエンタープライズアプリケーション、データベース、メインフレームなどからレイクハウスに容易にデータを取り込めるように、FivetranのようなDatabricksとインテグレーションされている人気の取り込みツールを活用することができます。最後になりますが、アナリストはすでにどのファイルが処理されたのかを追跡する必要なしに、自動でレイクハウスに新規データをプルするために、シンプルなCOPY INTOコマンドを活用することができます。

本記事では、クラウドオブジェクトストアからDelta Lakeへのファイルのバッチ取り込みを実行できるシンプルですがパワフルなSQLコマンドであるCOPY INTOにフォーカスします。冪等性があるので複数回実行してもexactly-onceセマンティクスを保証し、インクリメンタルな追加とシンプルな変換処理をサポートしています。アドホックに一度のみ実行することも、Databricksワークフローを通じてスケジュールすることもできます。最近のDatabricksランタイムのリリースにおいては、COPY INTOにはデータプレビュー、妥当性検証、強化されたエラーハンドリング、そして、ユーザーがクイックに作業を開始できるようにするためのスキーマレスDelta LakeテーブルへのCOPY INTOを行う新たな手段などの新機能が導入され、クラウドオブジェクトストアからデータを取り込むエンドツーエンドのユーザージャーニーを完結させます。それでは、よくあるCOPY INTOのユースケースから見ていきましょう。

1. 初めてのデータ取り込み

COPY INTOでは、ターゲットのDeltaテーブルにデータを取り込むので、既存のテーブルが必要となります。しかし、データがどのようなものか分からない場合があります。この場合、最初に空のDeltaテーブルを作成します。

SQL
CREATE TABLE my_example_data;

データを書き出す前に、データが適正であることを確認するためにプレビューしたいと考えるかもしれません。COPY INTOのValidateモードは、Databricksランタイム10.3以降の新機能であり、クラウドオブジェクトストレージから大量のファイルを取り込む前にソースデータをプレビュー、検証することができます。

  • データをパースすることができるか。
  • スキーマはターゲットテーブルとマッチしているか、スキーマの進化が必要か。
  • テーブルのnull可能性とチェック制約は満たしているか。
SQL
COPY INTO my_example_data
FROM 's3://my-bucket/exampleData'
FILEFORMAT = CSV
VALIDATE
COPY_OPTIONS ('mergeSchema' = 'true')

データ検証のデフォルトの挙動は、いかなる問題もないことを保証するためにソースディレクトリのすべてのデータをパースするというものですが、プレビューのために返却される行は限定されます。オプションとして、VALIDATEの後にプレビュー行数を指定することができます。

COPY_OPTIONmergeSchemaはターゲットテーブルのスキーマが進化しても問題がないことを指定します。スキーマ進化は新規カラムの追加のみを許可し、既存カラムのデータ型の変更はサポートしていません。他のユースケースにおいては、お使いのデータパイプラインが厳密なスキーマ要件を持っており常にスキーマを進化させたくないため、テーブルスキーマをより厳密に管理したい場合にはこのオプションを除外することができます。しかし、上述のサンプルのターゲットDeltaテーブルは空なので、この時点でテーブルにはカラムはありません。このため、ここではCOPY_OPTIONmergeSchemaを指定しなくてはなりません。

図1: COPY INTOのVALIDATEモードの出力

2. COPY INTOの設定

VALIDATEの結果(図1)の結果を見ると、データが希望している形になっていないことに気づくかもしれません。最初にデータセットをプレビューしておいて良かったと思いませんか?最初に気づくのは、CSVのヘッダーで指定されているようにカラム名が設定されていないということです。さらに悪いことに、ヘッダーがデータの行として表示されています。FORMAT_OPTIONSを指定することで、CSVパーサーを設定することができます。次でこれらを追加しましょう。

SQL
COPY INTO my_example_data
FROM 's3://my-bucket/exampleData'
FILEFORMAT = CSV
VALIDATE
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

FORMAT_OPTIONSを使う際、inferSchemaオプションを指定することでCOPY INTOにCSVファイルのデータ型を推定させるように指示することができます。そうしない場合、デフォルトのデータ型はすべてSTRINGとなります。一方、AVROやPARQUETのようなバイナリーファイルフォーマットでは、これらは自身でスキーマを定義しているのでこのオプションは不要です。別のオプションのmergeSchemaは、単一のCSVファイルではなく包括的なCSVファイルのサンプルからスキーマを推定すべきということを宣言しています。フォーマット固有のオプションの包括的な一覧はドキュメントで確認することができます。

図2ではヘッダーが適切にパースされた検証結果が表示されています。

図2: headerとinferSchemaが有効化されたCOPY INTOのVALIDATEモードの出力

3. Deltaテーブルへのデータの追加

これでプレビューは大丈夫に見えますので、VALIDATEキーワードを削除しCOPY INTOコマンドを実行することができます。

SQL
COPY INTO my_example_data
FROM 's3://my-bucket/exampleData'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true', 'mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

COPY INTOは取り込んだファイルの状態を追跡し続けます。INSERT INTOのようなコマンドと違い、ユーザーはCOPY INTOの冪等性を利用することができ、同じソースデータから複数回COPY INTOを実行しても、ターゲットテーブルにデータの重複が生じないことを意味します。

COPY INTOはアドホックに一度のみ実行することも、Databricksワークフローを通じてスケジュールすることもできます。COPY INTOはネイティブでは低レーテンシーの取り込みをサポートしていませんが、Apache Airflowのようなおーけすトレータを通じてCOPY INTOを起動することができます。

図3: タスクをスケジュールするためのDatabricksワークフローUI

4. COPY INTOによるセキュアなデータアクセス

COPY INTOはいくつかの方法でセキュアなアクセスをサポートしています。このセクションでは、最近のリリースDatabricks SQLとノートブックの両方で利用できる新たな2つのオプションをハイライトします。

Unity Catalog

Databrick Unity Catalogの正式提供によって、COPY INTOでサポートされているソースやデータフォーマットからUnity Catalogのマネージドテーブル、外部テーブルにデータを取り込むためにCOPY INTOを使用することができます。また、Unity Catalogは生データに対するセキュアなアクセスを設定するための新たなオプションを追加しており、クラウドオブジェクトストレージのデータにアクセスするためにUnity Catalogの外部ロケーションやストレージ資格情報を活用することができます。詳細は、Unity CatalogによるデータロードでのCOPY INTOの活用をご覧ください。

一時資格情報

Unity Catalogやインスタンスプロファイルを設定していない場合にはどうなるでしょうか?信頼するサードパーティのバケットからのデータはどうでしょうか?アドホックなバルク取り込みのユースケースに対応するために、インラインの一時資格情報を用いたデータの取り込みを可能とする便利なCOPY INTOの機能があります。

SQL
COPY INTO my_example_data
FROM 's3://my-bucket/exampleDataPath' WITH (
  CREDENTIAL (AWS_ACCESS_KEY = '...', AWS_SECRET_KEY = '...', AWS_SESSION_TOKEN = '...')
)
FILEFORMAT = CSV

5. 取り込みファイルのフィルタリング

ファイル名がパターンにマッチするファイルのサブセットの取り込みはどうでしょうか?ソースディレクトリからロードするファイルを特定するglobパターンを適用することができます。例えば、以下のようにファイル名にraw_dataを含むファイルをフィルタリングし取り込みましょう。

SQL
COPY INTO my_example_data 
  FROM  's3://my-bucket/exampleDataPath' 
  FILEFORMAT = CSV
  PATTERN = '*raw_data*.csv'
  FORMAT_OPTIONS ('header' = 'true')

6. 期間に基づくファイルの取り込み

データエンジニアリングにおいては、特定のタイムスタンプの前後に編集されたファイルを取り込む必要があるというのはよくある話です。2つのタイムスタンプ間のデータも興味の対象となり得ます。COPY INTOのフォーマットオプションmodifiedAftermodifiedBeforeを用いることで、選択した時間ウィンドウのデータをDeltaテーブルに取り込めるようになります。

SQL
COPY INTO my_example_data 
  FROM  's3://my-bucket/exampleDataPath' 
  FILEFORMAT = CSV
  PATTERN = '*raw_data_*.csv'
  FORMAT_OPTIONS('header' = 'true', 'modifiedAfter' = '2022-09-12T10:53:11.000+0000')

7. forceオプションによるデータの修正

COPY INTOはデフォルトでは冪等なので、同じソースファイルに対して同じクエリーを複数回実行しても、初回からターゲットテーブルに変化はありません。現実の環境においては、クラウドストレージのソースデータファイルが修正のために後で変更されることが起こりうるので、変更をターゲットテーブルに伝播させる必要があります。このようなケースでは、ソースから最新のデータを取り込む前に、最初にターゲットテーブルからデータを削除することは可能です。このオペレーションを行うには、コピーオプションのforcetrueに設定するだけです。

SQL
COPY INTO my_example_data
  FROM  's3://my-bucket/exampleDataPath' 
  FILEFORMAT = CSV
  PATTERN = '*raw_data_2022*.csv'
  FORMAT_OPTIONS('header' = 'true')
  COPY_OPTIONS ('force' = 'true')

8. シンプルな変換処理の適用

カラムの名称を変更したいとしたらどうでしょう?あるいは、ソースデータが変更され、以前のカラム名が別の何かに変更されたとしたらどうでしょうか。2つの別々のカラムとしてではなく、単一のカラムとしてデータを取り込みたいと考えることでしょう。COPY INTOSELECTを活用することで、シンプルな変換処理を実行することができます。

SQL
COPY INTO demo.my_example_data
  FROM ( SELECT  concat(first_name, " ", last_name) as full_name, 
                 * EXCEPT (first_name, last_name) 
    FROM 's3://my-bucket/exampleDataPath' 
  )
  FILEFORMAT = CSV
  PATTERN = '*.csv'
  FORMAT_OPTIONS('header' = 'true')
  COPY_OPTIONS ('force' = 'true')

9. COPY INTOによるエラーハンドリングと観測可能性

エラーハンドリング

破損しているデータの取り込みはどうしたらいいのでしょうか?ファイル破損のよくある例は:

  • 不適切なファイルフォーマットのファイル
  • 解凍の失敗
  • 読み込めないファイル(不正なParquetなど)

COPY INTOのフォーマットオプションであるignoreCorruptFilesは、処理中にこれらのファイルをスキップする役に立ちます。COPY INTOコマンドの結果、num_skipped_corrupt_filesカラムにスキップされたファイル数が返却されます。さらに、これらの破損ファイルはCOPY INTOの取り込み状態によってトラックされないので、破損が修正されたら以降の実行でリロードすることができます。このオプションはDatabrikcsラインタイム11.0以降で利用できます。

VALIDATEモードのCOPY INTOを実行することで、破損と検知されたファイルを確認することができます。

SQL
COPY INTO my_example_data
  FROM  's3://my-bucket/exampleDataPath' 
  FILEFORMAT = CSV
  VALIDATE ALL
  FORMAT_OPTIONS('ignoreCorruptFiles' = 'true')

観測可能性

Databricksランタイム10.5ではファイルのメタデータカラムがサポートされ、ユーザーは_metadataという隠しSTRUCTカラムをクエリーすることで、パス、名称、サイズ、変更時刻のような取り込みファイルの主要なプロパティを監視、取得することができます。ターゲットにこの情報を含めるには、COPY INTOのクエリーで明示的に_metadataカラムを参照する必要があります。

SQL
COPY INTO my_example_data 
  FROM (
     SELECT *, _metadata source_metadata FROM 's3://my-bucket/exampleDataPath' 
  )
  FILEFORMAT = CSV

Auto Loderと何が違いますか?

COPY INTOは少数のファイル(数千ファイル以下)を含むソースディレクトリの場合、かつ、SQLを好む場合に使用できるシンプルかつパワフルなコマンドです。さらに、COPY INTOは多くのインテグレーションパートナーによる一般的なパターン同様、Delta Lakeにデータをプッシュする際にJDBC経由で利用することができます。ストリーム、バッチの両方で膨大な数を取り込む際には、Auto Loaderを使うことをお勧めします。また、メダリオンアーキテクチャに基づくモダンデータパイプラインに関しては、宣言型アプローチによる自動エラーハンドリング、品質管理、データリネージ、エクスペクテーションを活用するDelta Live TablesパイプラインでAuto Loaderを使うことをお勧めします。

どうすれば使い始められますか?

使い始めるには、Databricks SQLのクエリーエディタに移動し、お使いのクラウドオブジェクトストアからデータ取り込むためのサンプルSQLコマンドを更新して実行します。Databricks SQLでデータに対するセキュアなアクセスを確立し、クエリーを行うにはNo.4のオプションを確認してください。Databricks SQLでCOPY INTOになれるには、こちらのクイックスタートチュートリアルを試すこともできます。

あるいは、この記事で説明したCOPY INTOの機能の大部分を学ぶために、Data Science & EngineeringあるいはMachine Learningワークスペースで、ソースデータとターゲットDeltaテーブルがDBFSに生成されるノートブックを使用することができます。

この他のCOPY INTOのチュートリアルにはこちらからアクセスすることができます。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0