見逃してました。2月頭のアップデートです。
Delta Live Tables シンクを使用してパイプラインから外部サービスにデータを書き込む (パブリック プレビュー)
Delta Live Tables
sink
API はパブリック プレビュー段階です。 Delta Live Tablesシンクを使用すると、パイプラインによって変換されたデータを、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity Catalog や Hive metastoreによって管理される外部テーブルなどのターゲットに書き込むことができます。Delta Live Tables シンクを使用して外部サービスにレコードをストリームするを参照してください。
ブログも公開されていました。
要は、これまでのDelta Live Tables(DLT)ではストリーミングテーブルとマテリアライズドビューのみを用いてデータの処理を行っていましたが、それだと外部システムとの連携が困難だったり、構造化ストリーミングのシンクを利用したいと言うニーズに応えられなかったので、今回新たにSink APIを提供するようにしたとのこと。これによって、外部のイベントストリーミングサービスや外部のDeltaテーブルへの書き込みが可能になります。
と言うことで、動かしてみます。
DLT Sink APIのウォークスルー
ブロンズレイヤー
import dlt
from pyspark.sql.functions import *
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="生のウィキペディアクリックストリームデータセット、databricks-datasetsから取り込まれました。",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
シルバーレイヤー
@dlt.table(
comment="データ型/列名がクリーンアップされ、品質期待値が設定されたウィキペディアクリックストリームデータセット。",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
dlt.readStream("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
ゴールドレイヤー
@dlt.table(
comment="Apache Sparkページにリンクする最も一般的なページのテーブル。",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
dlt.readStream("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
ここまでは従来と同じですね。
Delta Live Tables sinkを構成
Delta Live Tables シンクを構成するを参考に、Deltaテーブルシンクを構成します。
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "users.takaaki_yayoi.my_table" }
)
Delta Live Tables sinkへの書き込みフロー
シンクへの書き込みフローを定義します。
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
パイプラインの実行
上のノートブックからパイプラインを構成して実行します。
新たにシンクがパイプラインの最後に現れます。
シンクのDeltaテーブルにデータが永続化されました。
リネージもキャプチャされています。