0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

宣言型パイプラインとforeachBatchの出会い: 高度なパイプライン向けカスタムストリーミング

Last updated at Posted at 2026-01-09

Declarative Pipelines meets foreachBatch: Custom S... - Databricks Community - 142033の翻訳です。

本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Lakflow宣言型パイプライン(LDP)のシンクに関して初めて聞くのであれば、最近ローンチされたイベントストリームやDeltaテーブルのような外部システムにシームレスにデータを注ぎ込む能力を提供するSinks APIを探索するSDP Sink APIのご紹介を読むことを強くお勧めします。

訳者註
LDPは現在Spark宣言型パイプライン(SDP)と呼ばれています。

イントロダクション

LDPは、あなたがストリーミングテーブルでリアルタイムデータを処理していても、マテリアライズドビューを用いてデータを効率的に集計していても、パイプラインの開発をシンプルにし、円滑にすることを目的として設計された宣言型のフレームワークです。Sinks APIの導入にによって、外部システムとのシームレスなインテグレーションを可能にし、機能をさらに強化します。

この記事では、ストリーミングデータに対する任意のバッチ処理を推進するApache Sparkの構造化ストリーミングの機能であり、複雑な変換処理やさまざまなデータシンクへの書き込みを可能にするLDPのforeachBatchのサポートを深掘りします。

クリックから洞察へ: クリックストリームデータの実践

シンプルさと再現可能性のために、この記事ではSinks APIブログと同じデータソースを使用します: Databricksデータセットにあるクリックストリームデータセットです。このデータは、訪問者がナビゲートし、あるページから他のページにどのように移動したのかを説明するインターネットの地図と考えることができます。わかりやすくするために、ここでは、ページIDとタイトルには1:1の関係があり、タイトルは静的なものと仮定します。このデータセットを用いて、3つのキーとなるユースケースを探索します:

  1. 包括的な訪問の集計器: 人気の訪問先を明らかにするために、アクセス元に関係なく集計を行うことで合計のページ訪問数を深掘りしましょう。
  2. 100万クリッククラブ: ここから面白くなってきます。単一の時間帯において100万を超えたページ訪問を示す、SQLサーバーにデータを記録する監視システムを実装します。
  3. ニューヨーク市トラッカー: 最後のステップでは、「New_York_City」ページに対する訪問をアクティブに監視します。到着するストリームがこのようなイベントを捕捉した時には常に、効率的な保存と分析のためにParquetレコードとしてこのデータをアーカイブします。

この記事の最後までには、あなたのデータ処理ワークフローをLDPがどのように引き上げるのかを知ることになります。どのようにして効率的にリアルタイムでストリーミングデータに対応し、既存のDeltaテーブルにシームレスにマージし、動的に複数の宛先にルーティングするのかをウォークスルーします!

コードのウォークスルー

クリックストリームデータの準備

必要なモジュールのインポートとJSONフォーマットでWikipediaのクリックストリームデータを取り込むためのストリーミングテーブルの作成からスタートしましょう。@dp.table APIを用いて、ソースロケーションから直接データをストリーミングします。また、イベントストリームを含み、AutoLoaderでサポートされる任意のソースからデータをストリーミングさせるようにパイプラインを設定することができます。フォーマットを切り替えて、必要な認可を提供するだけです。

from pyspark import pipelines as dp
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable


json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"


