8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MicroAdAdvent Calendar 2017

Day 19

Spark StreamingのmapWithStateでステートフルな処理をする

Last updated at Posted at 2017-12-19

はじめに

この記事はMicroAd Advent Calendar 2017の19日目の記事です.

Spark Streamingで状態を扱えたら便利だと思いませんか?
ということで,mapWithStateというメソッドの紹介です.

状態を扱う3つの方法

Spark Streaming単体で状態を扱う方法は下記3つがあります.

  • window: 過去のバッチ処理結果を使って今のバッチ処理を実行する
  • updateStateByKey: バッチ処理を跨ってkey/valueの状態を管理する
  • mapWithState: updateStateByKeyの改良版. Spark 1.6から利用可能

windowは過去の処理結果も踏まえてバッチ実行します.一方,updateStateByKeymapWithStateは全てのバッチ処理に共通した状態を扱います.(共有変数として扱えます)それぞれ場面に応じて使い分けるのが良いと思いますが,ここではmapWithStateを説明します.

mapWithState

mapWithStateはメソッド名の通りで,map操作中に状態管理ができるようになっています.よく似た機能にupdateStateByKeyがありますが,基本的にmapWithStateを使えば問題ないかと思います.
では,コードを紹介しながら説明しようと思います.

まずはメインの処理.単純なSpark Streamingのサンプルです.

Driver.scala
object Driver {

  def main(args: Array[String]): Unit = {
    // Sparkの設定関連
    val sparkConf = new SparkConf().setMaster("local").setAppName("test-spark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Milliseconds(10000))

    // ストリームの作成
    val stream = ssc.socketTextStream("localhost", 7777)

    stream.map { record =>
      // 任意の処理
    }
  }.print()

  ssc.start()
  ssc.awaitTermination()
  }
}

ソケット通信で入力を受付けmap処理した後,標準出力する.というコードです.ではこのサンプルコードにmapWithStateを足してみます.
注意点として,mapWithStateを使うためにはDStreamの要素を**(KEY, VALUE)形式**にする必要があります.

Driver.scala
object Driver {

  def main(args: Array[String]): Unit = {
    ..省略..
    def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
      val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
      state.update(sum)
      Some((key, sum))
    }

    val stateSpec = StateSpec.function(stateFunc _)

    // mapWithStateを呼ぶためには(key, value)の形式にする
    stream.map { record =>
      ("key-dayo", record.toLong)
    }.mapWithState(stateSpec)

  ssc.start()
  ssc.awaitTermination()
  }
}

mapWithStateを使って状態を管理するためにStateSpecを生成する必要があります.そして状態管理とmap処理を記述した関数stateFuncを定義し,StateSpecに渡します.

では,詳細に見てみます.

/** キー毎に合計値を計算する処理
  *
  * @param key 要素のキー
  * @param value 要素のバリュー
  * @param state keyに対応した状態
  * @return mapによる変換後の要素
  */
def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
  // value(今回の例だとストリームの入力値)とstateを足す
  // 最初は状態変数は空なので,getOrElse(0L)で0を返す
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0L)
  // 状態を更新する
  state.update(sum)
  // 変換結果を返す
  Some((key, sum))
}

追加したstateFuncはキー毎に入力された値を足していく,というコード例になります.
stateFuncの変数stateは引数key毎に管理されています.stateFuncが実行されると,keyに対応したstateが自動的に渡されるようになっているので,上記のようにstate.getOption.getOrElse(0L)とするとkeyに対応した現在までの合計値を得られます.もし状態がまだ設定されていなければ0を返します.
続いて,state.update(sum)stateに要素のvalueを足し,状態を更新します.そしてキーと新しい合計値のペアを返しています.

以上が最小限の例になります.その他,状態に有効期間を設けたり,処理するパーティション数を変えたり,初期値を与えることができます.
既存のupdateStateByKeyと比較して高機能になっていることと, (僕は検証はしていないのですが)こちらの記事によるとパフォーマンスも向上しているそうです.特にupdateStateByKeyだと状態の時限削除ができませんが,こちらは出来るようになっているので,例えば「ユーザの一定時間以内のアクセス集計」のような時間を扱う集計処理で役立つのでは,と思っています.

おわりに

今回紹介したmapWithStateを紹介した日本語記事はまだ見当たらなかったので,今回この場をお借りして紹介しました.Spark Streamingで状態を扱う数少ない方法の1つですので是非活用していきたいですね!

参考文献

  • Petar Zečević Marko Bonaći, Spark in Action, Manning Publications Co.
8
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?