How Streaming Queries Are Monitored in Pyspark - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
データ取り込みと分析において、ストリーミングは最も重要なデータ処理技術となっています。ユーザーと開発者に対して、分析とアクションのトリガーのための低レーテンシーかつリアルタイムのデータ処理能力を提供します。しかし、ストリーミングデータワークロードのモニタリングは、データが到着するたびに連続的に処理されるのでチャレンジングなものとなっています。このストリーム処理の常時稼働の特質から、リアルタイムのメトリクス、アラート、ダッシュボードなしには開発やプロダクションにおける問題のトラブルシュートがより困難なものとなります。
Apache Spark™の構造化ストリーミングは、以下の機能を提供することでモニタリングの問題に取り組んでいます。
- リアルタイムのメトリクスと統計情報を提供する専用のUI。詳細に関しては、A look at the new Structured Streaming UI in Apache Spark 3.0.をご覧ください。
- 外部システムを用いたダッシュボードやアラートのような高度なモニタリング機能を提供するObservable API。
これまでは、Observable APIはユーザーが自身のストリーミングクエリーでアラートや他の外部システムのダッシュボードの機能を活用する際にはScala APIを使用しなくてはならず、PySparkには存在していませんでした。Pythonにおける本機能の欠如は、Databricksで実行されるノートブックコマンドの約70%がPythonであるように、Pythonが普及するにつれて重要なこととなりました。
Databricksランタイム11において、Observable APIがPySparkで利用できるようになったことを発表できて嬉しく思っています。この記事では、構造化ストリーミングにおけるPythonのObservable APIと、ストリーミングクエリーにアラートロジックを追加するシナリオのステップバイステップのサンプルを説明します。
Observable API
今では、開発者はPySparkでストリーミングクエリーのリスナーインタフェースとObservable APIを組み合わせることで、カスタムメトリクスを用いたアラートやダッシュボードのように、外部システムにストリーミングのメトリクスを送信することができます。Streaming Query Listenerインタフェースは、継承する必要がある抽象クラスであり、以下のように全てのメソッドを実装しなくてはなりません。
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` will always be
latest no matter when this method is called. Therefore, the status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may be changed before/when you process the event.
For example, you may find :class:`StreamingQuery`
is terminated when you are processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
これら全ては非同期で動作することに注意してください。
-
DataStreamWriter.start
のように、ストリーミングクエリーがスタートした際にStreamingQueryListener.onQueryStarted
がトリガーされます。 - 個々のマイクロバッチが完了した際に、
StreamingQueryListener.onQueryProgress
が呼び出されます。 -
StreamingQuery.stop
のように、クエリーが停止された際にはStreamingQueryListener.onQueryTerminated
がコールされます。
StreamingQueryManager
を通じてアクティベートされるようにするために、リスナーを追加する必要があり、以下のように後で削除することができます。
spark.streams.addListener(my_listener)
spark.streams.removeListener(my_listener)
カスタムメトリクスをキャプチャするためには、DataFrame.observe
を通じてカスタムメトリクスを追加しなくてはなりません。カスタムメトリクスは、以下のようにcount("value")
のような任意の集計関数として定義されます。
df.observe("name", count(column), ...)
エラーアラートのシナリオ
このセクションでは、Observable APIにおけるじつ世界のユースケースの散布を説明します。ここでは、別のシステムから新規のCSVファイルが連続的に到着するディレクトリがあり、これをストリーミングを用いて取り込まなくてはならないとします。この例では、APIを容易に理解できるようにするために、シンプルさのためにローカルファイルシステムを使用します。以下のコードスニペットをpysparkのシェルにコピー&ペーストしてトライすることもできます。
最初に、必要なPythonクラスとパッケージをインポートし、このシナリオで使用するmy_csv_dir
ディレクトリを作成しましょう。
import os
import shutil
import time
from pathlib import Path
from pyspark.sql.functions import count, col, lit
from pyspark.sql.streaming import StreamingQueryListener
# NOTE: replace `basedir` with the fused path, e.g., "/dbfs/tmp" in Databricks
# notebook.
basedir = os.getcwd() # "/dbfs/tmp"
# My CSV files will be created in this directory later after cleaning 'my_csv_dir'
# directory up in case you already ran this example below.
my_csv_dir = os.path.join(basedir, "my_csv_dir")
shutil.rmtree(my_csv_dir, ignore_errors=True)
os.makedirs(my_csv_dir)
次に、自身のカスタムストリーミングクエリーリスナーを定義します。リスナーは、それぞれのプロセスのCSV取り込みであまりに多い不正なレコードがあった場合にアラートを発呼します。処理されたレコードのトータル数の50%以上が不正なレコードであった場合、ログメッセージを出力します。しかし、プロダクションのシナリオにおいては、シンプルに出力するのではなく外部システムに接続することができます。
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
my_listener = MyListener()
spark.streams.addListener(my_listener)
リスナーをアクティベートするために、この例のクエリーの前に追加します。しかし、これらは非同期で動作するので、リスナーをクエリーの開始、停止に関係なく追加できることに注意してください。これによって、ご自身の実行中のクエリーを停止することなしに、アタッチ・デタッチが可能となります。
これで、my_csv_dir
ディレクトリにあるファイルを取り込むストリーミングクエリーをスタートします。処理の途中では、不正なレコードの数と処理されたレコードの数を観察することもできます。CSVデータソースはデフォルトでは不正なレコードを_corrupt_record
に格納しますので、不正なレコードの数を取得するためにこのカラムをカウントします。
# Now, start a streaming query that monitors 'my_csv_dir' directory.
# Every time when there are new CSV files arriving here, we will process them.
my_csv = spark.readStream.schema(
"my_key INT, my_val DOUBLE, _corrupt_record STRING"
).csv(Path(my_csv_dir).as_uri())
# `DataFrame.observe` computes the counts of processed and malformed records,
# and sends an event to the listener.
my_observed_csv = my_csv.observe(
"metric",
count(lit(1)).alias("cnt"), # number of processed rows
count(col("_corrupt_record")).alias("malformed")) # number of malformed rows
my_query = my_observed_csv.writeStream.format(
"console").queryName("My observer").start()
これでストリーミングクエリーとアラート機能を定義したので、ストリーミングで取り込まれるCSVファイルを作成します。
# Now, we will write CSV data to be processed in a streaming manner on time.
# This CSV file is all well-formed.
with open(os.path.join(my_csv_dir, "my_csv_1.csv"), "w") as f:
_ = f.write("1,1.1\n")
_ = f.write("123,123.123\n")
time.sleep(5) # Assume that another CSV file arrived in 5 seconds.
# Ouch! it has two malformed records out of 3. My observer query should alert it!
with open(os.path.join(my_csv_dir, "my_csv_error.csv"), "w") as f:
_ = f.write("1,1.123\n")
_ = f.write("Ouch! malformed record!\n")
_ = f.write("Arrgggh!\n")
time.sleep(5) # OK, all done. Let's stop the query in 5 seconds.
my_query.stop()
spark.streams.removeListener(my_listener)
ここでは、クエリーの開始、停止、処理が適切に記録されることを確認することができます。このCSVファイルには2つの不正なレコードが存在するため、以下のエラーメッセージとともに適切にアラートが発生します。
...
ALERT! Ouch! there are too many malformed records 2 out of 3!
...
結論
PySparkのユーザーは、ストリーミングクエリーのリスナーとObservable APIを通じてカスタムメトリクスを設定し、観測することができるようになりました。このようなロジックを、必要に応じて実行中のクエリーにアタッチ・デタッチすることができます。この機能は、ダッシュボード、アラート、他の外部システムを用いたレポートの要件に取り組むものです。
Streaming Query ListenerインタフェースとObservable APIはDBR 11 betaで利用でき、将来のApache Sparkでも利用できるようになる見込みです。すぐにDBR 11 Betaを使ってDatabricksでこの新機能を試してみてください。
サンプルノートブック
日本語のノートブックです。