0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksにおけるDelta Lakeへのデータ取り込み

Posted at

Ingest data into Delta Lake | Databricks on AWS [2022/8/29時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

DatabricksではDelta Lakeにデータを取り込む方法を多数取り揃えています。

CSVファイルのアップロード

Databricks SQLのCreate Table UIを用いてCSVファイルからテーブルをセキュアに作成することができます。

パートナーインテグレーション

Databricksのパートナーインテグレーションを用いることで、簡単にDatabricksにデータをロードすることができます。これらのインテグレーションによって、様々なソースからDatabricksに対して、ローコード、実装が容易かつスケーラブルなデータ取り込みを実現することができます。詳細はDatabricks integrationsをご覧ください。

COPY INTO SQLコマンド

COPY INTO SQLコマンドを用いることで、ファイルの格納場所からDeltaテーブルにデータをロードすることができます。これは再トライ可能で、冪等性のあるオペレーションです。ソースのロケーションにある処理済みのファイルはスキップされます。

以下のケースでは、Auto Loaderの代わりにCOPY INTOを使用します。

  • 数千未満のファイルを含むファイルロケーションからデータをロードしたいケース。
  • お使いのデータのスキーマは頻繁に変更されないケース。
  • 以前ロードしたファイルのサブセットをロードしようとしているケース。

COPY INTO SQLコマンドの概要とデモに関しては、本書後半のAuto Loaderと以下のYoutube動画(2分)をご覧ください。

以下の例では、どのようにDeltaテーブルを作成し、サンプルデータセット(databricks-datasets)からテーブルにサンプルデータをロードするのかを説明しています。DatabricksクラスターにアタッチされたノートブックでサンプルのPython、R、Scala、SQLコードを実行することができます。また、Databricks SQLSQLウェアハウスに関連づけられたクエリーの中でSQLコードを実行することもできます。

注意
以下のコードサンプルのいくつかでは、スキーマ(データベース)とテーブル・ビューで構成される2レベルの名前空間(例default.people10m)を使用しています。これらのサンプルをUnity Catalogで実行するには2レベルの名前空間を、カタログ、スキーマ、テーブル・ビューから構成されるUnity Catalogの3レベルの名前空間(例main.default.people10m)に置き換えてください。詳細は、Use three-level namespace notation with Unity Catalogをご覧ください。

Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

クリーンアップするには、テーブルを削除する以下のコードを実行します。

Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload

より詳細やサンプルについては、以下をご覧ください。

Auto Loader

Auto Loaderは追加のセットアップなしに、クラウドストレージに新規のデータファイルが到着すると、インクリメンタルかつ効率的にファイルを処理します。Auto LoaderはcloudFilesという新たな構造化ストリーミングソースを提供します。クラウドファイルストレージの入力ディレクトリパスが指定されると、cloudFilesソースは、データが到着すると新規ファイルを自動で処理します。オプションで既にディレクトリにあるファイルを処理することも可能です。

以下のケースでは、COPY INTO SQLコマンドの代わりにAuto Loaderを使用します。

  • 数百万以上の規模のファイルを含むファイルロケーションからデータをロードしたいケース。Auto Loaderは、COPY INTO SQLコマンドよりもさらに効率的にファイルを発見し、複数のバッチにファイル処理を分割することができます。
  • データスキーマが頻繁に変更されるケース。Auto Loaderはスキーマ推定とスキーマ進化に対して優れたサポートを提供します。詳細は、Auto Loaderにおけるスキーマ推定とスキーマ進化の設定をご覧下さい。
  • 以前アップロードしたファイルのサブセットをロードする計画がないケース。Auto Loaderではファイルのサブセットを再処理することが困難となります。しかし、Auto Loaderのストリームが実行されるのと並行して、ファイルのサブセットを理ロードするためにCOPY INTOコマンドを使用することができます。

Auto Loaderの概要とデモに関しては、本書前半のCOPY INTO SQLコマンドと以下のYoutube動画(2分)をご覧ください。

Auto Loaderの更なる概要説明とデモに関しては、こちらのYouTube動画(59分)をご覧ください。

注意
以下のコードサンプルのいくつかでは、スキーマ(データベース)とテーブル・ビューで構成される2レベルの名前空間(例default.people10m)を使用しています。これらのサンプルをUnity Catalogで実行するには2レベルの名前空間を、カタログ、スキーマ、テーブル・ビューから構成されるUnity Catalogの3レベルの名前空間(例main.default.people10m)に置き換えてください。詳細は、Use three-level namespace notation with Unity Catalogをご覧ください。

以下のサンプルコードでは、クラウドストレージに新規データファイルが到着するとどのようにAuto Loaderが検知するのかを説明しています。Databricksクラスターにアタッチされたノートブックでサンプルコードを実行することができます。

  1. 以下のようにファイルアップロードディレクトリを作成します。

    Python
    user_dir = '<my-name>@<my-organization.com>'
    upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    
    Scala
    val user_dir = "<my-name>@<my-organization.com>"
    val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    
    dbutils.fs.mkdirs(upload_path)
    
  2. 以下のサンプルCSVファイルを作成し、DBFSファイルブラウザを用いてファイルアップロードディレクトリにアップロードします。

    WA.csv:

    city,year,population
    Seattle metro,2019,3406000
    Seattle metro,2020,3433000
    

    OR.csv:

    city,year,population
    Portland metro,2019,2127000
    Portland metro,2020,2151000
    
  3. Auto Loaderを起動するために以下のコードを実行します。

    Python
    checkpoint_path = '/tmp/delta/population_data/_checkpoints'
    write_path = '/tmp/delta/population_data'
    
    # Set up the stream to begin reading incoming files from the
    # upload_path location.
    df = spark.readStream.format('cloudFiles') \
      .option('cloudFiles.format', 'csv') \
      .option('header', 'true') \
      .schema('city string, year int, population long') \
      .load(upload_path)
    
    # Start the stream.
    # Use the checkpoint_path location to keep a record of all files that
    # have already been uploaded to the upload_path location.
    # For those that have been uploaded since the last check,
    # write the newly-uploaded files' data to the write_path location.
    df.writeStream.format('delta') \
      .option('checkpointLocation', checkpoint_path) \
      .start(write_path)
    
    Scala
    val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
    val write_path = "/tmp/delta/population_data"
    
    // Set up the stream to begin reading incoming files from the
    // upload_path location.
    val df = spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .schema("city string, year int, population long")
      .load(upload_path)
    
    // Start the stream.
    // Use the checkpoint_path location to keep a record of all files that
    // have already been uploaded to the upload_path location.
    // For those that have been uploaded since the last check,
    // write the newly-uploaded files' data to the write_path location.
    df.writeStream.format("delta")
      .option("checkpointLocation", checkpoint_path)
      .start(write_path)
    
  4. ステップ3のコードが動いている間に、書き込み先のディレクトリにクエリーを行うために以下のコードを実行します。

    Python
    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    '''
    
    Scala
    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    */
    
  5. ステップ3のコードが動いている間に、以下の追加のCSVファイルを作成し、DBFSファイルブラウザを用いてファイルアップロードディレクトリにアップロードします。

    ID.csv:

    city,year,population
    Boise,2019,438000
    Boise,2020,447000
    

    MT.csv:

    city,year,population
    Helena,2019,81653
    Helena,2020,82590
    

    Misc.csv:

    city,year,population
    Seattle metro,2021,3461000
    Portland metro,2021,2174000
    Boise,2021,455000
    Helena,2021,81653
    
  6. ステップ3のコードが動いている間に、書き込み先のディレクトリにクエリーを行うために以下のコードを実行すると、Auto Loaderがアップロードディレクトリで新規データを検知し、書き込みディレクトリに書き込みを行っていることがわかります。

    Python
    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    '''
    
    Scala
    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    */
    
  7. クリーンアップするには、ステップ3で実行しているコードをキャンセルし、アップロードディレクトリ、チェックポイントディレクトリ、書き込みディレクトリを削除する以下のコードを実行します。

    Python
    dbutils.fs.rm(write_path, true)
    dbutils.fs.rm(upload_path, true)
    
    Scala
    dbutils.fs.rm(write_path, true)
    dbutils.fs.rm(upload_path, true)
    

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?