Ingest data into Delta Live Tables | Databricks on AWSの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
データセットを作成するために、以下の外部データソースを使用することができます。
-
Databricksランタイムが直接サポートしているすべてのデータソース。
-
Azure Data Lake Storage Gen2 (ADLS Gen2)、AWS S3、Google Cloud Storage (GCS)のようなクラウドストレージの任意のファイル。
-
DBFSに格納されている任意のファイル。
クラウドストレージへのアクセスの設定に関しては、クラウドストレージの設定をご覧ください。
サポートされるファイルフォーマット、特に連続的に到着するデータを操作するストリーミングライブテーブルにおいては、パイプラインでAuto Loaderを使用することをお勧めします。Auto Loaderはスケーラブルで効率的であり、スキーマ推定をサポートしています。
Pythonデータセットでは、Auto Loaderでサポートされていないファイルフォーマットからバッチ処理でデータを読み込むために、Apache Sparkのビルトインのファイルデータソースを活用することができます。
SQLデータセットでは、Auto Loaderでサポートされていないファイルフォーマットからバッチ処理でデータを読み込むために、Delta Live Tablesのファイルソースを活用することができます。
Auto Loader
以下のサンプルでは、CSVファイルとJSONファイルからデータセットを作成するためにAuto Loaderを活用しています。
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Auto Loaderではサポートされているフォーマットオプションを使用することができます。map()
関数を用いることで、cloud_files()
メソッドにさまざまななオプションを指定することができます。オプションはキーバリューのペアであり、キーとバリューは文字列となります。以下では、SQLでAuto Loaderを操作する際の構文を示しています。
CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
FROM cloud_files(
"<file_path>",
"<file_format>",
map(
"<option_key>", "<option_value",
"<option_key>", "<option_value",
...
)
)
以下のサンプルでは、ヘッダーありのタブ区切りのCSVファイルからデータを読み込んでいます。
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
手動でフォーマットを指定するためにschema
を使用することができます。スキーマ推定をサポートしていないフォーマットではschema
を指定しなくてはなりません。
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
注意
ファイルを読み込むためにAuto Loaderを使用する際、Delta Live Tablesは自動でスキーマやチェックポイントのディレクトリを設定、管理します。これらのディレクトリのいずれかを手動で設定し、フルリフレッシュを実行しても、設定されたディレクトリのコンテンツには影響を及ぼしません。処理過程で期待しない副作用を避けるために、自動で設定されるディレクトリを使用することをお勧めします。
Apache Sparkファイルソース
Pythonでデータセットを定義し、バッチ処理でファイルを読み込むには、標準的なPySpark関数を使用することができます。以下のサンプルでは、PySparkのspark.read.format("parquet").load()
関数を用いて、ファイルからParquetデータを読み込んでいます。
@dlt.table
def lendingclub_raw_data():
return (
spark.read.format("parquet").load("/databricks-datasets/samples/lending_club/parquet/")
)
Spark SQLファイルソース
SQLでデータセットを定義し、バッチ処理でファイルを読み込むには、Spark SQLの構文を使用することができます。以下のサンプルでは、ファイルからParquetデータを読み込んでいます。
CREATE OR REFRESH LIVE TABLE customers
AS SELECT * FROM parquet.`/databricks-datasets/samples/lending_club/parquet/`