6
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spark 共有変数

Posted at
  • Sparkでは、各executorに定数を転送したり、各executorで集計した値をdriverで受け取ったりする機能がある。
  • SparkはScalaの関数(クロージャー)を使って処理を記述するので、関数の外側で定義した変数を関数内で使うことは出来る(ようになっている)が、実行はexecutor(分散した各ノード上)で行われるので、そこで変数に対して行われた変更はdriver側には反映されない。
  • 当然、executor間で参照し合うことも出来ない。
    そのため、driverとexecutorとの間で値を共有するための仕組みがSparkには用意されている。

ブロードキャスト変数(broadcast variables)

は、driverで定義した定数(固定値)を各executorに転送する為の変数。

  • SparkはScalaの関数(クロージャー)を使って処理を記述するので、ブロードキャストを使わなくても定数を使用することは出来るのだが。

Scalaの定数を使った例

val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = 123
val filter = rdd.filter(_ != CONSTANT)
filter.foreach(println)

Sparkのブロードキャストを使った例

val rdd = sc.makeRDD(Seq(123, 456, 789))
val CONSTANT = sc.broadcast(123)
val filter = rdd.filter(_ != CONSTANT.value)
filter.foreach(println)

Scalaの関数で定数を使用している場合、関数を転送する度に値の転送も発生することになる。
ブロードキャストを使うと、定数の内容は1度だけ各executorに転送される。

したがって、ある程度大きなサイズの定数(バイト列とかMapとか)を共有したい場合はブロードキャストを使う方が良い。
逆に小さいサイズの定数なら、ブロードキャストの方がオーバーヘッドが大きい可能性がある。

なお、ブロードキャストで転送する値は、ブロードキャスト生成後に変更してはならない。
例えば、後から新しいノードが追加になった場合に、そのノードに対してブロードキャストを転送することがある為。

アキュムレーター

  • アキュムレーター(accumulator)は、“追加”のみを行う変数。

  • driverでアキュムレーターを生成し、各executorでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。

  • アキュムレーターは、Hadoopのカウンターのようなもの。(Hadoopでは、各タスクでカウンターに値を加算していく)

  • ただし、Hadoopのカウンターの結果はアプリケーション内からは利用できない(Mapタスクで集計したカウンターをReduceタスクで読み込むことは出来ない)が、Sparkのアキュムレーターはアプリケーション内の後続処理に利用することが出来る。

  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val sum = sc.accumulator(0)
  rdd.foreach(sum += _)
  println(sum.value)
  • アキュムレーターはvalueというフィールドを持っており、アキュムレーターの+=メソッドを使うと、valueに対して“追加(加算)”が実行される。

  • アキュムレーターはデフォルトではInt・Long・Float・Doubleでしか使えないが、自分でAccumulatorParamを実装すれば、どんな型でも扱える。

  • (Int・Long・Float・Doubleに関しては、暗黙のAccumulatorParamが用意されている)

import org.apache.spark.AccumulatorParam
object StringAccumulatorParam extends AccumulatorParam[String] {

  def zero(initialValue: String): String = ""

  def addInPlace(t1: String, t2: String): String = t1 + t2
}
  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val s = sc.accumulator("0")(StringAccumulatorParam)
  rdd.foreach(s += _.toString)
  println(s.value)
  • foreachメソッドでは処理順序は保証されないので、上記の結果が「0123」になるとは限らない。「0312」とかにもなりうる。

  • AccumulatorParamのオブジェクトをimplicit objectにしておけば、accumulatorメソッドでAccumulatorParamを指定する必要が無くなる。

implicit object StringAccumulatorParam extends AccumulatorParam[String] {

  def zero(initialValue: String): String = ""

  def addInPlace(t1: String, t2: String): String = t1 + t2
}
  val rdd = sc.makeRDD(Seq(1, 2, 3))
  val s = sc.accumulator("0")
  • Accumulableはアキュムレーターの汎用版で、保持する値の型と追加する値の型が異なっていてもよい。(Accumulator[T]はAccumulable[T,T]である)
import org.apache.spark.AccumulableParam
  • Seq[String]の例
object SeqAccumulableParam extends AccumulableParam[Seq[String], String] {

  def zero(initialValue: Seq[String]): Seq[String] = Seq.empty

  def addInPlace(r1: Seq[String], r2: Seq[String]): Seq[String] = r1 ++ r2

  def addAccumulator(r: Seq[String], t: String): Seq[String] = r :+ t
}
  val rdd = sc.makeRDD(Seq("a", "b", "c"))
  val s = sc.accumulable(Seq.empty[String])(SeqAccumulableParam)
  rdd.foreach(s += _)
  println(s.value) //→List(b, c, a)
  • Map[K,V]の例
class MapAccumulableParam[K, V] extends AccumulableParam[Map[K, V], (K, V)] {

  def zero(initialValue: Map[K, V]): Map[K, V] = Map.empty

  def addInPlace(r1: Map[K, V], r2: Map[K, V]): Map[K, V] = r1 ++ r2

  def addAccumulator(r: Map[K, V], t: (K, V)): Map[K, V] = r + t
}
  val rdd = sc.makeRDD(Seq("a", "bc", "def"))
  val m = sc.accumulable(Map.empty[String, Int])(new MapAccumulableParam[String, Int])
  rdd.foreach(key => m += (key, key.length))
  println(m.value) //→Map(a -> 1, bc -> 2, def -> 3)
  • Mapでカウントする例
object MapCounterAccumulableParam extends AccumulableParam[scala.collection.mutable.Map[String, Int], String] {

  def zero(initialValue: scala.collection.mutable.Map[String, Int]) = scala.collection.mutable.Map.empty[String, Int]

  def addInPlace(r1: scala.collection.mutable.Map[String, Int], r2: scala.collection.mutable.Map[String, Int]): scala.collection.mutable.Map[String, Int] = {
    r2.foreach{ kv => add(r1, kv._1, kv._2) }
    r1
  }

  def addAccumulator(r: scala.collection.mutable.Map[String, Int], t: String): scala.collection.mutable.Map[String, Int] = {
    add(r, t, 1)
    r
  }

  private def add(r: scala.collection.mutable.Map[String, Int], key: String, value: Int): Unit = {
    r.put(key, r.getOrElse(key, 0) + value)
  }
}
  val rdd = sc.makeRDD(Seq("a", "b", "c", "a", "b", "a"))
  val m = sc.accumulable(scala.collection.mutable.Map.empty[String, Int])(MapCounterAccumulableParam)
  rdd.foreach(m += _)
  println(m.value) //→Map(b -> 2, a -> 3, c -> 1)
  • ArrayBuffer[String]の例
    Growableトレイトをミックスインしているコレクション(mutableのArrayBufferやListBuffer)を使う場合はaccumulableCollectionメソッドが便利。
  val rdd = sc.makeRDD(Seq("a", "b", "c"))
  val ac = sc.accumulableCollection(scala.collection.mutable.ArrayBuffer.empty[String])
  rdd.foreach(ac += _)
  println(ac.value) //→ArrayBuffer(a, b, c)
6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?