0
0

More than 1 year has passed since last update.

Delta Live Tablesへのデータの取り込み

Posted at

Ingest data into Delta Live Tables | Databricks on AWSの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

データセットを作成するために、以下の外部データソースを使用することができます。

  • Databricksランタイムが直接サポートしているすべてのデータソース

  • Azure Data Lake Storage Gen2 (ADLS Gen2)、AWS S3、Google Cloud Storage (GCS)のようなクラウドストレージの任意のファイル。

  • DBFSに格納されている任意のファイル。

    クラウドストレージへのアクセスの設定に関しては、クラウドストレージの設定をご覧ください。

サポートされるファイルフォーマット、特に連続的に到着するデータを操作するストリーミングライブテーブルにおいては、パイプラインでAuto Loaderを使用することをお勧めします。Auto Loaderはスケーラブルで効率的であり、スキーマ推定をサポートしています。

Pythonデータセットでは、Auto Loaderでサポートされていないファイルフォーマットからバッチ処理でデータを読み込むために、Apache Sparkのビルトインのファイルデータソースを活用することができます。

SQLデータセットでは、Auto Loaderでサポートされていないファイルフォーマットからバッチ処理でデータを読み込むために、Delta Live Tablesのファイルソースを活用することができます。

Auto Loader

以下のサンプルでは、CSVファイルとJSONファイルからデータセットを作成するためにAuto Loaderを活用しています。

Python
@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
SQL
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Auto Loaderではサポートされているフォーマットオプションを使用することができます。map()関数を用いることで、cloud_files()メソッドにさまざまななオプションを指定することができます。オプションはキーバリューのペアであり、キーとバリューは文字列となります。以下では、SQLでAuto Loaderを操作する際の構文を示しています。

SQL
CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
  FROM cloud_files(
    "<file_path>",
    "<file_format>",
    map(
      "<option_key>", "<option_value",
      "<option_key>", "<option_value",
      ...
    )
  )

以下のサンプルでは、ヘッダーありのタブ区切りのCSVファイルからデータを読み込んでいます。

SQL
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

手動でフォーマットを指定するためにschemaを使用することができます。スキーマ推定をサポートしていないフォーマットではschemaを指定しなくてはなりません。

Python
@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )
SQL
CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

注意
ファイルを読み込むためにAuto Loaderを使用する際、Delta Live Tablesは自動でスキーマやチェックポイントのディレクトリを設定、管理します。これらのディレクトリのいずれかを手動で設定し、フルリフレッシュを実行しても、設定されたディレクトリのコンテンツには影響を及ぼしません。処理過程で期待しない副作用を避けるために、自動で設定されるディレクトリを使用することをお勧めします。

Apache Sparkファイルソース

Pythonでデータセットを定義し、バッチ処理でファイルを読み込むには、標準的なPySpark関数を使用することができます。以下のサンプルでは、PySparkのspark.read.format("parquet").load()関数を用いて、ファイルからParquetデータを読み込んでいます。

Python
@dlt.table
def lendingclub_raw_data():
  return (
    spark.read.format("parquet").load("/databricks-datasets/samples/lending_club/parquet/")
  )

Spark SQLファイルソース

SQLでデータセットを定義し、バッチ処理でファイルを読み込むには、Spark SQLの構文を使用することができます。以下のサンプルでは、ファイルからParquetデータを読み込んでいます。

SQL
CREATE OR REFRESH LIVE TABLE customers
AS SELECT * FROM parquet.`/databricks-datasets/samples/lending_club/parquet/`

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0