@dp.table(
 comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

生のテーブルにクリックストリームデータがストリーミングされるようになったので、次のステップはデータ品質制約と変換処理を適用することで、データを洗練することになります。これには、データ型の標準化や不正レコードの除外が含まれます。データ品質を強制し、パイプラインの一貫性を維持するためのLDPネイティブのエクスペクテーションを活用することで、これを達成します。

@dp.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dp.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dp.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   dp.readStream("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

foreachBatch - 複数のシンクへの書き込み

ストリーミングテーブルでデータをきれいにしたので、LDPにforeachBatchスタイルの処理を導入するLDPの新たなシンクである@dp.foreach_batch_sinkを用いて、3つのユースケースを実装することができます。これによって、従来のDataFrame APIと連携しつつも、ストリーミングデータをマイクロバッチとして取り扱うことができます。

Spark構造化ストリーミングにおいては、データは一連のマイクロバッチとして処理されます。それぞれのマイクロバッチには、DataFrameとして表現される行の特定のセットが含まれています。処理の過程では、このDataFrameと対応するbatchIDはシンクに登録された関数に引き渡されます。我々のユースケースを実装するために、マイクロバッチからの行を格納するデータフレームdfを使用します。

最初のユースケースでは、すべてのアクセス元に対して、シンクで合計のページ訪問数を集計します。それぞれのマイクロバッチには同じcurrent_page_idの複数の行が含まれる場合があるので、最初にマイクロバッチレベルでデータを集計します。シンクに既存の値がある場合には、それを更新します。それ以外の場合には新規レコードを挿入します。

二つ目のユースケースでは、それぞれの時間間隔でトラフィックの高いページを特定するために、click_countでフィルタリングを行い、外部システムで利用できるようにするためにSQLサーバーのdbo.high_traffic_pagesと呼ばれるテーブルにデータを追記します。接続文字列を格納し、SQLサーバーとの認証に使用するために、Databricksのシークレットを活用します。

最後のユースケースでは、New York Cityへのページ訪問を捕捉するためにデータフレームに対してシンプルなフィルタリングを適用し、Unity CatalogのボリュームにParquetファイルとしてデータを安全にアーカイブします。アーカイブデータへのアクセスは、Unity Catalogを通じて直接管理、制御することができます。

@dp.foreach_batch_sink(name = "all_the_sinks")
def foreachBatchFunc(df, batchId):
 #Usecase 1: Aggregate page level click count at a Delta Sink
 agg_df = df.groupBy("current_page_id", "current_page_title").agg(sum("click_count").alias("click_count"))
 out = DeltaTable.forName(df.sparkSession, "harsha_pasala.default.clickstream_sink")
 out.alias("target") \
   .merge(agg_df.alias("source"), "source.current_page_id = target.current_page_id") \
   .whenMatchedUpdate(
     set = {"click_count": col("target.click_count") + col("source.click_count")}
   ) \
   .whenNotMatchedInsert(
     values = {
       "target.current_page_id": "source.current_page_id",
       "target.current_page_title": "source.current_page_title",
       "target.click_count": "source.click_count"
     }
   ) \
   .execute()


 #Usecase 2: High traffic pages to SQL Server.
 high_traffic_pages = df.filter(col("click_count") > 1000000)
 high_traffic_pages.write \
   .format("jdbc") \
   .option("url", dbutils.secrets.get(scope="secret-lab", key="sql_connection_string")) \
   .option("dbtable", "dbo.high_traffic_pages") \
   .mode("append") \
   .save()


 #Usecase 3: Monitor for a specific page and archive data.
 new_york_clicks = df.filter(col("current_page_title").like("New_York_City"))
 new_york_clicks.write.format("parquet").mode("append").save("/Volumes/path/")

訳者補足: 複数シンク書き込み時のcache()について

上記のコードでは単一のforeachBatch内で3つの宛先に書き込んでいます。このときdf.cache()を使用しないと、各write操作のたびにDataFrameの再計算が発生します。

効率的に処理するには以下のようにcache()unpersist()を追加します:

@dp.foreach_batch_sink(name = "all_the_sinks")
def foreachBatchFunc(df, batchId):
    df.cache()
    try:
        # 3つの書き込み処理
        ...
    finally:
        df.unpersist()

これによりソースからの読み取りと計算が1回で済み、特にKafkaなど課金対象のソースや計算コストの高い変換を含む場合に効果的です。

シンクへのデータのストリーミング

データの準備ができたので、シンクを配置します - 実行の時です。我々のシンクはマージ挙動を用いてデータを変更するので、clickstream_cleanストリーミングテーブルからのデータを、上で作成したall_the_sinksforeach_batch_sinkに向けさせるためにupdate_flowAPIを使用します。これによって、データが効率的に処理され、我々が定義したユースケースに応じてルーティングされることを保証します。

@dp.update_flow(
 target="all_the_sinks",
 name="sink_flow"
)
def read_data():
 return (
   dp.readStream("clickstream_clean")
 )

最後に、LDPパイプラインでクリックストリームデータ取り込み、品質制約を用いた処理を連携させることで、パイプラインの実行で以下のDAGに到達することになります。DAGはall_the_sinkという単一のシンクを表示していますが、これは設定した3つすべての下流のターゲットにデータを効果的にディスパッチするforeach_batch_sinkを表現していることに注意することが重要です。

過去数年において、LDPはさまざまな機能強化をおこなってきており、@dp.foreach_batch_sinkは最新の追加機能の一つとなっています。この機能によって、高度なストリーミング機能を実現しつつも、既存のシンクとappend flow APIをベースに構築を行い、よりきれいで効率化された実装を可能にします。このAPIに関する詳細と、サンプルの実装についてはドキュメントをご覧ください。

関連するDatabricksブログ記事

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?