Python Arbitrary Stateful Processing in Structured Streaming - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
構造化ストリーミングにおける、状態を保つPython関数の実行方法
増加し続けるお客様からの需要に応えるために、リアルタイムの分析や機械学習ワークロードをのために、より多くのユーザーがDatabricksを活用しています。これが、Apache Spark™の構造化ストリーミングのレーテンシー、機能性、エコシステムとのコネクター、オペレーション乗しやすさの改善を狙いとしたProject Lightspeedをスタートした理由です。リアルタイムのストリーム処理においては、新規データは定常的に生成され、データを継続的な方法で処理する必要があります。例えば、特定のユーザーに対して短い時間ウィンドウにおいて、クレジットカードの連続的なトランザクションを追跡し、カウントするというものが挙げられます。継続的処理を実行するためには、多くの場合で最終的な結果を計算するまでに、状態ストアで中間結果を処理し、維持し続ける必要が出てきます。この様な高度な処理要件に応えるために、構造化ストリーミングでは任意のステートフルオペレーションを提供していますが、これまではScalaユーザーしかこの機能にアクセスすることができませんでした。
本日、我々はPySparkによる構造化ストリーミングにおける任意のステートフルオペレーションのサポートとセッションウィンドウシナリオのサンプルコードをご紹介します。これによって、SparkでPythonを使用するケースで、膨大な数のリアルタイム分析や機械学習ユースケースを解放します。この機能はDatabricksランタイム11.3以降、そして間も無くリリースされるSpark 3.4.0で利用できます。
DataFrame.groupby.applyInPandasWithState
任意ステートフルオペレーションでユーザーが利用するPySpark APIは、Scalaのそれとは若干異なります。Scalaでは、Dataset.groupByKey.mapGroupsWithStateとDataset.groupByKey.flatMapGroupsWithStateが任意ステートフルオペレーションをサポートしています。両方のメソッドは静的に型付けされますが、Python言語は代わりに動的に型付けされ、PySparkのDataFrame.groupby.applyInPandas APIとアラインしています。動的型付けがあるため、PySparkのステートはユーザーが指定するスキーマとマッチするタプルの格納のみをサポートしています。以下にAPIのシグネチャを示します。
def applyInPandasWithState(
self,
func: Callable,
# Schema for the output DataFrame.
outputStructType: Union[pyspark.sql.types.StructType, str],
# Schema for the state
stateStructType: Union[pyspark.sql.types.StructType, str],
# Output mode such as "Update"
outputMode: str,
# Timeout such as "NoTimeout"
timeoutConf: str,
) -> DataFrame
ユーザーが指定するPython関数のシグネチャは以下の様になります。
def func(
# Grouping keys.
key: Tuple,
# Iterator of batches for one group.
pdf_iter: Iterator[pandas.DataFrame],
# State to get and/or update. Identical with Scala API.
state: pyspark.sql.streaming.state.GroupState
) -> Iterator[pandas.DataFrame]
ユーザーは、ステートを取得・アップデートするユーザー定義関数を呼び出すことができます。
def func(
key: Tuple, pdfs: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
for pdf in pdfs
# Do something with `state`.
s: Tuple = state.get
state.update((..., ..., ...))
yield pdf
df.groupBy(df["..."]).applyInPandasWithState(
func,
StructType(...),
StructType(...),
"...",
GroupStateTimeout.NoTimeout
)
セッションウィンドウによるワードカウントのシナリオ
このセクションでは、単語数をカウントする実際のセッションウィンドウシナリオのサンプルをウォークスルーします。以下のコードスニペットをDatabricksノートブックやpysparkシェルにコピーアンドペーストすることができます。ぜひ試してみてください!
このサンプルでは、ストリーミングでテキストファイルの単語を取り込み、デフォルトは10秒の指定されたセッションタイムアウト期間で集計された単語と単語数を表示します。セッションのステートは、単語とカウントを保持し、10秒以上の間入力に単語が存在しなくなるまで集計を行い、後ほどそれらを表示します。

このサンプルにおけるストリーミング入力には以下が含まれます。
- 最初の入力には1つのグレープ、2つのバナナ、3つのアップルが含まれます。
- その後、次の入力には合計10秒間において1秒ごとに3つのバナナ、3つのグレープがそれぞれ含まれます。
このため、コンソールの出力は以下の様になります。
- 10秒の後、最後の10秒間にはappleが含まれていないので、単語
appleにはカウント3がマッピングされます。 - 20秒後、最後の10秒ではbananaとgrapeが含まれていないので、単語
grapeにはカウント31(1 + 3 * 10)、bananaにはカウント32(2 + 3 * 10)がマッピングされます。
この様に、appleセッションウィンドウは10秒継続し、grapeとbananaのセッションウィンドウは20秒継続します。
それでは、サンプルシナリオを試してみましょう。必要なPythonクラスとパッケージをインポートし、入力ディレクトリwords_dirを作成します。
import os
import shutil
import math
import time
from typing import Tuple, Iterator
from pathlib import Path
from functools import reduce
import pandas as pd
from pyspark.sql.functions import current_timestamp, explode, split
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
# NOTE: set `basedir` with the fused path, e.g., "/dbfs/tmp" in Databricks
# notebook.
basedir = os.getcwd() # "/dbfs/tmp"
# My text files containing words will be created in this directory later
# after cleaning 'words_dir' directory up in case you already ran this
# example below.
words_dir = os.path.join(basedir, "words_dir")
shutil.rmtree(words_dir, ignore_errors=True)
os.makedirs(words_dir)
次のステップでは、作成したディレクトリにあるテキストファイルのすべての単語を読み込んで取り込むクエリーを定義します。
# Now, start a streaming query that ingests 'words_dir' directory.
# Every time when there are new text files arriving here, we will process them.
lines = spark.readStream.text(Path(words_dir).as_uri())
# Split the lines into words.
events = lines.select(explode(split(lines.value, " ")).alias("session"))
そして、DataFrame.groupby.applyInPandasWithStateを用いてセッションウィンドウのロジックを定義します。我々のユーザー定義関数は、単語ごとのカウントを集計し、セッションステートに格納します。それぞれのステートが10秒のタイムアウトに達するとステートを読み込み、コンソールに表示される結果を返却します。
def func(
key: Tuple[str], pdfs: Iterator[pd.DataFrame], state: GroupState
) -> Iterator[pd.DataFrame]:
if state.hasTimedOut:
(word,) = key
(count,) = state.get
state.remove()
yield pd.DataFrame({"session": [word], "count": [count]})
else:
# Aggregate the number of words.
count = sum(map(lambda pdf: len(pdf), pdfs))
if state.exists:
(old_count,) = state.get
count += old_count
state.update((count,))
# Set the timeout as 10 seconds.
state.setTimeoutDuration(10000)
yield pd.DataFrame()
# Group the data by word, and compute the count of each group
output_schema = "session STRING, count LONG"
state_schema = "count LONG"
sessions = events.groupBy(events["session"]).applyInPandasWithState(
func,
output_schema,
state_schema,
"append",
GroupStateTimeout.ProcessingTimeTimeout,
)
# Start running the query that prints the windowed word counts to the console.
query = sessions.writeStream.foreachBatch(lambda df, _: df.show()).start()
次に、クエリーに入力を提供します。最初にセッションストアに1つのgrape、2つのbanana、3つのappleを書き込みます。次に、合計10秒間において1秒ごとに3つのbanana、3つのgrapeを書き込みます。
# Now, we will write words to be processed in a streaming manner
# Write 1 banana, 2 grapes, and 3 apples.
with open(os.path.join(words_dir, "words.txt"), "w") as f:
_ = f.write("banana grape apple\n")
_ = f.write("banana apple apple\n")
# Write 3 bananas and 3 grapes every second for 10 seconds.
for i in range(10):
time.sleep(1)
with open(os.path.join(words_dir, f"words_{i}.txt"), "w") as f:
_ = f.write("banana banana banana\n")
_ = f.write("grape grape grape\n")
# Wait enough for the query to finish the input.
time.sleep(60)
query.stop()
appleセッションウィンドウは合計10秒継続し、コンソールにはセッションウィンドウで集計された3つのappleが表示されます。過去10秒間でappleが出現しなかったので、appleのセッションウィンドウはここで終了します。grapeとbananaのセッションウィンドウの場合、最初の10秒でgrapeとbananaが出現しているため20秒継続しており、コンソールには31個のgrapeと32個のbananaが表示されます。
...
+-------+-----+
|session|count|
+-------+-----+
| apple| 3|
+-------+-----+
...
+-------+-----+
|session|count|
+-------+-----+
| grape| 31|
| banana| 32|
+-------+-----+
...
この機能はDatabricksランタイム11.3、そして間も無くリリースされるApache Spark 3.4.0で利用することができます。すぐにDBR 11.3でこの新機能を試してみてください!
