Best Practices for Using Structured Streaming in Production - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
いかなるデータパイプラインやアプリケーションをプロダクション状態にリリースするには、計画、テスト、監視、メンテナンスが必要となります。この点に関しては、ストリーミングパイプラインも例外ではありません。本記事では、ストリーミングパイプラインやストリーミングアプリケーションをプロダクション環境にデプロイする際の最も重要な検討事項のいくつかを説明します。
Databricksにおいては、ストリーミングパイプラインやアプリケーションの開発、実行に対して2つの異なる手段を提供しています。Delta Live Tables (DLT)とDatabricksワークフローです。DLTはバッチとストリーミングパイプラインの両方をサポートするフラッグシップかつ完全マネージドETL製品です。これは、宣言型開発、自動オペレーション、データ品質、高度な観測性機能などを提供しています。ワークフローを用いることで、お客様は統合されたガバナンス(Unity Catalog)やストレージ(Delta Lake)を用いて、Databricksの最適化された実行環境(Photonなど)でApache Sparkのワークロードを実行することができます。ストリーミングのワークロードに関しては、DLTとワークフローは同じコアのストリーミングエンジンであるSpark構造化ストリーミングを使用しています。DLTの場合、お客様はDLT APIを用いてプログラムを行い、DLTは内部で構造化ストリーミングエンジンを使用します。ジョブの場合、お客様は直接Spark APIを用いてプログラムを行います。
本記事の推奨事項は構造化ストリーミングエンジンの観点から書かれており、これらのほとんどはDLTとワークフローの両方に適用されます(DLTはトリガーやチェックポイントのような、これらのいくつかを自動でハンドリングしますが)。我々は、推奨事項をこれらのコンセプトがいつ適用されるべきかをハイライトするためにデプロイメント前
とデプロイメント後
という見出しの中にグルーピングし、これらを分割する形でこのブログシリーズを公開しました。これらのセクションのいくつかに関してディープダイブする追加コンテンツも予定しています。ストリーミングパイプラインやストリーミングアプリケーションをプロダクションに移行し始める前に、全てのセクションを読んでいただき、開発からQA、最終的なプロダクションにプロモーションする際に再度読むことをお勧めします。
デプロイメント前
プロダクションの体験を改善するために、ストリーミングアプリケーションを作成する際に検討すべきことは多数存在しています。ユニットテスト、チェックポイント、トリガー、状態管理のようなこれらのトピックのいくつかは、あなたのストリーミングアプリケーションがどのように動作するのかを決定します。命名規則やどのクラスターでどれだけのストリームを実行するのかといった他のことに関しては、同じ環境で複数のストリーミングアプリケーションを管理するために行うべきことです。
ユニットテスト
バグを見つけ出して修正することによるコストは、あなたがSDLC(ソフトウェア開発ライフサイクル)プロセスを進むに従って指数関数的に増加し、これは構造化ストリーミングアプリケーションにおいても違いはありません。プロトタイプを強固なプロダクションパイプラインに移行する際、ビルトインのテストを行うCI/CDプロセスを必要とします。でも、これらのテストをどのように作成したらいいのでしょうか?
はじめは、ストリーミングパイプラインのユニットテストには何か特別の物が必要になると思うかもしれませんが、そのようなことはありません。ストリーミングパイプラインの一般的なガイドは、Sparkバッチジョブで聞いたことがあるかもしれないガイドと違いません。効果的にユニットテストを行えるようにコードを整理することからスタートします:
- コードをテスト可能な塊に分割します。
- ビジネスロジックを他の関数を呼び出す関数に整理します。foreachBatchの中に大量のロジックを含んでいる、あるいは、mapGroupsWithStateやflatMapGroupsWithStateを実装している場合、別々にテストできる複数の関数にコードを整理します。
- グローバルの状態や外部システムに対する依存関係をコーディングしてはいけません。
- データフレームやデータセットを操作する全ての関数は、データフレーム/データセット/設定を入力とし、データフレーム/データセットを出力するように整理されなくてはなりません。
コードが論理的なマナーに沿って分割されたら、関数のそれぞれに対してユニットテストを実装することができます。データフレームやデータセットを用いてUDFや関数をテストする際、複数のSparkテストフレームワークを利用することができます。これらのフレームワークは入力を容易に作成できるようにすべてのデータフレーム/データセットAPIをサポートしており、データフレームの中身やスキーマを比較できる特別なアサーションを有しています。以下にいくつかの例を示します:
- Sparkのすべてのパーツをテストするために設計されたビルトインのSparkテストスイート
- ScalaとPythonの両方をサポートする
spark-testing-base
- Scala Spark 2 & 3をテストするための
spark-fast-tests
-
spark-fast-tests
のPythonバージョンであるchispa
これらのライブラリのサンプルコードはこちらからアクセスすることができます。
しかし、待ってください!ここではストリーミングアプリケーションをテストします。ユニットテストでストリーミングデータフレームを作成する必要はないのでしょうか?答えはNOです。やめてください!ストリーミングデータフレームは終わりのないデータセットを表現しますが、関数が実行される際、それらは個々のデータセットであるマイクロバッチに対して実行されます。ステートレス、ステートフルなストリーム両方に対して、バッチアプリケーションに対して使用するのと同じユニットテストを使用することができます。他のフレームワークに対する構造化ストリーミングのメリットの一つは、両方のストリーミングに対して同じ変換処理のコードを使用でき、同じシンクに対する他のバッチオペレーションと共存できると言うことです。これによって、例えばデータのバックフィルのような処理において、2つの異なるアプリケーションの間でロジックを同期しようとするのではなく、入力ソースを修正し同じ宛先に書き込むことでオペレーションをシンプルにすることができます。シンクがDeltaテーブルの場合、両方のプロセスが追加のみのオペレーションであるならば、これらのオペレーションを同時実行することも可能です。
トリガー
どのようにコードが動作するのかを理解したので、次はストリームがどの程度の頻度で新規データを確認するのかを決定する必要があります。ここでトリガーが登場します。トリガーの設定はwriteStream
コマンドのオプションの一つであり、以下のようになります。
.trigger(Trigger.ProcessingTime("30 seconds"))
.trigger(processingTime='30 seconds')
上の例では、マイクロバッチが30秒以内で完了した場合には、エンジンは次のマイクロバッチを起動する前に残りの時間を待つことになります。マイクロバッチの完了に30秒以上かかった場合、以前のマイクロバッチが完了したら即座に次のマイクロバッチを起動します。
トリガー間隔を設定する際に検討すべき2つの要素があります。あなたのストリームがマイクロバッチを処理するのにどの程度の時間を要するのかと、システムにどれだけの頻度で新規データをチェックしてほしいのかということです。短いトリガー間隔を用いて、アプリケーションのパフォーマンスに合わせて、より多くのワーカーを追加したり、計算資源最適化あるいはメモリー最適化インスタンスを用いることで全体的な処理のレーテンシーを引き下げることができます。これらの増強されたリソースはコストの増加を引き起こしますので、コストの最小化がゴールの場合は、少ないリソースを用いて長いトリガー間隔を用いることができます。リソースの利用率を最大化するために、通常はストリームがマイクロバッチを処理するのに必要とする時間よりも長いトリガー間隔を設定することはありませんが、ストリームが共有クラスターで動作しており、定常的にクラスターのリソースを消費したくない場合には、トリガー間隔を長くすることは合理的と言えます。
データが頻繁に到着するわけでない、あるいは、SLAが10分以上など、連続的にストリームを実行する必要がない場合、Trigger.Once
オプションを使用することができます。このオプションはストリームを起動し、前回実行してからの新規データを確認し、1つの大きなバッチで全てを処理してシャットダウンします。Trigger.Once
を使用する際、連続的にストリームを実行するのと同じように、フォールトトレランスを保証するチェックポイント(以下をご覧ください)は、一度切り(exactly-once)の処理を保証します。
Sparkには、Trigger.AvailableNow
という新しいバージョンのTrigger.Once
があります。お手元のデータサイズによっては、すべてを1つのバッチで処理するTrigger.Once
が理想的ではない場合があります。Trigger.AvailableNow
は設定maxFilesPerTrigger
とmaxBytesPerTrigger
に基づいてデータを分割します。これによって、複数バッチでデータを処理することができます。これらの設定は、Trigger.Once
では無視されます。トリガー設定のサンプルはこちらから確認することができます。
ちょっとしたクイズです。1行のコードでどのようにストリーミング処理を、前回処理を行った時点まで自動で追跡するバッチ処理に切り替えることができるでしょうか?答え - お使いの処理の時間トリガーをTrigger.Once/Trigger.AvailableNow
に変更するだけです!スケジュール処理で全く同じコードを実行し、いかなるレコードを逃したり再処理することはありません。
ストリームに名前をつける
お子さんには名前をつけますし、ペットにも名前をつけます。次はストリームに名前をつけましょう。ストリームに馴染みのある名前をつけることができる.queryName
というwriteStream
のオプションがあります。なぜわざわざ?それでは名前をつけないことにしましょう。その場合、追跡しなくてはならないのは、Spark UIの構造化ストリーミングタブには、<no name>
という文字列とストリームのユニークな識別子として自動で生成された理解が困難なguidとなります。クラスターで1つ以上のストリームが稼働しており、それらすべてが<no name>
と意味のわからない識別子だとしたら、対象をどのように見つけ出すのでしょうか?メトリクスをエクスポートした際にどれがどのストリームのものか分かるのでしょうか?
ご自身の手でわかりやすくし、ストリームに名前をつけましょう。プロダクション環境でこれらを管理する際、名前をつけたことにご自身で感謝することになりますので、お使いのすべてのforeachBatch()
コードのバッチクエリーに名前をつけましょう。
フォールトトレランス
あなたのストリームはシャットダウンからどのように復旧するのでしょうか?クラスター障害や意図的な停止のように、いくつか異なるケースがありますが、ソリューションはチェックポイントのセットアップを行うこととなります。先行書き込みログによるチェックポイントは、ストリーミングアプリケーションが停止されることに対するある程度の保護を提供し、前回まで処理したところから再開することが可能となります。
チェックポイントはストリームの現在のオフセットと状態の値(集計値など)を格納します。チェックポイントはストリーム固有のものなので、それぞれの格納場所が設定される必要があります。これを行うことで、シャットダウンやアプリケーションコードによる障害、予期しないクラウドプロバイダーの障害や制限から安全に復旧することができるようになります。
チェックポイントを設定するには、ストリーム定義にcheckpointLocation
オプションを追加します:
streamingDataFrame.writeStream
.format("delta")
.option("path", "")
.queryName("TestStream")
.option("checkpointLocation", "")
.start()
シンプルであり続けるように、.writeStream
を呼び出す際は常にユニークなロケーションのチェックポイントオプションを指定すべきです。foreachBatch
を使用しており、writeStream
自身がパスやテーブルオプションを指定していない場合であっても、チェックポイントは指定しなくてはいけません。これによって、Sparkの構造化ストリーミングは手間のいらないフォールトトレランスを提供します。
お使いのストリームにおけるチェックポイントの管理に要する手間は通常は最小限であるべきです。Tathagata Dasが述べたように、「ストリーミング分析を実行する最もシンプルな方法は、ストリーミングに対する理由付けを全く行わないというものだ」というものです。と言いながらも、チェックポイントファイルのメンテナンスに関する疑問としてよく話題に上がる1つの設定に言及する価値はあると言えます。これは直接の設定を必要としない内部設定ではありますが、spark.sql.streaming.minBatchesToRetain
(デフォルトは100)は、作成されるチェックポイントファイルの数を制御します。基本的には、バッチ開始時点のオフセットを示すファイル(先行書き込みログとして知られるオフセット)と、バッチ完了時のオフセットを示すファイル(コミット)があるため、この数の2倍のファイルが生成されます。内部プロセスの一部のクリーンアップのために、ファイルの数は定期的にチェックされます。これによって、長期にわたるストリーミングアプリケーションのメンテナンスの少なくとも一つの観点をシンプルにします。
また、お使いのアプリケーションコードのいくつかの変更によってはチェックポイントが無効になることを説明しておくことが重要です。デプロイメント前にコードレビューを通じてこれらの変更をチェックすることをお勧めします。Recovery Semantics after Changes in a Streaming Queryで、このようなことが生じる変更の例を確認することができます。チェックポイント生成の詳細を知りたい、あるいは非同期チェックポイントがストリーミングアプリケーションのレーテンシーを改善しうるのかを検討したいと考えるかもしれません。そのような場合は、非同期ステートチェックポイントによるストリーミングクエリーの高速化でより詳細がカバーされています。
状態管理とRocksDB
ステートフルなストリーミングアプリケーションでは、現在のレコードが以前のイベントに依存することがあり、Sparkはマイクロバッチ間でデータを保持しなくてはなりません。保持するデータのことを ステート(状態) と呼び、Sparkはそれを状態ストアに格納し、それぞれのマイクロバッチで読み込み、更新、削除します。典型的なステートフルオペレーションはストリーミング集計、ストリーミングのdropDuplicates
、ストリーム間のjoin、mapGroupsWithState
、flatMapGroupsWithState
などがあります。よくある例としては、ビジネスメトリクスを計算するために、group by
メソッドを用いたセッション化や時間ごとの集計が挙げられます。状態ストアのそれぞれのレコードは、ステートフル計算の一部として使用されるキーで特定され、格納される状態データが大量になると、さらに多くのユニークなキーが必要となります。
これらのステートフルなオペレーションを実現するために必要な状態データの量が膨大かつ複雑になると、ワークロードのパフォーマンスが劣化し、レーテンシーが増加し最終的には障害を引きを越すことがあります。状態ストアがレーテンシー増加の原因となっている典型的な兆候は、JVMにおけるガーベージコレクション(GC)による停止です。マイクロバッチの処理時間を監視しているのであれば、マイクロバッチを通じて定常的に増加したり、処理時間が大幅に変化する様子を見て取れます。
状態ストアのデフォルト設定は大部分の一般的なワークロードでは十分なものであり、状態データをエグゼキューターのJVMメモリーに格納します。膨大な数のキー(通常は数百万、この記事のパート2のモニタリング&対応のセクションをお待ちください)はマシンメモリーに膨大なプレッシャーを与え、リソースを解放しようとしてこれらのGCポーズを引き起こす頻度を増加させます。
Databricksランタイム(Apache Spark 3.2+でもサポートされています)では、このメモリーソースのプレッシャーを軽減するために、代替の状態ストアプロバイダーとしてRocksDBを活用することができます。RocksDBは、高速なストレージとしてのキーバリュー用の埋め込み永続化ストアです。すべてがC++で記述され、高速、低レーテンシーストレージとして最適化されたログ構造化データベースエンジンを通じた高パフォーマンスを特徴としています。
状態ストアプロバイダーとしてRocksDBを活用してもマシンメモリーを必要としますが、JVM内の空間を占有せず膨大なキーを必要とするケースにおいて、より効率的状態管理システムを実現します。しかし、これはマイクロバッチごとに追加の処理ステップが必要となるので、利点だけであるとは言えません。JVMにおける状態データストレージからのメモリーのプレッシャーが関係している時以外では、レーテンシーを削減するためにRocksDBを導入することはお勧めしません。RocksDBによる状態ストアは、ストリームのチェックポイントに含まれており、通常の状態ストレージと同じレベルのフォールトトレランスを提供します。
RocksDBの設定はチェックポイントの設定と同様、設計上最小限のものとなっており、全体的なSpark設定で宣言するだけです。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
streamingQueryListener
を用いてストリームを監視しているのであれば、stateOperators
フィールドにRocksDBのメトリクスが含まれていることに気づくでしょう。詳細については、Configure RocksDB state store on Databricksをご覧ください。
大量のキーは、特に無制限あるいは期限切れにならない状態キーを用いている際に、メモリー消費に加えて別の反作用を引き起こすことがあることにも説明します。RocksDBを使用していようがそうでなかろうが、フォールトトレランスのためにアプリケーションの状態もチェックポイントにバックアップされます。期限切れにならないように状態ファイルを作成している場合、必要なストレージ容量は増加し、書き込みや障害から回復する時間も増加することになります。メモリーのデータ(この記事のパート2のモニタリング&対応のセクションをお待ちください)においては、このような状況はアウトオブメモリーのような不明瞭なエラーになり、クラウドストレージに書き込まれるチェックポイントデータに関しては予期しない、理由が説明できない量の増加が認められます。処理されたすべてのデータのストリーミング状態を保持する必要があるというビジネス要件がない限り(あるのは稀です)、Spark Structured Streaming documentationをお読みいただき、不要になった状態レコードをシステムが削除できるようにご自身のステートフルオペレーションを実装するようにしてください(dropDuplicates
やストリーム間joinには注意してください)。
クラスターでの複数ストリームの実行
みなさまのストリームが完全にテストされ、設定されたら、プロダクションでどのように整理するのかを明確にするフェーズとなります。リソース利用率を最大化し、コストを削減するために、同じSparkクラスターで複数のストリームを積み上げるというのはよくあるパターンです。ある時点まではこれで問題ありませんが、パフォーマンスに影響が出る前に一つのクラスターに追加できる数には制限があります。ドライバーは、クラスターで稼働するすべてのストリームを管理しなくてはならず、すべてのストリームはワーカーにおける同一のコアを奪い合うことになります。ご自身のストリームが何を行うのかを理解し、効率的にスタックを構成するために適切にキャパシティを計画する必要があります。
以下に、同じクラスターで複数のストリームを積み上げることを計画する際に考慮すべき事項を示します。
- ドライバーがすべてのストリームを管理するのに十分な大きさであることを確認します。お使いのドライバは高いCPU利用率やガーベージコレクションに苦しんでいませんか?これは、すべてのストリームの管理に苦戦していることを意味します。ストリームの数を減らすか、ドライバーのサイズを大きくしましょう。
- それぞれのストリームが処理するデータの量を検討します。取り込み、真紅に書き込むデータの量が増えると、それぞれのストリームのスループットを最大化するために必要なコアの数も多くなります。どの程度のデータが処理されるのかに応じて、ストリームの数を減らすか、ワーカーの数を増やします。Kafkaのようなソースに対しては、すべてのストリームのすべてのパーティションに対して十分なコア数がない場合には、
minPartitions
オプションを用いて取り込みにどれだけのコア数を使うのかを設定する必要があります。 - ストリームの複雑性やデータボリュームを検討します。すべてのストリームが最小限の操作を行いシンクに追加しているだけならば、それぞれのストリームがマイクロバッチごとに必要なリソースは少なくて済み、より多くのストリームを積み上げることができます。ストリームがステートフルな処理や計算、メモリーを大量に必要とするオペレーションをおこなっているのであれば、優れたパフォーマンスを得るにはよりリソースを必要とし、スタックできるストリームの数は少なくなります。
- スケジューラープールを検討します。ストリームを積み上げる際、全てが同じワーカーの同じコアを使おうとし、大量のコアを必要とするストリームが他のストリームを待たせることになります。スケジューラープールを用いることで、クラスターの別の場所で別のストリームが実行されるようになります。これによって、利用可能なリソースのサブセットを用いて並列にストリームを実行できるようになります。
- SLAを検討します。ミッションクリティカルなストリームがある場合には、ベストプラクティスとしてそれを分離し、重要度の低いストリームの影響を受けないようにします。
Databricksでは、通常お客様はクラスターで10から30のストリームを積み上げていますが、これはユースケースによって異なります。上述した要素を検討し、パフォーマンス、コスト、メンテナンス可能性で優れた体験を得られるようにしてください。
まとめ
ここで取り組んだアイデアのいくつかは間違いなく、詳細な議論のためにより多くの時間と特別な手当が必要なものであり、後程のディープダイブを楽しみにしていてください。しかし、みなさまのジャーニーをスタートする際やプロダクションストリーミング体験を強化する方法を探す際にこれらの推奨事項がお役に立てば幸いです。次の投稿「プロダクションにおけるSparkストリーミング: ベストプラクティスコレクションPart 2」をご覧ください。