- Sparkでは、各executorに定数を転送したり、各executorで集計した値をdriverで受け取ったりする機能がある。
- SparkはScalaの関数(クロージャー)を使って処理を記述するので、関数の外側で定義した変数を関数内で使うことは出来る(ようになっている)が、実行はexecutor(分散した各ノード上)で行われるので、そこで変数に対して行われた変更はdriver側には反映されない。
- 当然、executor間で参照し合うことも出来ない。
そのため、driverとexecutorとの間で値を共有するための仕組みがSparkには用意されている。
ブロードキャスト変数(broadcast variables)
は、driverで定義した定数(固定値)を各executorに転送する為の変数。
- SparkはScalaの関数(クロージャー)を使って処理を記述するので、ブロードキャストを使わなくても定数を使用することは出来るのだが。
Scalaの定数を使った例
val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = 123
val filter = rdd.filter(_ != CONSTANT)
filter.foreach(println)
Sparkのブロードキャストを使った例
val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = sc.broadcast(123)
val filter = rdd.filter(_ != CONSTANT.value)
filter.foreach(println)
Scalaの関数で定数を使用している場合、関数を転送する度に値の転送も発生することになる。
ブロードキャストを使うと、定数の内容は1度だけ各executorに転送される。
したがって、ある程度大きなサイズの定数(バイト列とかMapとか)を共有したい場合はブロードキャストを使う方が良い。
逆に小さいサイズの定数なら、ブロードキャストの方がオーバーヘッドが大きい可能性がある。
なお、ブロードキャストで転送する値は、ブロードキャスト生成後に変更してはならない。
例えば、後から新しいノードが追加になった場合に、そのノードに対してブロードキャストを転送することがある為。
アキュムレーター
-
アキュムレーター(accumulator)は、“追加”のみを行う変数。
-
driverでアキュムレーターを生成し、各executorでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。
-
アキュムレーターは、Hadoopのカウンターのようなもの。(Hadoopでは、各タスクでカウンターに値を加算していく)
-
ただし、Hadoopのカウンターの結果はアプリケーション内からは利用できない(Mapタスクで集計したカウンターをReduceタスクで読み込むことは出来ない)が、Sparkのアキュムレーターはアプリケーション内の後続処理に利用することが出来る。
val rdd = sc.makeRDD(Seq(1, 2, 3))
val sum = sc.accumulator(0)
rdd.foreach(sum += _)
println(sum.value)
-
アキュムレーターはvalueというフィールドを持っており、アキュムレーターの+=メソッドを使うと、valueに対して“追加(加算)”が実行される。
-
アキュムレーターはデフォルトではInt・Long・Float・Doubleでしか使えないが、自分でAccumulatorParamを実装すれば、どんな型でも扱える。
-
(Int・Long・Float・Doubleに関しては、暗黙のAccumulatorParamが用意されている)
import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {
def zero(initialValue: String): String = ""
def addInPlace(t1: String, t2: String): String = t1 + t2
}
val rdd = sc.makeRDD(Seq(1, 2, 3))
val s = sc.accumulator("0")(StringAccumulatorParam)
rdd.foreach(s += _.toString)
println(s.value)
-
foreachメソッドでは処理順序は保証されないので、上記の結果が「0123」になるとは限らない。「0312」とかにもなりうる。
-
AccumulatorParamのオブジェクトをimplicit objectにしておけば、accumulatorメソッドでAccumulatorParamを指定する必要が無くなる。
implicit object StringAccumulatorParam extends AccumulatorParam[String] {
def zero(initialValue: String): String = ""
def addInPlace(t1: String, t2: String): String = t1 + t2
}
val rdd = sc.makeRDD(Seq(1, 2, 3))
val s = sc.accumulator("0")
- Accumulableはアキュムレーターの汎用版で、保持する値の型と追加する値の型が異なっていてもよい。(Accumulator[T]はAccumulable[T,T]である)
import org.apache.spark.AccumulableParam
- Seq[String]の例
object SeqAccumulableParam extends AccumulableParam[Seq[String], String] {
def zero(initialValue: Seq[String]): Seq[String] = Seq.empty
def addInPlace(r1: Seq[String], r2: Seq[String]): Seq[String] = r1 ++ r2
def addAccumulator(r: Seq[String], t: String): Seq[String] = r :+ t
}
val rdd = sc.makeRDD(Seq("a", "b", "c"))
val s = sc.accumulable(Seq.empty[String])(SeqAccumulableParam)
rdd.foreach(s += _)
println(s.value) //→List(b, c, a)
- Map[K,V]の例
class MapAccumulableParam[K, V] extends AccumulableParam[Map[K, V], (K, V)] {
def zero(initialValue: Map[K, V]): Map[K, V] = Map.empty
def addInPlace(r1: Map[K, V], r2: Map[K, V]): Map[K, V] = r1 ++ r2
def addAccumulator(r: Map[K, V], t: (K, V)): Map[K, V] = r + t
}
val rdd = sc.makeRDD(Seq("a", "bc", "def"))
val m = sc.accumulable(Map.empty[String, Int])(new MapAccumulableParam[String, Int])
rdd.foreach(key => m += (key, key.length))
println(m.value) //→Map(a -> 1, bc -> 2, def -> 3)
- Mapでカウントする例
object MapCounterAccumulableParam extends AccumulableParam[scala.collection.mutable.Map[String, Int], String] {
def zero(initialValue: scala.collection.mutable.Map[String, Int]) = scala.collection.mutable.Map.empty[String, Int]
def addInPlace(r1: scala.collection.mutable.Map[String, Int], r2: scala.collection.mutable.Map[String, Int]): scala.collection.mutable.Map[String, Int] = {
r2.foreach{ kv => add(r1, kv._1, kv._2) }
r1
}
def addAccumulator(r: scala.collection.mutable.Map[String, Int], t: String): scala.collection.mutable.Map[String, Int] = {
add(r, t, 1)
r
}
private def add(r: scala.collection.mutable.Map[String, Int], key: String, value: Int): Unit = {
r.put(key, r.getOrElse(key, 0) + value)
}
}
val rdd = sc.makeRDD(Seq("a", "b", "c", "a", "b", "a"))
val m = sc.accumulable(scala.collection.mutable.Map.empty[String, Int])(MapCounterAccumulableParam)
rdd.foreach(m += _)
println(m.value) //→Map(b -> 2, a -> 3, c -> 1)
- ArrayBuffer[String]の例
Growableトレイトをミックスインしているコレクション(mutableのArrayBufferやListBuffer)を使う場合はaccumulableCollectionメソッドが便利。
val rdd = sc.makeRDD(Seq("a", "b", "c"))
val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String])
rdd.foreach(ac += _)
println(ac.value) //→ArrayBuffer(a, b, c)