0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

StructuredStreamingでJOINしているstatic Dataframeの更新を反映させる方法

Posted at

はじめに

Structured Streamingにて、Kafkaからのストリームデータと、HDFS上のテーブルをJOINしており、HDFS上のテーブルは一日に一度のバッチ処理にて更新しています。

このような状況でテーブルの更新をストリーム結果に反映させるのに苦労したので、その共有です。

はじめに断っておきますが、今回の投稿は100%の自信があるわけではないので、参考にするのは自己責任でお願いします。むしろ、よりいい方法等あればご指摘ください。

ただ、いまのところ、正しく処理できています。

参考にした情報

Sparkのメーリングリストは、databricksの人が返答してくれているので、信じて問題ないと思います。ただ、2017年3月なので少し情報が古い可能性があります。

そちらでは、「kafkaからのストリームデータとS3のデータをJOINをStructuredStreamingで処理しているが、S3の更新が週に1度あり、それを反映させるのはどうすればよいか」が議論されており、結論としては、以下の手順を踏む必要があるようです。つまり、全部やり直しということですね。。

  • static DataFrameを再作成する(例ではS3)
  • streaming DataFrameを再作成する(例ではKafka)
  • queryをrestart

アプリケーション全体をrestartするよりは十分早く、10秒のレイテンシが許容できるなら問題ないとのことです。

では、クエリをrestartってどうやってやんねんというと、How to Shutdown a Spark Streaming Job Gracefullyで紹介されているGitHubに上がっているサンプルコードが参考になります。こちらも2017年2月と少し古いですが、databricksの人なので信用していいかと思います。

ただし、Spark Streamingなので、Structured StreamingのAPIに読み替える必要があります。
以下の二通りの方法で実装し、両方とも実際に動作確認いたしました。

定期的にrestart

言語はPythonです。


def launch():

    kafkaDataFrame = spark.readStream.format("kafka") \
                        .option("kafka.bootstrap.servers","......") \
                        .option("subscribe", "....") \
                        .load()

    thresholdDataFrame = spark.read.format("parquet") \
                            .load("hdfs://....")
    # persistする
    thresholdDataFrame.persist()

    # いろいろ処理
    
    # いろいろ処理後、JOIN
    withThreshold = kafkaDataFrame \
                    .join(
                        thresholdDataFrame,
                        (hour(col("window.start")) == thresholdDataFrame.hour)
                        &
                        ....
                    ) \
                    .select(...)


    kafkaQuery = withThreshold \
                    .selectExpr("to_json(struct(*)) AS value") \
                    .writeStream \
                    .outputMode("append") \
                    .format("kafka") \
                    .......
                    .start()


    isStopped = False
    # 半日に一度restartする
    checkIntervalSeconds = 12 * 60 * 60

    print("calling awaitTermination")
    isStopped = kafkaQuery.awaitTermination(checkIntervalSeconds)

    if isStopped:
        print("confirmed! The streaming context is stopped. Exiting application...")
    else:
        print("Streaming App is still running. Timeout...")

    if not isStopped:
        print("stopping kafkaQuery right now")
        kafkaQuery.stop()
        print("kafkaQuery is stopped!!!!!!!")
        print("restarting query!!!!!!!")
        launch()


こちらですが、地味なハマりポイントがあり、4,5時間つぶしてしまいました。

わかってしまえば、非常に単純なのですが、awaitTerminationのscalaのAPIはmillisecondを指定するのですが、pythonのAPIはsecondを指定します。

scalaのドキュメントのみ読んでいたので、それに気づかず、テストとして1分に設定しても全く処理が返らないのであきらめて、苦し紛れに次の章の例外を発生させる方法を思いつきました。

そして、この記事を書いている途中に↑の違いに気づいたというのが正直なところです笑

更新時に例外を発生させて、catchしてrestart


def launch():

    kafkaDataFrame = spark.readStream.format("kafka") \
                        .option("kafka.bootstrap.servers","......") \
                        .option("subscribe", "....") \
                        .load()

    thresholdDataFrame = spark.read.format("parquet") \
                            .load("hdfs://....")
    # persistしない!!!!
    # thresholdDataFrame.persist()

    # いろいろ処理
    
    # いろいろ処理後、JOIN
    # ...
    # ...

    try:
        kafkaQuery.awaitTermination()
    except StreamingQueryException as e:
        # おそらく不要だが念のため
        kafkaQuery.stop()
        # バッチ処理によってthresholdDataFrameが更新された場合リスタート
        if re.search(r"java\.io\.FileNotFoundException.*part.*parquet", traceback.format_exc()):
            print("===================================================")
            print("detected static dataframe was updated!")
            print("restarting query")
            print("===================================================")
            launch()

少しトリッキーな方法です。

thresholdDataFrameが参照しているHDFSファイルはmode("overwrite")にて指定したパスに上書きをしています。
その場合、読み込んだdataframeに対してpersistを呼ばないと更新時に以下の例外が発生します。

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 22.0 failed 1 times, most recent failure: Lost task 1.0 in stage 22.0 (TID 1440, localhost, executor driver): java.io.FileNotFoundException: File does not exist: hdfs://server/path/to/part-00034-afcdb8cd-0d97-4a47-81fa-99540fdc6f0c-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)

この例外は、structuredStreamingに限った話ではなく、

df=spark.read.parquet("hdfs://server/path/to")
# 他のターミナルでhdfs://server/path/toを上書きする
df.show()

のような処理でも同様の例外が発生します。これはおそらく、RDDの処理としてファイルの読み込みが遅延されるとはいえ、実際に存在するファイルパスなどのメタ情報をspark.readの時点でキャッシュとして保持しているからかと思われます。

この例外を避けたければ、書き込み方法をmode("append")をすればよいですが、そうするとファイルの意味する情報も変わってくるでしょう。

また、df.persistを呼んであげれば、実際のデータを初期に読み込んで永続化してくれるので、例外は発生しません。

ここでは、その性質を逆に利用し、あえて例外を発生させ、catchし、HDFSファイルの更新による例外発生の時のみ、restartしてみました。

どちらが良いか

後者の方法では、バッチ更新時にすぐに反映させられるというメリットはあるものの、設計としてはイケてないきがしますし、persistが使えないですし、基本的には前者でいいのではないでしょうか

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?