Structured Streaming tutorial | Databricks on AWS [2020/12/10時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
センサー、IoTデバイス、ソーシャルネットワーク、オンライントランザクションは、定常的に監視し、迅速にアクションを取る必要のあるデータを生成します。結果として、大規模かつリアルタイムなストリーム処理の必要性が以前以上に高まっています。このチュートリアルモジュールでは、Apache Sparkにおいてストリーミングデータセットを取り扱うメインのモデルである構造化ストリーミングをご紹介します。構造化ストリーミングにおいては、データストリームは継続的に追加が行われるテーブルとして取り扱われます。これによって、バッチ処理モデルと類似したストリーム処理モデルを導入することができます。皆様がお使いのストリーミング処理を静的なテーブルに対する標準的なバッチクエリーとして表現しますが、Sparkは終わりのない入力テーブルに対するインクリメンタルなクエリーとして処理を実行します。
入力テーブルとして入力データストリームを考えてみます。ストリームに到着するそれぞれのデータアイテムは、入力テーブルに追加される新規の行のようなものと考えられます。
入力に対するクエリーは、結果テーブルを生成します。トリガー間隔(例えば、1秒)ごとに新規の行が入力テーブルに追加され、最終的には結果テーブルを更新します。結果テーブルが更新された時には常に、変更された行が外部のシンクに書き込まれます。外部ストレージに何を書き込むのかによって出力が定義されます。異なるモードで出力を定義することができます。
- Completeモード: 外部ストレージに更新された結果テーブルの全体が書き込まれます。テーブル全体の書き込みをどのように取り扱うのかはストレージコネクターに依存します。
- Appendモード: 外部ストレージに最後のトリガーが書き込まれて以降の新規行のみが結果テーブルに追加されます。これは、結果テーブルの既存の行が変更されないクエリーにおいてのみ適用することができます。
- Updateモード: 外部ストレージに最後のトリガーが書き込まれて以降の更新行のみが結果テーブルに追加されます。Updateモードは最後のトリガー以降に変更された行のみを出力するので、Completeモードとは異なります。クエリーに集計処理が含まれていない場合、これはAppendモードと同じものとなります。
このチュートリアルモジュールでは、以下のことを学びます。
このモジュールで説明されているコードにアクセスし、実行できるようにサンプルノートブックをインポートすることができます。
サンプルデータのロード
構造化ストリーミングをスタートする最も簡単な方法は、Databricksワークスペースの/databricks-datasets
フォルダーにあるサンプルDatabricksデータセットを使うことです。構造化ストリーミングアプリケーションの構築に使うことができるサンプルイベントデータを/databricks-datasets/structured-streaming/events/
に格納しています。ディレクトリの中を見てみましょう。
各行のファイルには2つのフィールドtime
、action
を持つJSONが含まれています。
{"time":1469501675,"action":"Open"}
{"time":1469501678,"action":"Close"}
{"time":1469501680,"action":"Open"}
{"time":1469501685,"action":"Open"}
{"time":1469501686,"action":"Open"}
{"time":1469501689,"action":"Open"}
{"time":1469501691,"action":"Open"}
{"time":1469501694,"action":"Open"}
{"time":1469501696,"action":"Close"}
{"time":1469501702,"action":"Open"}
{"time":1469501703,"action":"Open"}
{"time":1469501704,"action":"Open"}
ストリームの初期化
サンプルデータは単なる静的なファイルなので、ファイルが作成された時系列順にファイルを読み込むことでストリームをシミュレートします。
from pyspark.sql.types import *
from pyspark.sql.functions import *
inputPath = "/databricks-datasets/structured-streaming/events/"
# Define the schema to speed up processing
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])
streamingInputDF = (
spark
.readStream
.schema(jsonSchema) # Set the schema of the JSON data
.option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
.json(inputPath)
)
streamingCountsDF = (
streamingInputDF
.groupBy(
streamingInputDF.action,
window(streamingInputDF.time, "1 hour"))
.count()
)
ストリーミングジョブの開始
シンクを定義しスタートすることで、ストリーミング処理を開始します。ここでは、インタラクティブにカウントをクエリーするために、インメモリーテーブルに1時間のカウントのcompleteセットを設定します。
query = (
streamingCountsDF
.writeStream
.format("memory") # memory = store in-memory table (for testing only)
.queryName("counts") # counts = name of the in-memory table
.outputMode("complete") # complete = all the counts should be in the table
.start()
)
バックグラウンドで実行されるcounts
という名前のストリーミングクエリーを取り扱うためのハンドルがquery
となります。このクエリーは、継続的にウィンドウで仕切られたカウントを更新し、ファイルを取得します。
counts
を展開すると、処理されたレコード数、バッチの統計情報、集計の状態を表示するダッシュボードを参照することができます。
ストリームに対してインタラクティブなクエリーを実行
counts
の集計結果に対して定期的にクエリーを実行することができます。
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
一連のスクリーンショットからわかるように、入力ストリームデータに基いたアクションのカウントを反映されるので、クエリーの都度結果が変わります。
ノートブック
これらのコードサンプルにアクセスするには、以下のノートブックをインポートしてください。他の構造化ストリーミングのサンプルについては、Structured Streamingを参照ください。
Apache Spark構造化ストリーミングのPythonノートブック