0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spark SQLを用いたCOPY INTOによるテーブルへのデータのバルクロード

Last updated at Posted at 2022-06-25

Bulk load data into a table with COPY INTO with Spark SQL | Databricks on AWS [2022/6/10時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

数千のファイルを含むデータソースからインクリメンタル、あるいはバルクでデータをロードする際にはCOPY INTOコマンドを使用することをお勧めします。高度なユースケースではAuto Loaderを使用することをお勧めします。

このチュートリアルでは、クラウドオブジェクトストレージからお使いのDatabricksワークスペースにデータをロードするためにCOPY INTOコマンドを使用します。

要件

  1. Databricksアカウントとアカウント内にあるDatabricksワークスペース。これらを作成するには、フリートライアルへのサインアップをご覧ください。
  2. Databricksランタイム11.0以降が稼働しているall-purposeクラスター。all-purposeクラスターの作成方法に関しては、クラスターの作成を参照ください。
  3. Databricksワークスペースのユーザーインタフェースの習熟。ワークスペースのナビゲートを参照ください。
  4. ノートブックの操作の習熟。
  5. データを書き込める場所。このデモではサンプルとしてDBFSルートを使用しますが、Unity Catalogで設定された外部ストレージロケーションをお勧めします。

ステップ1: 環境を設定しデータ生成器を作成する

このチュートリアルでは、Databricksの基本的な操作方法を理解していることと、デフォルトのワークスペース設定を前提としています。指定されたコードを実行できない場合、計算リソースやデータを書き込む場所に対してアクセス権があることを確認するためにワークスペース管理者に問い合わせてください。

COPY INTOのデータソースとして設定する場所を指定するために、コードではsourceパラメーターを使用することに注意してください。上述したように、このコードはDBFSルートをポイントします。外部のオブジェクトストレージに書き込みアクセス権を持っているのであれば、スース文字列のdbfs:/の部分を、お使いのオブジェクトストレージのパスに置き換えてください。また、このコードブロックは、このデモをリセットするために再起的な削除を行うので、プロダクションデータをポイントしていないこと、既存データの上書き削除を避けるためにネストされた/user/{username}/copy-into-demoを維持していることを確認してください。

  1. 新規のSQLノートブックを作成し、Databricksランタイム11.0以降が稼働しているクラスターにアタッチします。

  2. このチュートリアルで使用するストレージの格納場所とデータベースをリセットするために以下のコードを実行します。

    Python
    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. いくつかのテーブルとランダムにデータを生成する関数を設定するために以下のコードを実行します。

    SQL
    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

ステップ2: クラウドストレージにサンプルデータを書き込む

DatabricksにおいてはDelta Lake以外のデータフォーマットでデータを書き込むことは稀です。ここで使用するコードは、別のシステムの結果をオブジェクトストレージにダンプする外部システムを模して、JSONに書き込みを行います。

生のJSONデータのバッチを書き込むために以下のコードをコピーして実行します。

SQL
-- Write a new batch of data to the data source

INSERT INTO user_ping_raw
SELECT *,
  get_ping() ping,
  current_timestamp() time
FROM user_ids
WHERE is_active()=true;

ステップ3: JSONデータを冪等性を持ってロードするためにCOPY INTOを使う

COPY INTOを使用する前には、ターゲットのDelta Lakeテーブルを作成する必要があります。Databricksランタイム11.0以降では、CREATE TABLE文でテーブル名以外を指定する必要はありません。以前のDatabrikcsランタイムでは、空のテーブルを作成する際にスキーマを指定しなくてはなりません。

ターゲットDeltaテーブルを作成し、ソースからデータをロードするために以下のコードをコピーして実行します。

SQL
-- Create target table and load data

CREATE TABLE IF NOT EXISTS user_ping_target;

COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ("mergeSchema" = "true")
COPY_OPTIONS ("mergeSchema" = "true")

このアクションは冪等性があるので、複数回実行することができますが、データは一度だけロードされます。

ステップ4: テーブルの中身をプレビューする

このテーブルの中身を手動でき確認するために、シンプルなSQLクエリーを実行することができます。

テーブルの中身を確認するために以下のコードをコピーして実行します。

SQL
-- Review updated table

SELECT * FROM user_ping_target

ステップ5: より多くのデータをロードし結果をプレビューする

ソースにランダムな生のJSONデータの新規バッチを到着させるために、ステップ2-4を複数回再実行することができ、COPY INTOを用いてDeltaテーブルに冪等性を持ってロードし、結果を確認することができます。順序を入れ替えてこれらのステップを試してみたり、生データのバッチが複数回書き込まれるのをシミュレートするために複数回実行したり、新規データが到着していない状態でCOPY INTOを複数回実行してみてください。

ステップ6: チュートリアルをクリーンアップする

このチュートリアルが完了したら、不要であれば関連リソースを削除します。

  1. データベース、テーブル、全てのデータを削除するために以下のコードをコピーして実行します。

    Python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. 計算リソースを停止するためにClustersタブに移動し、クラスターをTerminateします。

その他のリソース

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?