Load data with COPY INTO | Databricks on AWS [2022/6/9時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
SQLコマンドCOPY INTO
を用いることで、ファイルの格納場所からDeltaテーブルにデータをロードすることができます。これは再トライ可能で冪等性のあるオペレーションです。ソースの場所にあるすでにロードされたファイルはスキップされます。
COPY INTOではCOPY INTOでのデータロードに一時的な認証情報を使うことを含み、いくつかの方法でのセキュアなアクセスをサポートしています。
空のDelta Lakeテーブル
注意
この機能はDatabricksランタイム11.0以降で利用できます。
COPY INTO
コマンドの実行の際、後ほどスキーマを推定できるように空のプレースホルダーのDeltaテーブルを作成することができます。
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table_description>]
[TBLPROPERTIES (<table_properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
上のSQL文は冪等性があり、Deltaテーブルに確実に一度だけ(exactly-once)データを取り込むようにスケジュールすることができます。
注意
空のDeltaテーブルはCOPY INTO
以外では使用できません。INSERT INTO
やMERGE INTO
ではスキーマレスのDeltaテーブルにデータを書き込むことをサポートしていません。COPY INTO
でテーブルデータがインサートされた後は、テーブルがクエリー可能になります。
COPY INTOのターゲットテーブルの作成をご覧ください。
サンプル
一般的な利用パターンについては、DatabricksのCOPY INTOを用いた一般的なデータロードのパターンをご覧ください。
以下の例では、サンプルデータセット(databricks-datasets)のサンプルデータをテーブルにロードするために、どのようにDeltaテーブルを作成し、COPY INTO
を使うのかを示しています。DatabricksクラスターにアタッチされたノートブックからサンプルのPython、R、Scala、SQLコードを実行することができます。また、Databricks SQLのSQLエンドポイントに関連づけられたクエリーからSQLを実行することもできます。
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
クリーアップするには、テーブルを削除する以下のコードを実行します。
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
チュートリアル
- Bulk load data into a table with COPY INTO in Databricks SQL
- Bulk load data into a table with COPY INTO with Spark SQL
リファレンス
- Databricksランタイム7.x以降: DatabricksのCOPY INTOコマンド
- Databricksランタイム5.5LTSおよび6.x: Copy Into (Delta Lake on Databricks)