Help us understand the problem. What is going on with this article?

dataflowを使って簡単なstream処理を試す

本記事はFringe81アドベントカレンダー2019の8日目の投稿です。
今年はStream処理がどんなものなのか、Scioでdataflowのstream処理を実装して試してみました。
dataflow(GCP)のドキュメントでも触れられている有名なストリーム処理の記事がなかなかハードな内容だったので理解のため実際に動かして試してみることにしました。
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
Scioを使ったdataflow実装については以前の投稿も参照くださいm(--)m

Stream処理とは?(batch処理との比較)

  • 無限データ(継続した区切りのない連続データ)を扱う
  • よりタイムリーで即時性あるデータ処理を実行

具体的に長所短所(batch処理との比較)

  • 長)データ入ってくるタイミングで即座に処理できる。
  • 短)常に処理を待ち受けるため、コストがかかる
  • 短)即時処理のために色々考慮することがあって複雑性が増す。

処理単位の種類

連続データのStream処理を考えた場合以下のパターンが考えられる

  • Fixed time windows
    image.png

  • Sliding time windows
    image.png

  • Session windows
    image.png

cf.https://beam.apache.org/documentation/programming-guide/#windowing

まずはシンプルに「Fixed time windows」の処理を実装する

  • 固定window期間を2分に設定 こうすることで2分ごとに集計結果が更新される
def calculate1(infos: SCollection[GameActionInfo]): SCollection[(String, Int)] =
    infos.withFixedWindows(Duration.standardMinutes(2))
      .map(i => (i.team, i.score))
      .sumByKey

※ただし、集計間隔の2分より遅れて到達したデータは漏れてしまう。
スクリーンショット 2019-12-08 12.55.37.png
cf. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

「Stream処理を実装する=windowから外れた値をどう処理するか」を扱う概念

  • Watermark
    どの値までデータ処理に渡されたかの目安(推測値)。

  • Trigger
    windowに溜まったpane(中身)をどのタイミングで処理するか定義

  • Accumulation
    同一windowの複数結果を算出する場合の関係性や動作を定めるもの

これらを取り入れて実装すると以下になります。
これにより、window間隔に対するある程度の遅れデータまで考慮した集計が可能となって信頼性が上がる処理ができました。

def calculate2(infos: SCollection[GameActionInfo]): SCollection[(String, Int)] =
    infos.withFixedWindows(Duration.standardMinutes(2),
      options = WindowOptions(
        trigger = AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(5))),
        accumulationMode = ACCUMULATING_FIRED_PANES,
        allowedLateness = Duration.standardMinutes(600)))
      .map(i => (i.team, i.score))
      .sumByKey

やってみて

stream処理がどんなものか試してみるため、比較的シンプルなところを実装してみました。
stream処理は文章で読むよりも実装して試した方が理解しやすいと思ったので、興味ある方は先ず動かしてみるをオススメします!!

ソースコード

https://github.com/kaz3284/4qiita/tree/master/dataflowstreaming

参照元

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
https://beam.apache.org/documentation/programming-guide/

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした