0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Delta Live Tables Sink APIによる外部サービスへのデータの書き込み

Posted at

見逃してました。2月頭のアップデートです。

Delta Live Tables シンクを使用してパイプラインから外部サービスにデータを書き込む (パブリック プレビュー)

Delta Live Tables sink API はパブリック プレビュー段階です。 Delta Live Tablesシンクを使用すると、パイプラインによって変換されたデータを、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity Catalog や Hive metastoreによって管理される外部テーブルなどのターゲットに書き込むことができます。Delta Live Tables シンクを使用して外部サービスにレコードをストリームするを参照してください。

ブログも公開されていました。

要は、これまでのDelta Live Tables(DLT)ではストリーミングテーブルとマテリアライズドビューのみを用いてデータの処理を行っていましたが、それだと外部システムとの連携が困難だったり、構造化ストリーミングのシンクを利用したいと言うニーズに応えられなかったので、今回新たにSink APIを提供するようにしたとのこと。これによって、外部のイベントストリーミングサービスや外部のDeltaテーブルへの書き込みが可能になります。

と言うことで、動かしてみます。

DLT Sink APIのウォークスルー

ブロンズレイヤー

import dlt
from pyspark.sql.functions import *

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="生のウィキペディアクリックストリームデータセット、databricks-datasetsから取り込まれました。",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():         
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

シルバーレイヤー

@dlt.table(
 comment="データ型/列名がクリーンアップされ、品質期待値が設定されたウィキペディアクリックストリームデータセット。",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
  return (
    dlt.readStream("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
  )

ゴールドレイヤー

@dlt.table(
 comment="Apache Sparkページにリンクする最も一般的なページのテーブル。",
 table_properties={
   "quality": "gold" 
 } 
)
def spark_referrers():
 return (
   dlt.readStream("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

ここまでは従来と同じですね。

Delta Live Tables sinkを構成

Delta Live Tables シンクを構成するを参考に、Deltaテーブルシンクを構成します。

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "users.takaaki_yayoi.my_table" }
)

Delta Live Tables sinkへの書き込みフロー

シンクへの書き込みフローを定義します。

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
  return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
  )

パイプラインの実行

上のノートブックからパイプラインを構成して実行します。

Screenshot 2025-03-04 at 16.24.05.png

新たにシンクがパイプラインの最後に現れます。

Screenshot 2025-03-04 at 16.41.25.png

シンクのDeltaテーブルにデータが永続化されました。

Screenshot 2025-03-04 at 16.42.55.png

リネージもキャプチャされています。

Screenshot 2025-03-04 at 16.43.25.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?