Scala
Spark
MicroAdDay 19

Spark StreamingのmapWithStateでステートフルな処理をする

はじめに

この記事はMicroAd Advent Calendar 2017の19日目の記事です.

Spark Streamingで状態を扱いたい!と思った皆さんに向けて書こうと思います.
ということで,mapWithStateというメソッドの紹介です.

Spark Streamingとは?(読み飛ばして大丈夫です)

Spark Streamingでは時間区切りのコレクション(RDDやDStream)を操作してストリーム処理を記述できます.例えば,Scalaで一般的に使用されるmapflatMapfilterなどのコレクションAPIのようなメソッドを利用し,コーディングができるようになっています.
これらのメソッドを用いた操作から分かるようにSpark Streamingでは基本的に状態を扱いません.そのため,関数型的なプログラミングが可能になります.
とは言え,データに通し番号をつけたいとか,レコード数をカウントしたいとか,ストリーム処理をする以上,何かと状態を扱いたい時はあるかと思います.

そんな時にSpark Streamingでは状態を扱う方法を提供してくれています.そんな機能を紹介するお話です.

状態を扱う3つの方法

  • window: 過去のバッチ処理結果を使って今のバッチ処理を実行する
  • updateStateByKey: バッチ処理を跨ってkey/valueの状態を管理する
  • mapWithState: updateStateByKeyの改良版. Spark 1.6から利用可能

windowは過去の処理結果も踏まえてバッチ実行します.一方,updateStateByKeymapWithStateは全ての処理に共通した状態を扱います.つまり変数みたいに使えます.ここではmapWithStateを説明します.

mapWithState

mapWithStateは状態をkey/valueペアで管理します.メソッド名の通りで,map操作に状態が加わった感じです.
コードを紹介しながら説明しようと思います.

  • まずはメインの処理.変哲の無いSpark Streamingのサンプルです.
Driver.scala
object Driver {

  def main(args: Array[String]): Unit = {
    // Sparkの設定関連
    val sparkConf = new SparkConf().setMaster("local").setAppName("test-spark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Milliseconds(10000))

    // ストリームの作成
    val stream = ssc.socketTextStream("localhost", 7777)

    stream.map { record =>
      //色々処理を書く場所
    }
  }.print()

  ssc.start()
  ssc.awaitTermination()
  }
}
  • このサンプルコードにmapWithStateを足してみます.
  • mapWithStateを使うためにはRDDを(KEY, VALUE)形式にする必要があります.
Driver.scala
object Driver {

  def main(args: Array[String]): Unit = {
    ..省略..
    def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
      val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
      state.update(sum)
      Some((key, sum))
    }

    val stateSpec = StateSpec.function(stateFunc _)

    // mapWithStateを呼ぶためには(key, value)の形式にする
    stream.map { record =>
      ("key-dayo", record.toLong)
    }.mapWithState(stateSpec)

  ssc.start()
  ssc.awaitTermination()
  }
}
  • mapWithStateの引数にstateSpecを渡しています.
  • 状態を扱うためには状態関数を定義する必要があって,そのファクトリがStateSpecです.
  • 肝心なのがstateFunc関数です.引数stateが引数keyに対応した値を持っています
  • 詳細に見てみます
// key => RDDのキー
// value => RDDのバリュー
// state => keyに対応した状態を持つ
def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
  // RDDのバリュー(今回の例だとストリームの入力値)と状態変数の値を足す
  // 最初は状態変数は空なはずなので,getOrElse(0L)で0を返す
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0L)
  // 状態を更新する
  state.update(sum)
  // 変換結果を返す
  Some((key, sum))
}
  • 重要なのはstate変数です.これは引数key毎に値が入ります.
  • state.update等の状態を変更するメソッドを用いて更新したり削除ができます,
  • stateを利用するなどして,最終的な計算結果を返しましょう.(mapの変換後の結果)

と・・こんな感じで使うことが出来ます.これは最小限の例ですが,状態に有効期間を設けたり,処理するパーティション数を変えたり,初期値を与えたり色々できます.
非常に便利なのと,既存のupdateStateByKeyと比較してパフォーマンスに優れているそうです.特にupdateStateByKeyだと状態の時限削除ができませんが,こちらは出来るようになっているので,メモリ溢れとかを心配せずに済むようになっています.

ぜひご活用下さい!

おわりに

今回紹介したmapWithStateは結構認知度が低いような気がします.たまたまSafariBooksOnlineで本を読んでたら見つけました.ただ,Spark Streamingで状態を扱う数少ない方法の1つですので是非活用していきたいですね!