Ingest data into Delta Lake | Databricks on AWS [2022/8/29時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
DatabricksではDelta Lakeにデータを取り込む方法を多数取り揃えています。
CSVファイルのアップロード
Databricks SQLのCreate Table UIを用いてCSVファイルからテーブルをセキュアに作成することができます。
パートナーインテグレーション
Databricksのパートナーインテグレーションを用いることで、簡単にDatabricksにデータをロードすることができます。これらのインテグレーションによって、様々なソースからDatabricksに対して、ローコード、実装が容易かつスケーラブルなデータ取り込みを実現することができます。詳細はDatabricks integrationsをご覧ください。
COPY INTO
SQLコマンド
COPY INTO
SQLコマンドを用いることで、ファイルの格納場所からDeltaテーブルにデータをロードすることができます。これは再トライ可能で、冪等性のあるオペレーションです。ソースのロケーションにある処理済みのファイルはスキップされます。
以下のケースでは、Auto Loaderの代わりにCOPY INTO
を使用します。
- 数千未満のファイルを含むファイルロケーションからデータをロードしたいケース。
- お使いのデータのスキーマは頻繁に変更されないケース。
- 以前ロードしたファイルのサブセットをロードしようとしているケース。
COPY INTO
SQLコマンドの概要とデモに関しては、本書後半のAuto Loaderと以下のYoutube動画(2分)をご覧ください。
以下の例では、どのようにDeltaテーブルを作成し、サンプルデータセット(databricks-datasets)からテーブルにサンプルデータをロードするのかを説明しています。DatabricksクラスターにアタッチされたノートブックでサンプルのPython、R、Scala、SQLコードを実行することができます。また、Databricks SQLでSQLウェアハウスに関連づけられたクエリーの中でSQLコードを実行することもできます。
注意
以下のコードサンプルのいくつかでは、スキーマ(データベース)とテーブル・ビューで構成される2レベルの名前空間(例default.people10m
)を使用しています。これらのサンプルをUnity Catalogで実行するには2レベルの名前空間を、カタログ、スキーマ、テーブル・ビューから構成されるUnity Catalogの3レベルの名前空間(例main.default.people10m
)に置き換えてください。詳細は、Use three-level namespace notation with Unity Catalogをご覧ください。
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
クリーンアップするには、テーブルを削除する以下のコードを実行します。
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
より詳細やサンプルについては、以下をご覧ください。
- Databricksランタイム7.x以降: DatabricksのCOPY INTOコマンド
Auto Loader
Auto Loaderは追加のセットアップなしに、クラウドストレージに新規のデータファイルが到着すると、インクリメンタルかつ効率的にファイルを処理します。Auto LoaderはcloudFiles
という新たな構造化ストリーミングソースを提供します。クラウドファイルストレージの入力ディレクトリパスが指定されると、cloudFiles
ソースは、データが到着すると新規ファイルを自動で処理します。オプションで既にディレクトリにあるファイルを処理することも可能です。
以下のケースでは、COPY INTO SQLコマンドの代わりにAuto Loaderを使用します。
- 数百万以上の規模のファイルを含むファイルロケーションからデータをロードしたいケース。Auto Loaderは、
COPY INTO
SQLコマンドよりもさらに効率的にファイルを発見し、複数のバッチにファイル処理を分割することができます。 - データスキーマが頻繁に変更されるケース。Auto Loaderはスキーマ推定とスキーマ進化に対して優れたサポートを提供します。詳細は、Auto Loaderにおけるスキーマ推定とスキーマ進化の設定をご覧下さい。
- 以前アップロードしたファイルのサブセットをロードする計画がないケース。Auto Loaderではファイルのサブセットを再処理することが困難となります。しかし、Auto Loaderのストリームが実行されるのと並行して、ファイルのサブセットを理ロードするために
COPY INTO
コマンドを使用することができます。
Auto Loaderの概要とデモに関しては、本書前半のCOPY INTO SQLコマンドと以下のYoutube動画(2分)をご覧ください。
Auto Loaderの更なる概要説明とデモに関しては、こちらのYouTube動画(59分)をご覧ください。
注意
以下のコードサンプルのいくつかでは、スキーマ(データベース)とテーブル・ビューで構成される2レベルの名前空間(例default.people10m
)を使用しています。これらのサンプルをUnity Catalogで実行するには2レベルの名前空間を、カタログ、スキーマ、テーブル・ビューから構成されるUnity Catalogの3レベルの名前空間(例main.default.people10m
)に置き換えてください。詳細は、Use three-level namespace notation with Unity Catalogをご覧ください。
以下のサンプルコードでは、クラウドストレージに新規データファイルが到着するとどのようにAuto Loaderが検知するのかを説明しています。Databricksクラスターにアタッチされたノートブックでサンプルコードを実行することができます。
-
以下のようにファイルアップロードディレクトリを作成します。
Pythonuser_dir = '<my-name>@<my-organization.com>' upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload" dbutils.fs.mkdirs(upload_path)
Scalaval user_dir = "<my-name>@<my-organization.com>" val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload" dbutils.fs.mkdirs(upload_path)
-
以下のサンプルCSVファイルを作成し、DBFSファイルブラウザを用いてファイルアップロードディレクトリにアップロードします。
WA.csv
:city,year,population Seattle metro,2019,3406000 Seattle metro,2020,3433000
OR.csv
:city,year,population Portland metro,2019,2127000 Portland metro,2020,2151000
-
Auto Loaderを起動するために以下のコードを実行します。
Pythoncheckpoint_path = '/tmp/delta/population_data/_checkpoints' write_path = '/tmp/delta/population_data' # Set up the stream to begin reading incoming files from the # upload_path location. df = spark.readStream.format('cloudFiles') \ .option('cloudFiles.format', 'csv') \ .option('header', 'true') \ .schema('city string, year int, population long') \ .load(upload_path) # Start the stream. # Use the checkpoint_path location to keep a record of all files that # have already been uploaded to the upload_path location. # For those that have been uploaded since the last check, # write the newly-uploaded files' data to the write_path location. df.writeStream.format('delta') \ .option('checkpointLocation', checkpoint_path) \ .start(write_path)
Scalaval checkpoint_path = "/tmp/delta/population_data/_checkpoints" val write_path = "/tmp/delta/population_data" // Set up the stream to begin reading incoming files from the // upload_path location. val df = spark.readStream.format("cloudFiles") .option("cloudFiles.format", "csv") .option("header", "true") .schema("city string, year int, population long") .load(upload_path) // Start the stream. // Use the checkpoint_path location to keep a record of all files that // have already been uploaded to the upload_path location. // For those that have been uploaded since the last check, // write the newly-uploaded files' data to the write_path location. df.writeStream.format("delta") .option("checkpointLocation", checkpoint_path) .start(write_path)
-
ステップ3のコードが動いている間に、書き込み先のディレクトリにクエリーを行うために以下のコードを実行します。
Pythondf_population = spark.read.format('delta').load(write_path) display(df_population) ''' Result: +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ '''
Scalaval df_population = spark.read.format("delta").load(write_path) display(df_population) /* Result: +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ */
-
ステップ3のコードが動いている間に、以下の追加のCSVファイルを作成し、DBFSファイルブラウザを用いてファイルアップロードディレクトリにアップロードします。
ID.csv
:city,year,population Boise,2019,438000 Boise,2020,447000
MT.csv
:city,year,population Helena,2019,81653 Helena,2020,82590
Misc.csv
:city,year,population Seattle metro,2021,3461000 Portland metro,2021,2174000 Boise,2021,455000 Helena,2021,81653
-
ステップ3のコードが動いている間に、書き込み先のディレクトリにクエリーを行うために以下のコードを実行すると、Auto Loaderがアップロードディレクトリで新規データを検知し、書き込みディレクトリに書き込みを行っていることがわかります。
Pythondf_population = spark.read.format('delta').load(write_path) display(df_population) ''' Result: +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Helena | 2019 | 81653 | +----------------+------+------------+ | Helena | 2020 | 82590 | +----------------+------+------------+ | Boise | 2019 | 438000 | +----------------+------+------------+ | Boise | 2020 | 447000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ | Seattle metro | 2021 | 3461000 | +----------------+------+------------+ | Portland metro | 2021 | 2174000 | +----------------+------+------------+ | Boise | 2021 | 455000 | +----------------+------+------------+ | Helena | 2021 | 81653 | +----------------+------+------------+ '''
Scalaval df_population = spark.read.format("delta").load(write_path) display(df_population) /* Result +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Helena | 2019 | 81653 | +----------------+------+------------+ | Helena | 2020 | 82590 | +----------------+------+------------+ | Boise | 2019 | 438000 | +----------------+------+------------+ | Boise | 2020 | 447000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ | Seattle metro | 2021 | 3461000 | +----------------+------+------------+ | Portland metro | 2021 | 2174000 | +----------------+------+------------+ | Boise | 2021 | 455000 | +----------------+------+------------+ | Helena | 2021 | 81653 | +----------------+------+------------+ */
-
クリーンアップするには、ステップ3で実行しているコードをキャンセルし、アップロードディレクトリ、チェックポイントディレクトリ、書き込みディレクトリを削除する以下のコードを実行します。
Pythondbutils.fs.rm(write_path, true) dbutils.fs.rm(upload_path, true)
Scaladbutils.fs.rm(write_path, true) dbutils.fs.rm(upload_path, true)