3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Pythonデータフレームを用いた構造化ストリーミング

Last updated at Posted at 2021-12-17

Demo notebooks | Databricks on AWS [2021/9/24時点]のStructured Streaming demo Python notebookを翻訳・説明する記事です。

Apache Sparkには、高レベルのストリーミング処理APIである構造化ストリーミングが含まれています。ここで説明するノートブックでは、Databricksで構造化ストリーミングアプリケーションを構築するためにどのようにデータフレームAPIを活用するのかをクイックに見てみます。ここでは、ストリームにおけるタイムスタンプを伴うアクション(Open、Closeなど)に対するウィンドウ内のカウントや連続的に更新されるカウントのようなリアルタイムのメトリクスを計算します。

サンプルノートブックの説明

サンプルデータ

ここでアプリケーションを構築するために用いるサンプルのアクションデータは、/databricks-datasets/structured-streaming/events/にファイルとして格納されています。ディレクトリの中身を見てみましょう。

%fs ls /databricks-datasets/structured-streaming/events/

Screen Shot 2021-12-17 at 9.42.04.png

ディレクトリには約50個のJSONファイルが格納されています。個々のJSONファイルの中身を見てみましょう。

%fs head /databricks-datasets/structured-streaming/events/file-0.json

Screen Shot 2021-12-17 at 10.48.30.png

ファイル内のそれぞれの行には、2つのフィールドtimeactionが含まれています。インタラクティブにこれらのファイルを解析してみましょう。

バッチ/インタラクティブ処理

データを処理する最初のステップは、通常はデータに対するインタラクティブなクエリーの実行です。ファイルに対する静的なデータフレームを定義し、テーブル名をつけましょう。

Python
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# すでにデータのフォーマットを知っているので、処理を高速化するためにスキーマを定義しましょう(Sparkがスキーマを推定する必要がなくなります)
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

# JSONファイルのデータを表現する静的なデータフレーム
staticInputDF = (
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

Screen Shot 2021-12-17 at 9.44.36.png

これで、1時間のタイムウィンドウにおけるopencloseアクションの数を計算することができます。このためには、actionカラムとtimeカラムに対する1時間のウィンドウでグルーピングを行います。

Python
from pyspark.sql.functions import *      # window()関数を使用するために必要

staticCountsDF = (
  staticInputDF
    .groupBy(
       staticInputDF.action, 
       window(staticInputDF.time, "1 hour"))    
    .count()
)
staticCountsDF.cache()

# データフレームをテーブル'static_counts'として登録します 
staticCountsDF.createOrReplaceTempView("static_counts")

これで、テーブルに対してSQLを用いて直接クエリーを実行できます。例えば、以下のように全ての時間におけるアクションの総数を計算します。

SQL
%sql select action, sum(count) as total_count from static_counts group by action

Screen Shot 2021-12-17 at 9.45.23.png

ウィンドウ内のカウントはどうでしょうか?

SQL
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

Screen Shot 2021-12-17 at 9.46.04.png

グラフの最後の2つに注意してください。対応するOpenアクションの後にCloseアクションが生じるように生成されており、最初に多くの"Open"が存在し、最後に多くの"Close"が存在しています。

ストリーム処理

ここまではデータをインタラクティブに分析しましたが、データの到着に伴って連続的に更新するストリーミングクエリーに切り替えましょう。今回は一連の静的なファイルがあるのみなので、一度に一つのファイルを時系列に読み込むことでストリームをシミュレートします。ここで記述すべきクエリーは、上述のインタラクティブなクエリーと非常に似たものとなります。

Python
from pyspark.sql.functions import *

# 上で定義したstaticInputDFの定義と似ていますが、`read`ではなく`readStream`を使用します
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)               # JSONデータのスキーマを設定
    .option("maxFilesPerTrigger", 1)  # 一度に一つのファイルを取り込むことで一連のファイルをストリームとして取り扱います
    .json(inputPath)
)

# staticInputDFと同じクエリー
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 hour"))
    .count()
)

# このデータフレームはストリーミングデータフレームでしょうか?
streamingCountsDF.isStreaming

Screen Shot 2021-12-17 at 9.47.11.png

上でわかるように、streamingCountsDFはストリーミングデータフレームです(streamingCountsDF.isStreamingtrueでした)。sinkを定義し、起動することで、ストリーミング処理をスタートすることができます。

今回のケースでは、カウントをインタラクティブに取得(上と同じクエリー)したいので、インメモリのテーブルに1時間ごとのカウントをセットします。

Python
spark.conf.set("spark.sql.shuffle.partitions", "2")  # シャッフルのサイズを小さくします

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = インメモリテーブルに格納 
    .queryName("counts")     # counts = インメモリテーブルの名称
    .outputMode("complete")  # complete = 全てのカウントをテーブルに保持
    .start()
)

Screen Shot 2021-12-17 at 9.47.55.png

queryはバックグラウンドで実行されるストリーミングクエリーのハンドルとなります。このクエリーは継続的にファイルを取得し、ウィンドウ内のカウントを更新します。

上のセルのクエリーのステータスに注意してください。プログレスバーはクエリーがアクティブであることを示しています。さらに、上の> countsを展開すると、すでに処理されたファイルの数を確認することができます。

いくつかのファイルが処理されるまで待ち、インメモリのcountsテーブルに対してインタラクティブにクエリーを実行してみましょう。

Python
from time import sleep
sleep(5)  # 計算がスタートするまで少し待ちます
SQL
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

Screen Shot 2021-12-17 at 9.48.45.png

クエリーを実行するごとに結果が更新されていることがわかります。

Screen Shot 2021-12-17 at 9.48.56.png

Screen Shot 2021-12-17 at 9.49.10.png

上のクエリーを繰り返し実行し続けることで、"Close"が常に対応する"Open"の後に出現するデータストリームで期待されたように、常に"Close"の数より"Open"の数が多いことを確認することができます。これは、構造化ストリーミングがprefix integrityを保証していることを示しています。prefix integrityの詳細を知りたい方は以下のブログ記事を参照ください。

Spark Structured Streaming - The Databricks Blog

ここでは少数のファイルしか取り扱っていないので、全てのファイルを処理してしまうとカウントは更新されなくなります。再度ストリーミングクエリーを操作したい場合には、クエリーを際実行してください。

最後に、クエリーのセルのCancelリンクをクリックするか、query.stop()を実行することで、バックグラウンドで実行しているクエリーを停止します。いずれの方法でも、クエリーが停止されると対応するセルのステータスは自動的にTERMINATEDとなります。

サンプルノートブック

Databricks 無料トライアル

Databricks 無料トライアル

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?