0
0

More than 1 year has passed since last update.

Apache Spark™ 3.1リリースにおける構造化ストリーミングの新機能

Last updated at Posted at 2022-06-13

The Improvements for Structured Streaming in the Apache Spark 3.1 Release - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

SparkコアとSQL APIによるストリーミング処理機能の提供とともに、構造化ストリーミングはApache Spark™における最も重要なコンポーネントとなっています。このブログ記事では、新たなストリーミングテーブルAPI、ストリーム-ストリームのjoinのサポート、複数のUIの改善を含む、最新の3.1リリースにおけるSparkストリーミングにおける特筆すべき改善点を要約します。また、スキーマのバリデーションとApache Kafkaのデータソースの改善によって、優れたユーザビリティを提供します。最後になりますが、FileStreamのソース/シンクに対する読み込み/書き込みに対してさまざまなエンハンスメントがなされました。

新たなストリーミングテーブルAPI

構造化ストリーミングを起動すると、連続的なデータストリームは境界のないテーブルと考えることができます。このため、テーブルAPIはストリーミングクエリーを取り扱うための自然かつ便利な手段を提供します。Spark 3.1では、DataStreamReaderとDataStreamWriterのサポートを追加しました。今では、エンドユーザーはストリーミングデータフレームをテーブルとして直接読み書きすることができます。以下のサンプルをご覧ください。

Python
# Create a streaming DataFrame
src = spark.readStream.format("rate").option("rowPerSecond", 10).load()

# Write the streaming DataFrame to a table
src.writeStream.option("checkpointLocation", checkpointLoc1).toTable("myTable")

# Check the table result
spark.read.table("myTable").show(truncate=30)
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2021-01-19 07:45:23.122|42   |
|2021-01-19 07:45:23.222|43   |
|2021-01-19 07:45:23.322|44   |
...

また、これらの新たなオプションによって、ユーザーはソースのデータセットを変換し、新規テーブルに書き込むことができます。

Python
# Write to a new table with transformation
spark.readStream.table("myTable").select("value") \
  .writeStream.option("checkpointLocation", checkpointLoc2) \
  .format("parquet").toTable("newTable")

# Check the table result
spark.read.table("newTable").show()
+-----+
|value|
+-----+
| 1214|
| 1215|
| 1216|
...

ストリーミングテーブルAPIにおいては、以下のことが可能となるDelta Lakeフォーマットを使うことをお勧めします。

  • 同時に実行される低レーテンシーのデータ取り込みによって生成される小規模ファイルのコンパクト化
  • 1つ以上のストリーム(あるいは同時実行のバッチジョブ)による「一度のみ(exactly-once)」の処理の維持
  • ファイルをストリームのソースとして使用する際、新規ファイルを効率的に特定

新たなストリーム-ストリームjoinのサポート

Spark 3.1までは、ストリーム-ストリームのjoinにおいては、inner、left outer、right outer joinしかサポートされていませんでした。最新リリースでは、full outerとleft semiのストリーム-ストリームjoinを実装したので、より多くのシナリオで構造化ストリーミングを活用できるようになりました。

Kafkaデータソースの改善

Spark 3.1では、Kafkaの依存ライブラリを2.6.0(SPARK-32568)にアップグレードしましたので、ユーザーはKafkaのオフセット取得(AdminClient.listOffsets)に対する新たなAPIに移行することができます。これによって、古いバージョンを使用した際にKafkaコネクターが永遠に待ち続けるという問題(SPARK-28367)を解決することができます。

スキーマの検証

スキーマは、構造化ストリーミングクエリーにおける重要な情報です。Spark 3.1において、我々はユーザー入力のスキーマと内部の状態ストアの両方に対してスキーマ検証ロジックを追加しました。

クエリー再起動間のステートスキーマ検証の導入(SPARK-27237)

このアップデートによって、ストリーム開始時に、キーバリューのスキーマはスキーマファイルに格納されます。新たなキーバリュースキーマは、クエリーの再起動時に既存のスキーマとの互換性が検証されます。フィールドの数が同じで、それぞれのフィールドのデータ型が同じ場合にステートスキーマは「互換性がある」とみなされます。Sparkでは名前変更が可能なので、ここではフィールド名をチェックしないことに注意してください。

