Bulk load data into a table with COPY INTO with Spark SQL | Databricks on AWS [2022/6/10時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
数千のファイルを含むデータソースからインクリメンタル、あるいはバルクでデータをロードする際にはCOPY INTOコマンドを使用することをお勧めします。高度なユースケースではAuto Loaderを使用することをお勧めします。
このチュートリアルでは、クラウドオブジェクトストレージからお使いのDatabricksワークスペースにデータをロードするためにCOPY INTO
コマンドを使用します。
要件
- Databricksアカウントとアカウント内にあるDatabricksワークスペース。これらを作成するには、フリートライアルへのサインアップをご覧ください。
- Databricksランタイム11.0以降が稼働しているall-purposeクラスター。all-purposeクラスターの作成方法に関しては、クラスターの作成を参照ください。
- Databricksワークスペースのユーザーインタフェースの習熟。ワークスペースのナビゲートを参照ください。
- ノートブックの操作の習熟。
- データを書き込める場所。このデモではサンプルとしてDBFSルートを使用しますが、Unity Catalogで設定された外部ストレージロケーションをお勧めします。
ステップ1: 環境を設定しデータ生成器を作成する
このチュートリアルでは、Databricksの基本的な操作方法を理解していることと、デフォルトのワークスペース設定を前提としています。指定されたコードを実行できない場合、計算リソースやデータを書き込む場所に対してアクセス権があることを確認するためにワークスペース管理者に問い合わせてください。
COPY INTO
のデータソースとして設定する場所を指定するために、コードではsource
パラメーターを使用することに注意してください。上述したように、このコードはDBFSルートをポイントします。外部のオブジェクトストレージに書き込みアクセス権を持っているのであれば、スース文字列のdbfs:/
の部分を、お使いのオブジェクトストレージのパスに置き換えてください。また、このコードブロックは、このデモをリセットするために再起的な削除を行うので、プロダクションデータをポイントしていないこと、既存データの上書き削除を避けるためにネストされた/user/{username}/copy-into-demo
を維持していることを確認してください。
-
新規のSQLノートブックを作成し、Databricksランタイム11.0以降が稼働しているクラスターにアタッチします。
-
このチュートリアルで使用するストレージの格納場所とデータベースをリセットするために以下のコードを実行します。
Python%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
-
いくつかのテーブルとランダムにデータを生成する関数を設定するために以下のコードを実行します。
SQL-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
ステップ2: クラウドストレージにサンプルデータを書き込む
DatabricksにおいてはDelta Lake以外のデータフォーマットでデータを書き込むことは稀です。ここで使用するコードは、別のシステムの結果をオブジェクトストレージにダンプする外部システムを模して、JSONに書き込みを行います。
生のJSONデータのバッチを書き込むために以下のコードをコピーして実行します。
-- Write a new batch of data to the data source
INSERT INTO user_ping_raw
SELECT *,
get_ping() ping,
current_timestamp() time
FROM user_ids
WHERE is_active()=true;
ステップ3: JSONデータを冪等性を持ってロードするためにCOPY INTOを使う
COPY INTO
を使用する前には、ターゲットのDelta Lakeテーブルを作成する必要があります。Databricksランタイム11.0以降では、CREATE TABLE
文でテーブル名以外を指定する必要はありません。以前のDatabrikcsランタイムでは、空のテーブルを作成する際にスキーマを指定しなくてはなりません。
ターゲットDeltaテーブルを作成し、ソースからデータをロードするために以下のコードをコピーして実行します。
-- Create target table and load data
CREATE TABLE IF NOT EXISTS user_ping_target;
COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ("mergeSchema" = "true")
COPY_OPTIONS ("mergeSchema" = "true")
このアクションは冪等性があるので、複数回実行することができますが、データは一度だけロードされます。
ステップ4: テーブルの中身をプレビューする
このテーブルの中身を手動でき確認するために、シンプルなSQLクエリーを実行することができます。
テーブルの中身を確認するために以下のコードをコピーして実行します。
-- Review updated table
SELECT * FROM user_ping_target
ステップ5: より多くのデータをロードし結果をプレビューする
ソースにランダムな生のJSONデータの新規バッチを到着させるために、ステップ2-4を複数回再実行することができ、COPY INTO
を用いてDeltaテーブルに冪等性を持ってロードし、結果を確認することができます。順序を入れ替えてこれらのステップを試してみたり、生データのバッチが複数回書き込まれるのをシミュレートするために複数回実行したり、新規データが到着していない状態でCOPY INTO
を複数回実行してみてください。
ステップ6: チュートリアルをクリーンアップする
このチュートリアルが完了したら、不要であれば関連リソースを削除します。
-
データベース、テーブル、全てのデータを削除するために以下のコードをコピーして実行します。
Python# Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
-
計算リソースを停止するためにClustersタブに移動し、クラスターをTerminateします。
その他のリソース
- COPY INTOのリファレンス