はじめに
この記事はMicroAd Advent Calendar 2017の19日目の記事です.
Spark Streamingで状態を扱えたら便利だと思いませんか?
ということで,mapWithState
というメソッドの紹介です.
状態を扱う3つの方法
Spark Streaming単体で状態を扱う方法は下記3つがあります.
- window: 過去のバッチ処理結果を使って今のバッチ処理を実行する
- updateStateByKey: バッチ処理を跨ってkey/valueの状態を管理する
- mapWithState: updateStateByKeyの改良版. Spark 1.6から利用可能
window
は過去の処理結果も踏まえてバッチ実行します.一方,updateStateByKey
,mapWithState
は全てのバッチ処理に共通した状態を扱います.(共有変数として扱えます)それぞれ場面に応じて使い分けるのが良いと思いますが,ここではmapWithState
を説明します.
mapWithState
mapWithState
はメソッド名の通りで,map
操作中に状態管理ができるようになっています.よく似た機能にupdateStateByKey
がありますが,基本的にmapWithState
を使えば問題ないかと思います.
では,コードを紹介しながら説明しようと思います.
まずはメインの処理.単純なSpark Streamingのサンプルです.
object Driver {
def main(args: Array[String]): Unit = {
// Sparkの設定関連
val sparkConf = new SparkConf().setMaster("local").setAppName("test-spark")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Milliseconds(10000))
// ストリームの作成
val stream = ssc.socketTextStream("localhost", 7777)
stream.map { record =>
// 任意の処理
}
}.print()
ssc.start()
ssc.awaitTermination()
}
}
ソケット通信で入力を受付けmap処理した後,標準出力する.というコードです.ではこのサンプルコードにmapWithState
を足してみます.
注意点として,mapWithState
を使うためにはDStream
の要素を**(KEY, VALUE)形式**にする必要があります.
object Driver {
def main(args: Array[String]): Unit = {
..省略..
def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
state.update(sum)
Some((key, sum))
}
val stateSpec = StateSpec.function(stateFunc _)
// mapWithStateを呼ぶためには(key, value)の形式にする
stream.map { record =>
("key-dayo", record.toLong)
}.mapWithState(stateSpec)
ssc.start()
ssc.awaitTermination()
}
}
mapWithState
を使って状態を管理するためにStateSpec
を生成する必要があります.そして状態管理とmap処理を記述した関数stateFunc
を定義し,StateSpec
に渡します.
では,詳細に見てみます.
/** キー毎に合計値を計算する処理
*
* @param key 要素のキー
* @param value 要素のバリュー
* @param state keyに対応した状態
* @return mapによる変換後の要素
*/
def stateFunc(key: String, value: Option[Long], state: State[Long]): Option[(String, Long)] = {
// value(今回の例だとストリームの入力値)とstateを足す
// 最初は状態変数は空なので,getOrElse(0L)で0を返す
val sum = value.getOrElse(0) + state.getOption.getOrElse(0L)
// 状態を更新する
state.update(sum)
// 変換結果を返す
Some((key, sum))
}
追加したstateFunc
はキー毎に入力された値を足していく,というコード例になります.
stateFunc
の変数state
は引数key
毎に管理されています.stateFunc
が実行されると,key
に対応したstate
が自動的に渡されるようになっているので,上記のようにstate.getOption.getOrElse(0L)
とするとkey
に対応した現在までの合計値を得られます.もし状態がまだ設定されていなければ0を返します.
続いて,state.update(sum)
でstate
に要素のvalue
を足し,状態を更新します.そしてキーと新しい合計値のペアを返しています.
以上が最小限の例になります.その他,状態に有効期間を設けたり,処理するパーティション数を変えたり,初期値を与えることができます.
既存のupdateStateByKey
と比較して高機能になっていることと, (僕は検証はしていないのですが)こちらの記事によるとパフォーマンスも向上しているそうです.特にupdateStateByKey
だと状態の時限削除ができませんが,こちらは出来るようになっているので,例えば「ユーザの一定時間以内のアクセス集計」のような時間を扱う集計処理で役立つのでは,と思っています.
おわりに
今回紹介したmapWithState
を紹介した日本語記事はまだ見当たらなかったので,今回この場をお借りして紹介しました.Spark Streamingで状態を扱う数少ない方法の1つですので是非活用していきたいですね!
参考文献
- Petar Zečević Marko Bonaći, Spark in Action, Manning Publications Co.