Scala
Spark
MicroAdDay 19

Spark StreamingのmapWithStateでステートフルな処理をする

はじめに

この記事はMicroAd Advent Calendar 2017の19日目の記事です.

Spark Streamingで状態を扱えたら便利だと思いませんか?
ということで,mapWithStateというメソッドの紹介です.

状態を扱う3つの方法

Spark Streaming単体で状態を扱う方法は下記3つがあります.

  • window: 過去のバッチ処理結果を使って今のバッチ処理を実行する
  • updateStateByKey: バッチ処理を跨ってkey/valueの状態を管理する
  • mapWithState: updateStateByKeyの改良版. Spark 1.6から利用可能

windowは過去の処理結果も踏まえてバッチ実行します.一方,updateStateByKeymapWithStateは全てのバッチ処理に共通した状態を扱います.(共有変数として扱えます)それぞれ場面に応じて使い分けるのが良いと思いますが,ここではmapWithStateを説明します.

mapWithState

mapWithStateはメソッド名の通りで,map操作中に状態管理ができるようになっています.よく似た機能にupdateStateByKeyがありますが,基本的にmapWithStateを使えば問題ないかと思います.
では,コードを紹介しながら説明しようと思います.

まずはメインの処理.単純なSpark Streamingのサンプルです.

Driver.scala
object Driver {

  def main(args: Array[String]): Unit = {
    // Sparkの設定関連
    val sparkConf = new SparkConf().setMaster("local").setAppName("test-spark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Milliseconds(10000))

    // ストリームの作成
    val stream = ssc.socketTextStream("localhost", 7777)

    stream.map { record =>
      // 任意の処理
    }
  }.print()

  ssc.start()
  ssc.awaitTermination()
  }
}

ソケット通信で入力を受付けmap処理した後,標準出力する.というコードです.ではこのサンプルコードにmapWithStateを足してみます.
注意点として,mapWithStateを使うためにはDStreamの要素を(KEY, VALUE)形式にする必要があります.

Driver.scala
object Driver {

  def main(args: Array[String]): Unit = {
    ..省略..
    def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
      val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
      state.update(sum)
      Some((key, sum))
    }

    val stateSpec = StateSpec.function(stateFunc _)

    // mapWithStateを呼ぶためには(key, value)の形式にする
    stream.map { record =>
      ("key-dayo", record.toLong)
    }.mapWithState(stateSpec)

  ssc.start()
  ssc.awaitTermination()
  }
}

mapWithStateを使って状態を管理するためにStateSpecを生成する必要があります.そして状態管理とmap処理を記述した関数stateFuncを定義し,StateSpecに渡します.

では,詳細に見てみます.

/** キー毎に合計値を計算する処理
  *
  * @param key 要素のキー
  * @param value 要素のバリュー
  * @param state keyに対応した状態
  * @return mapによる変換後の要素
  */
def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
  // value(今回の例だとストリームの入力値)とstateを足す
  // 最初は状態変数は空なので,getOrElse(0L)で0を返す
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0L)
  // 状態を更新する
  state.update(sum)
  // 変換結果を返す
  Some((key, sum))
}

追加したstateFuncはキー毎に入力された値を足していく,というコード例になります.
stateFuncの変数stateは引数key毎に管理されています.stateFuncが実行されると,keyに対応したstateが自動的に渡されるようになっているので,上記のようにstate.getOption.getOrElse(0L)とするとkeyに対応した現在までの合計値を得られます.もし状態がまだ設定されていなければ0を返します.
続いて,state.update(sum)stateに要素のvalueを足し,状態を更新します.そしてキーと新しい合計値のペアを返しています.

以上が最小限の例になります.その他,状態に有効期間を設けたり,処理するパーティション数を変えたり,初期値を与えることができます.
既存のupdateStateByKeyと比較して高機能になっていることと, (僕は検証はしていないのですが)こちらの記事によるとパフォーマンスも向上しているそうです.特にupdateStateByKeyだと状態の時限削除ができませんが,こちらは出来るようになっているので,例えば「ユーザの一定時間以内のアクセス集計」のような時間を扱う集計処理で役立つのでは,と思っています.

おわりに

今回紹介したmapWithStateを紹介した日本語記事はまだ見当たらなかったので,今回この場をお借りして紹介しました.Spark Streamingで状態を扱う数少ない方法の1つですので是非活用していきたいですね!

参考文献

  • Petar Zečević Marko Bonaći, Spark in Action, Manning Publications Co.