How to Better Monitor Streaming Queries with Spark 3.0 Structured Streaming - The Databricks Blogの翻訳です。
この記事はAlibabaのソフトウェアエンジニアGenmao Yuによるゲストコミュニティの投稿です。
Apache Spark 2.0で構造化ストリーミングが導入され、分散ストリーム処理アプリケーションを構築するためのベストなプラットフォームであることが証明されました。SQL/データセット/データフレームAPIとSparkのビルトイン機能の統合によって、開発者はストリーミングの集計、ストリーム・ストリームのjoin、ウィンドウサポートのような複雑な要件に対応できるようになりました。構造化ストリーミングの立ち上げ以来、開発者からはSparkストリーミング(DStream)でサポートしたのと同じように、ストリーミングを管理するためのより良い方法はないのかという問い合わせを頻繁にいただくようになりました。Apache Spark 3.0では、構造化ストリーミングのための新たなUIをリリースしました。
新たな構造化ストリーミングUIでは、全てのストリーミングジョブの有用な情報と統計情報を提供することで、開発時のデバッグを容易にし、リアルタイムメトリクスによる運用時の可視性を改善します。UIでは二つのセットの統計情報を表示します:1) ストリーミングクエリージョブの集計情報、2) Input Rate、Process Rate、Input Rows、Batch Duration、Operation Durationなどを含むストリーミングクエリーに関する詳細統計情報です。
ストリーミングクエリージョブの集計情報
開発者がストリーミングSQLクエリーをサブミットした際、Structured Streamingタブの一覧に表示されます。ここには、アクティブなストリーミングクエリーと、完了した(completed)ストリーミングクエリーの両方が表示されます。ストリーミングクエリーのいくつかの基本情報は、結果テーブルに表示されます。これには、クエリー名、ステータス、ID、ランのID、サブミット時刻、クエリー期間、最後のバッチID、そして、平均入力レート、平均処理レートのような集計情報が含まれます。ストリーミングクエリーのステータスにはRUNNING、FINISHED、FAILEDの3つがあります。FINISHED、FAILEDの全てのクエリーは、完了したクエリーのテーブにリストされます。Errorカラムには、失敗したクエリーの例外の詳細が表示されます。
ランIDをクリックすることで、ストリーミングクエリーの詳細統計情報を確認することができます。
詳細統計情報
統計ページでは、入力/処理レート、レーテンシー、詳細なオペレーション期間を含むメトリクスが表示され、ストリーミングクエリーのステータスに対する洞察を得るのに役立ち、クエリー処理における以上を容易にデバッグできるようになります。
ここには以下のメトリクスが含まれます。
- Input Rate: (全てのソースから)到着するデータのレートの集計値。
- Process Rate: Sparkが(全てのソースからの)データを処理するレートの集計値。
- Batch Duration: それぞれのバッチに要する時間。
- Operation Duration: 様々なオペレーションに要した時間(m秒)
追跡されるオペレーションは以下の通りです。
- addBatch: マイクロバッチがソースから入力データを読み込み、処理し、シンクにバッチの出力を書き出すのに要する時間。これはマイクロバッチの処理時間の大部分を占めます。
- getBatch: ソースから現在のマイクロバッチの入力を読み込むための論理的クエリーを準備に要する時間。
- getOffset: 新たな入力なデータがあるかどうかをソースに問い合わせるのに要する時間。
- walCommit: メタデータログへのオフセットの書き込み。
- queryPlanning: 実行計画の生成。
全てのオペレーションがUIに表示されるわけでないことに注意する必要があります。データソースごとに異なるタイプの異なるオペレーションが存在するので、一覧されたオペレーションの一部は、一つのストリーミングクエリーで実行される場合があります。
UIによるストリーミング性能のトラブルシューティング
ここでは、新たな構造化ストリーミングUIが、何かおかしいことが起きていることを知らせてくれるケースを見ていきましょう。ハイレベルでは、以下のようなデモクエリーを用い、それぞれのケースで、いくつかの前提条件を置きます。
import java.util.UUID
val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
処理能力不足によるレーテンシーの増加
最初のケースでは、可能な限り迅速にApache Kafkaを処理するクエリーを実行します。それぞれのバッチにおいては、ストリーミングジョブはKafkaで利用できる全てのデータを処理します。処理能力がバッチデータを処理するのに十分でない場合には、急速にレーテンシー画像化します。もっとも直感的に判断できるのは、Input RowsとBatch Durationの線形増加です。Process Rateは、ストリーミングジョブが最大で約8,000レコード/秒しか処理できていないことを示しています。しかし、現在のInput Rateは約20,000レコード/秒となっています。ストリーミングジョブにより多くの実行リソースを追加するか、全てのコンシューマーがプロデューサーについていけるように充分なパーティションを追加することができます。
安定しているがレーテンシーが高い
ここでのケースは上のケースと何が違うのでしょうか?以下のスクリーンショットのように、レーテンシーは増加していませんが安定しています。
同じInput RateでProcess Rateが安定していることがわかります。これは、ジョブの処理能力は入力データを処理するには充分であることを意味しています。しかし、それぞれのバッチの処理時間、すなわちレーテンシーは20秒と高いものになっています。高いレーテンシーの主な理由はそれぞれのバッチで大量のデータを取り扱っているためです。通常、ジョブの並列性を上げることでレーテンシーを削減することができます。10以上のKafkaのパーティションとSparkタスクに10コアを追加した後で、レーテンシーが20秒から5秒程度になったことがわかります。
Operation Durationチャートをトラブルシュートに活用する
Operation Durationチャートは様々なオペレーションを実行するのに要する時間(m秒)を表示します。これは、それぞれのバッチの時間の分布を理解するのに有用であり、トラブルシューティングが容易になります。例としてApache Sparkコミュニティにおける性能改善「SPARK-30915:最新のバッチIDを検知した際のメタデータログファイルの読み込みの回避」を使ってみましょう。
この取り組みの前は、圧縮されたメタデータログが膨大になった際、他のバッチよりも圧縮後のバッチ処理に時間がかかっていました。
コードの調査の後で、不要な圧縮ログファイルの読み取りがあることがわかり修正されました。以下のoperation duraionチャートは期待通りの効果が得られていることがわかります。
今後の開発予定
上で示した通り、新たな構造化ストリーミングUIによって、開発者はより有益なストリーミングクエリー情報を用いてストリーミングジョブをモニターできるようになります。初期リリースバージョンでは、新たなUIは開発中のものであり、将来のリリースで改善されていきます。すべてではありませんが、以下のような機能が今後実装される予定です。
- ストリーミングクエリー実行詳細の追加: 遅延データ、ウォーターマーク、状態、メトリクスなど。
- Sparkヒストリーサーバーにおける構造化ストリーミングUIのサポート。
- レーテンシーの発生など異常事態に対応するためのティップス。
新たなUIを試す
新たなDatabricksランタイム7.1からサポートされているApache Spark 3.0のSparkストリーミングUIを試してみてください。Databricksノートブックを利用されているのであれば、ノートブックのストリーミングクエリーのステータスを確認し、クエリーを管理するシンプルな方法を提供しています。Databricksは無料かつ数分で利用を開始することができます。利用開始する際にクレジットカードの登録は不要です。