Using Auto Loader in Delta Live Tables | Databricks on AWS [2022/6/6時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta Live Tablesのパイプラインの中でAuto Loaderを使うことができます。Delta Live Tablesは、Apache Sparkの構造化ストリーミングの機能を拡張し、数行の宣言型PythonあるいはSQLを書くだけで、以下の機能を持つプロダクション品質のデータパイプラインをデプロイすることができます。
- コスト節約のためにオートスケーリングする計算インフラストラクチャ
- エクスペクテーションによるデータ品質チェック
- 自動スキーマエボリューションのハンドリング
- イベントログのメトリクスによるモニタリング
Delta Live Tableは自動でスキーマやチェックポイントの格納場所を管理するので、パイプラインにこれらの設定を行う必要はありません。Delta Live Tablesのデータソースをご覧ください。
DLT用Auto Loader構文
Delta Live TablesはAuto Loader向けに若干修正したPython構文を提供し、Auto LoaderのSQLサポートを追加しています。
以下の例では、CSV、JSONファイルからデータセットを作成するためにAuto Loaderを使用しています。
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Auto Loaderでサポートされているフォーマットオプションを使用することができます。map()
関数を用いることで、cloud_files()
メソッドに任意の数のオプションを指定することができます。オプションはキーバリューのペアであり、キーとバリューは文字列となります。以下ではSQLでAuto Loaderを動作させる構文を示しています。
CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
FROM cloud_files(
"<file_path>",
"<file_format>",
map(
"<option_key>", "<option_value",
"<option_key>", "<option_value",
...
)
)
以下の例では、タブ区切りのヘッダー付きCSVからデータを読み込んでいます。
CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
手動でフォーマットを指定するためにschema
を指定することができます。スキーマ推定をサポートしないフォーマットではschema
を使用する必要があります。
@dlt.table
def wiki_raw():
return (
spark.readStream.format("cloudFiles")
.schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
.option("cloudFiles.format", "parquet")
.load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
)
CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
FROM cloud_files(
"/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
"parquet",
map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
)
注意
Delta Live Tablesはファイルを読み込むためにAuto Loaderを使用する際、スキーマとチェックポイントのディレクトリを自動で設定し管理します。しかし、これらのディレクトリを手動で設定した場合、フルリフレッシュは設定されたディレクトリの中のコンテンツに影響を与えません。処理の際の予期しない副作用を避けるために、自動で設定されたディレクトリを使用することをお勧めします。