Delta Live Tables data sources | Databricks on AWS [2022/5/23時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
データセットを作成するために、以下の外部データソースを使用することができます。
- Databricksランタイムが直接サポートしているすべてのデータソース。
- Azure Data Lake Storage Gen2 (ADLS Gen2)、AWS S3、Google Cloud Storage (GCS)のようなクラウドストレージ上のすべてのファイル。
- DBFSに格納されているすべてのファイル。
サポートされているファイルフォーマットからデータを読み込む際、特に継続的に到着するデータを処理するストリーミングライブテーブルでは、パイプラインにAuto Loaderを使用することをお勧めします。Auto Loaderはスケーラブルかつ効率的であり、スキーマ推定をサポートしています。
Auto Loaderでサポートされていないファイルフォーマットからバッチオペレーションでデータを読み込むために、PythonデータセットではApache Sparkビルトインのファイルデータソースを使用することができます。
Auto Loaderでサポートされていないファイルフォーマットからバッチオペレーションでデータを読み込むために、SQLデータセットではDelta Live Tablesのファイルソースを使用することができます。
Auto Loader
以下の例では、CSVファイル、JSONファイルからデータセットを作成するためにAuto Loaderを使用しています。
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Auto Loaderでサポートされているフォーマットオプションを使用することができます。map()
関数を使うことで、cloud_files()
メソッドに任意の数のオプションを指定することができます。オプションはキーバリューのペアであり、キーとバリューは文字列です。以下では、SQLでのAuto Loaderの使い方を説明しています。
CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
FROM cloud_files(
"<file_path>",
"<file_format>",
map(
"<option_key>", "<option_value",
"<option_key>", "<option_value",
...
)
)
以下の例では、ヘッダーありのタブ区切りのCSVファイルからデータを読み込んでいます。
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
フォーマット手動で設定するためにschema
を指定することができます。スキーマ推定でサポートされていないフォーマットにはschema
を指定しなくてはなりません。
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
注意
Delta Live Tablesでは、ファイルを読み込むためにAuto Loaderを使う際には、自動でスキーマとチェックポイントのディレクトリを設定、管理します。しかし、手動でこれらのディレクトリを設定してフルリフレッシュを実行しても、設定されたディレクトリのコンテンツには影響を及ぼしません。処理の際に予期しない副作用を避けるために、自動で設定されたディレクトリを使用することをお勧めします。
Apache Sparkファイルソース
Pythonでデータセットを定義する際に、バッチオペレーションでファイルを読み込むために標準的なPython関数を使用することができます。以下の例では、PySparkのspark.read.format("parquet").load()
関数を用いてファイルからParquetデータを読み込んでいます。
@dlt.table
def lendingclub_raw_data():
return (
spark.read.format("parquet").load("/databricks-datasets/samples/lending_club/parquet/")
)
Spark SQLファイルソース
SQLでデータセットを定義する際に、バッチオペレーションでファイルを読み込むためにSpark SQL文法を使用することができます。以下の例では、ファイルからParquetデータを読み込んでいます。
CREATE OR REFRESH LIVE TABLE customers
AS SELECT * FROM parquet.`/databricks-datasets/samples/lending_club/parquet/`