Common data loading patterns with COPY INTO | Databricks on AWS [2022/6/9時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
ファイルソースからデータをDelta LakeにロードするためにCOPY INTOを用いた一般的なパターンを学びましょう。
COPY INTOの使い方に関しては多くの選択肢があります。また、これらのパターンと組み合わせて、COPY INTOによるデータロードに一時的な認証情報を使うことができます。
全てのオプションのリファレンスに関してはCOPY INTOをご覧ください。
COPY INTOのターゲットテーブルの作成
COPY INTO
は存在するDeltaテーブルをターゲットしなくてはなりません。Databricksランタイム11.0以降では、スキーマ進化をサポートするフォーマットでは、これらのスキーマの設定はオプションとなります。
CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table_description>]
[TBLPROPERTIES (<table_properties>)];
COPY INTOでスキーマを推定するには、追加のオプションを指定する必要があることに注意してください。
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
以下の例では、my_pipe_data
というスキーマレスのDeltaテーブルを作成し、パイプ区切りでヘッダーを持つCSVをロードします。
CREATE TABLE IF NOT EXISTS my_pipe_data;
COPY INTO my_pipe_data
FROM 's3a://my-bucket/pipeData'
FILEFORMAT = CSV
FORMAT_OPTIONS ('mergeSchema' = 'true',
'delimiter' = '|',
'header' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
COPY INTOによるJSONデータのロード
以下の例では、my_json_data
というDeltaテーブルにAWS S3上の5つのファイルからJSONデータをロードします。このテーブルは、COPY INTO
を実行する前に作成する必要があります。ある一つのファイルからすでにデータがロードされている場合、そのファイルからのデータは再ロードされません。
COPY INTO my_json_data
FROM 's3://my-bucket/jsonData'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
FROM 's3://my-bucket/jsonData'
FILEFORMAT = JSON
FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')
COPY INTOによるAvroデータのロード
以下の例では、SELECT文の一部で追加のSQL表現を用いて、Google Cloud StorageのAvroデータをロードしています。
COPY INTO my_delta_table
FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
FROM 'gs://my-bucket/avroData')
FILEFORMAT = AVRO
COPY INTOによるCSVファイルのロード
以下のサンプルでは、abfss://container@storageAccount.dfs.core.windows.net/base/path/folder1
のAzure Data Lake Storage Gen2上のCSVファイルをabfss://container@storageAccount.dfs.core.windows.net/deltaTables/target
にあるDeltaテーブルにロードしています。
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
FROM (SELECT key, index, textData, 'constant_value'
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
FORMAT_OPTIONS('header' = 'true')
-- The example below loads CSV files without headers on ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
FILEFORMAT = CSV
PATTERN = 'folder1/file_[a-g].csv'
データをロードしている際に破損しているファイルを無視する
何かしらの破損の問題によってロードしているデータが読み込めない場合、FORMAT_OPTIONS
でignoreCorruptFiles
をtrue
に設定することで、これらのファイルをスキップすることができます。
COPY INTO
コマンドは、num_skipped_corrupt_files
カラムで幾つのファイルが破損のためスキップされたのかを返却します。また、このメトリクスはDeltaテーブルにDESCRIBE HISTORY
を実行した後のnumSkippedCorruptFiles
のoperationMetrics
カラムにも表示されます。
破損ファイルはCOPY INTO
で追跡されないので、破損が修復された際、後続の実行で再読み込みすることが可能です。VALIDATE
モードでCOPY INTO
を実行する子おtで、どのファイルが破損しているのかを確認することができます。
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')
注意
ignoreCorruptFiles
はDatabricksランタイム11.0以降で利用できます。