Applying a class method with map in Spark raises Task not Serializable

Posted at 2017-01-13

Problem

  • Trying to map a method defined on a class in Apache Spark raises Task not serializable
$ spark-shell
scala> import org.apache.spark.sql.SparkSession
scala> val ss = SparkSession.builder.getOrCreate

scala> val ds = ss.createDataset(Seq(1, 2, 3))

scala> :paste

class C {
  def square(i: Int): Int = i * i
}

scala> val c = new C()

scala> ds.map(c.square).show
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
  ... 48 elided
Caused by: java.io.NotSerializableException: C
Serialization stack:
    - object not serializable (class: C, value: C@440da0cb)
    - field (class: $iw, name: c, type: class C)
    - object (class $iw, $iw@63fa7459)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3a04aa02)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4dd42f69)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6b46d353)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@61716ddf)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6335b60)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3ed2f699)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@12f190db)
    - field (class: $line60.$read, name: $iw, type: class $iw)
    - object (class $line60.$read, $line60.$read@3508cc94)
    - field (class: $iw, name: $line60$read, type: class $line60.$read)
    - object (class $iw, $iw@38360fc3)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@5681c40a)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.Literal, name: value, type: class java.lang.Object)
    - object (class org.apache.spark.sql.catalyst.expressions.Literal, <function1>)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 81 more
  • Incidentally, the error did not occur when the method was defined on a singleton object instead of a class
scala> :paste

object O {
  def square(i: Int): Int = i * i
}

scala> ds.map(O.square).show
+-----+
|value|
+-----+
|    1|
|    4|
|    9|
+-----+

Cause

  • When the method application is turned into a closure, the closure is built together with the instance of the class the method belongs to
  • For distributed execution, the closure is serialized and shipped to each worker node
  • The class is not serializable as-is, so the error occurs
Caused by: java.io.NotSerializableException: C
Serialization stack:
    - object not serializable (class: C, value: C@440da0cb)
    - field (class: $iw, name: c, type: class C)
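The root cause can be reproduced without Spark: Spark's default closure serializer uses plain Java serialization, and serializing anything that drags in an instance of a non-Serializable class fails the same way. A minimal standalone sketch (the class name C matches the article; the helper names are illustrative):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Same shape as the article's C: it does not extend Serializable
class C {
  def square(i: Int): Int = i * i
}

object SerializationDemo {
  // Attempt plain Java serialization, which is what Spark's closure serializer uses by default
  def trySerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val c = new C()
    println(trySerialize(c)) // false: C is not java.io.Serializable
  }
}
```

Because `ds.map(c.square)` builds a closure that captures `c`, Spark ends up in exactly this situation when it tries to ship the closure to the workers.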

Solutions

  • Either of the following resolves the error

A. Make the class extend java.io.Serializable

scala> :paste

class C extends java.io.Serializable {
  def square(i: Int): Int = i * i
}

scala> val c = new C()

scala> ds.map(c.square).show
+-----+
|value|
+-----+
|    1|
|    4|
|    9|
+-----+
  • Using an object did not raise the error, so objects may be serializable out of the box
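As a sanity check outside Spark, an instance of the Serializable version round-trips through plain Java serialization, which is all the closure serializer needs (a standalone sketch, not Spark API; the helper name is illustrative):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class C extends java.io.Serializable {
  def square(i: Int): Int = i * i
}

object RoundTripDemo {
  // Serialize and deserialize, mimicking what Spark does when shipping a closure to a worker
  def roundTrip[A <: Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    new ObjectOutputStream(bytes).writeObject(a)
    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
      .readObject()
      .asInstanceOf[A]
  }

  def main(args: Array[String]): Unit = {
    val c2 = roundTrip(new C())
    println(c2.square(3)) // 9: the deserialized copy works on the worker side
  }
}
```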

B. Define the processing as a function instead of a method

scala> :paste

class C extends java.io.Serializable {
  val square: Int => Int = i => i * i
}

scala> val c = new C()

scala> ds.map(c.square).show
+-----+
|value|
+-----+
|    1|
|    4|
|    9|
+-----+
  • That said, this probably won't work if the function refers to other members of C
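That caveat can be checked directly: a function value that only uses its argument serializes fine, but one that reads another member of C captures `this`, dragging the non-Serializable C instance back into the closure (a standalone sketch using plain Java serialization; member names are illustrative):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

class C {
  val factor = 2
  val square: Int => Int = i => i * i     // captures nothing
  val scale: Int => Int = i => i * factor // captures this (a C), which is not Serializable
}

object CaptureDemo {
  // Attempt plain Java serialization, as Spark's closure serializer does
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val c = new C()
    println(serializable(c.square)) // true: a self-contained function value
    println(serializable(c.scale))  // false: the closure pulls in c itself
  }
}
```

So approach B only helps when the function is self-contained; otherwise approach A (or moving the shared state into the function) is still needed.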