これによって、互換性のないステートスキーマによるクエリーの実行を防ぐことができ、非決定論的挙動の可能性を削減し、より意味のあるエラーメッセージを出力することができます。

ストリーミングステートストアにおけるスキーマ検証の導入(SPARK-31894)

これまでは、構造化ストリーミングはいかなるスキーマ検証を行うことなしにStateStoreに直接(UnsafeRowとして表現される)チェックポイントを書き込んでいました。新たなSparkバージョンにアップグレードすると、チェックポイントファイルは再利用されます。スキーマ検証なしには、集計関数に関連する変更、バグフィックスはランダムな例外を引き起こす可能性があり、異なる結果を返すこともあります(例 SPARK-28067)。今では、Sparkはスキーマに対してチェックポイントを検証し、移行中にチェックポイントが再利用されるとInvalidUnsafeRowExceptionがスローされます。この動作によって、Spark 3.0.1におけるブロッカーSPARK-31990: ストリーミングのステートストアの互換性が破壊される、の発見に役立ったことをお伝えしておきます。

構造化ストリーミングUIのエンハンス

Spark 3.0において新たな構造化ストリーミングUIを導入しました。Spark 3.1においては、構造化ストリーミングUIでヒストリーサーバーのサポート(SPARK-31953)とストリーミングの実行状態に対して更なる情報を追加しました。

構造化ストリーミングUIにおけるステート情報(SPARK-33223)

ステート情報に4つのメトリクスが追加されました。

  1. 合計ステート行の集計数
  2. 更新されたステート行の集計数
  3. 使用される集計ステートメモリーのバイト数
  4. ウォーターマークによって削除されたステート行の集計数

これらのメトリクスによって、ステートストアの全体像を得ることができます。キャパシティプランニングのような新機能を追加することが可能となります。

  • 構造化ストリーミングUIにおけるウォーターマークギャップ情報(SPARK-33224)

ウォーターマークは、エンドユーザーがステートフルクエリーを追跡するのに必要な主要なメトリクスの一つです。ウォーターマークはappendモードにおいて「いつ」アウトプットが放出されるのかを定義しており、ウォールクロックとウォーターマーク(入力データ)の間のギャップがどのくらいなのかを知ることは、出力に対する期待値を設定するのに役立ちます。

  • SS UIにステートカスタムメトリクスを表示 (SPARK-33287)

以下では、Spark設定spark.sql.streaming.ui.enabledCustomMetricListに設定されたカスタムメトリクスを表示しています。

FileStream ソース/シンクのエンハンス

FileStreamのソース/シンクに幾つかの改善が加えられました。

maxFilesPerTrigger以上の取得ファイルのリストを読み込んでいないファイルとしてキャッシュ(SPARK-30866)

これまでは、maxFilesPerTriggerが設定された際、マイクロバッチごとにFileStreamソースは利用できる全てのファイルを取得し、設定に基づいて限られた数のファイルを処理し、残りを無視します。この改善によって、前回のバッチで取得されたファイルをキャッシュし、以降のバッチでそれらを利用します。

ファイルストリームソースとシンクのメタデータログに対するロジックの整理(SPARK-30462)

この変更前は、FileStreamソース/シンクでメタデータが必要とされた時は常に、メタデータログの全てのエントリーがSparkドライバーのメモリにデシアライズされました。この変更によってSparkは、可能な限り整理された形でメタデータログを読み込み処理します。

出力ファイルの保持期間に対する新たなオプションの提供(SPARK-27188)

FileStreamシンクにおけるメタデータログファイルの保持期間を設定する新たなオプションを提供しており、長い期間実行される構造化ストリーミングクエリーのメタデータログのファイルサイズの拡大を制限する助けになります。

次に来るのは

次のメジャーリリースに向けても、我々は構造化ストリーミングにおける新機能、性能、使いやすさの改善にフォーカスしていきます。エンドユーザーやSpark開発者としての皆様のフィードバックを心待ちにしています!フィードバックがございましたら、Sparkのユーザー開発者のメーリングリストでシェアしてください。これらの重要なエンハンスメントを現実のものにするのを助けてくれたコミュニティの全ての貢献者とユーザーに感謝の意を表します。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0