1
1

More than 1 year has passed since last update.

DatabricksのCOPY INTOを用いた一般的なデータロードのパターン

Last updated at Posted at 2022-06-24

Common data loading patterns with COPY INTO | Databricks on AWS [2022/6/9時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

ファイルソースからデータをDelta LakeにロードするためにCOPY INTOを用いた一般的なパターンを学びましょう。

COPY INTOの使い方に関しては多くの選択肢があります。また、これらのパターンと組み合わせて、COPY INTOによるデータロードに一時的な認証情報を使うことができます。

全てのオプションのリファレンスに関してはCOPY INTOをご覧ください。

COPY INTOのターゲットテーブルの作成

COPY INTOは存在するDeltaテーブルをターゲットしなくてはなりません。Databricksランタイム11.0以降では、スキーマ進化をサポートするフォーマットでは、これらのスキーマの設定はオプションとなります。

SQL
CREATE TABLE IF NOT EXISTS my_table
[(col_1 col_1_type, col_2 col_2_type, ...)]
[COMMENT <table_description>]
[TBLPROPERTIES (<table_properties>)];

COPY INTOでスキーマを推定するには、追加のオプションを指定する必要があることに注意してください。

SQL
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

以下の例では、my_pipe_dataというスキーマレスのDeltaテーブルを作成し、パイプ区切りでヘッダーを持つCSVをロードします。

SQL
CREATE TABLE IF NOT EXISTS my_pipe_data;

COPY INTO my_pipe_data
  FROM 's3a://my-bucket/pipeData'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('mergeSchema' = 'true',
                  'delimiter' = '|',
                  'header' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

COPY INTOによるJSONデータのロード

以下の例では、my_json_dataというDeltaテーブルにAWS S3上の5つのファイルからJSONデータをロードします。このテーブルは、COPY INTOを実行する前に作成する必要があります。ある一つのファイルからすでにデータがロードされている場合、そのファイルからのデータは再ロードされません。

SQL
COPY INTO my_json_data
  FROM 's3://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

-- The second execution will not copy any data since the first command already loaded the data
COPY INTO my_json_data
  FROM 's3://my-bucket/jsonData'
  FILEFORMAT = JSON
  FILES = ('f1.json', 'f2.json', 'f3.json', 'f4.json', 'f5.json')

COPY INTOによるAvroデータのロード

以下の例では、SELECT文の一部で追加のSQL表現を用いて、Google Cloud StorageのAvroデータをロードしています。

SQL
COPY INTO my_delta_table
  FROM (SELECT to_date(dt) dt, event as measurement, quantity::double
          FROM 'gs://my-bucket/avroData')
  FILEFORMAT = AVRO

COPY INTOによるCSVファイルのロード

以下のサンプルでは、abfss://container@storageAccount.dfs.core.windows.net/base/path/folder1のAzure Data Lake Storage Gen2上のCSVファイルをabfss://container@storageAccount.dfs.core.windows.net/deltaTables/targetにあるDeltaテーブルにロードしています。

SQL
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
  FROM (SELECT key, index, textData, 'constant_value'
          FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'
  FORMAT_OPTIONS('header' = 'true')

-- The example below loads CSV files without headers on ADLS Gen2 using COPY INTO.
-- By casting the data and renaming the columns, you can put the data in the schema you want
COPY INTO delta.`abfss://container@storageAccount.dfs.core.windows.net/deltaTables/target`
  FROM (SELECT _c0::bigint key, _c1::int index, _c2 textData
        FROM 'abfss://container@storageAccount.dfs.core.windows.net/base/path')
  FILEFORMAT = CSV
  PATTERN = 'folder1/file_[a-g].csv'

データをロードしている際に破損しているファイルを無視する

何かしらの破損の問題によってロードしているデータが読み込めない場合、FORMAT_OPTIONSignoreCorruptFilestrueに設定することで、これらのファイルをスキップすることができます。

COPY INTOコマンドは、num_skipped_corrupt_filesカラムで幾つのファイルが破損のためスキップされたのかを返却します。また、このメトリクスはDeltaテーブルにDESCRIBE HISTORYを実行した後のnumSkippedCorruptFilesoperationMetricsカラムにも表示されます。

破損ファイルはCOPY INTOで追跡されないので、破損が修復された際、後続の実行で再読み込みすることが可能です。VALIDATEモードでCOPY INTOを実行する子おtで、どのファイルが破損しているのかを確認することができます。

SQL
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
[VALIDATE ALL]
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true')

注意
ignoreCorruptFilesはDatabricksランタイム11.0以降で利用できます。

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1