LoginSignup
8
0

More than 3 years have passed since last update.

dataflowを使って簡単なstream処理を試す

Last updated at Posted at 2019-12-08

本記事はFringe81アドベントカレンダー2019の8日目の投稿です。
今年はStream処理がどんなものなのか、Scioでdataflowのstream処理を実装して試してみました。
dataflow(GCP)のドキュメントでも触れられている有名なストリーム処理の記事がなかなかハードな内容だったので理解のため実際に動かして試してみることにしました。
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
Scioを使ったdataflow実装については以前の投稿も参照くださいm(--)m

Stream処理とは?(batch処理との比較)

  • 無限データ(継続した区切りのない連続データ)を扱う
  • よりタイムリーで即時性あるデータ処理を実行

具体的に長所短所(batch処理との比較)

  • 長)データ入ってくるタイミングで即座に処理できる。
  • 短)常に処理を待ち受けるため、コストがかかる
  • 短)即時処理のために色々考慮することがあって複雑性が増す。

処理単位の種類

連続データのStream処理を考えた場合以下のパターンが考えられる

  • Fixed time windows
    image.png

  • Sliding time windows
    image.png

  • Session windows
    image.png

cf.https://beam.apache.org/documentation/programming-guide/#windowing

まずはシンプルに「Fixed time windows」の処理を実装する

  • 固定window期間を2分に設定 こうすることで2分ごとに集計結果が更新される
def calculate1(infos: SCollection[GameActionInfo]): SCollection[(String, Int)] =
    infos.withFixedWindows(Duration.standardMinutes(2))
      .map(i => (i.team, i.score))
      .sumByKey

※ただし、集計間隔の2分より遅れて到達したデータは漏れてしまう。
スクリーンショット 2019-12-08 12.55.37.png
cf. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

「Stream処理を実装する=windowから外れた値をどう処理するか」を扱う概念

  • Watermark
    どの値までデータ処理に渡されたかの目安(推測値)。

  • Trigger
    windowに溜まったpane(中身)をどのタイミングで処理するか定義

  • Accumulation
    同一windowの複数結果を算出する場合の関係性や動作を定めるもの

これらを取り入れて実装すると以下になります。
これにより、window間隔に対するある程度の遅れデータまで考慮した集計が可能となって信頼性が上がる処理ができました。

def calculate2(infos: SCollection[GameActionInfo]): SCollection[(String, Int)] =
    infos.withFixedWindows(Duration.standardMinutes(2),
      options = WindowOptions(
        trigger = AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(5))),
        accumulationMode = ACCUMULATING_FIRED_PANES,
        allowedLateness = Duration.standardMinutes(600)))
      .map(i => (i.team, i.score))
      .sumByKey

やってみて

stream処理がどんなものか試してみるため、比較的シンプルなところを実装してみました。
stream処理は文章で読むよりも実装して試した方が理解しやすいと思ったので、興味ある方は先ず動かしてみるをオススメします!!

ソースコード

参照元

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
https://beam.apache.org/documentation/programming-guide/

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0