SparkでDatasetを作成する際、JSONを読み込んでからモデル定義をcase classで指定すると、整数が標準でLongになっているといったことが原因で例外が発生することがある。
import org.apache.spark.sql.SparkSession
val ss = SparkSession.builder.getOrCreate
val rdd = sc.parallelize(Seq("""{"i":1}"""))
val ds = ss.read.json(rdd)
ds.schema
// => org.apache.spark.sql.types.StructType = StructType(StructField(i,LongType,true))
case class C(i: Int)
ds.as[C]
// => org.apache.spark.sql.AnalysisException: Cannot up cast `i` from bigint to int as it may truncate
// The type path of the target object is:
// - field (class: "scala.Int", name: "i")
// - root class: "C"
// You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
// at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2016)
// ...
case class D(i: Long)
ds.as[D]
// => org.apache.spark.sql.Dataset[D] = [i: bigint]
次のように予めスキーマを設定しておくと、最初からモデル定義に従ってJSONを読み込んでくれる。
case class C(i: Int)
val schema = ScalaReflection.schemaFor[C].dataType.asInstanceOf[StructType]
val ds = ss.read.schema(schema).json(rdd)
ds.schema
// => org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false))
ds.as[C]
// => org.apache.spark.sql.Dataset[C] = [i: int]
モデル定義ごとにスキーマを作るのが面倒なら、次のようなメソッドを作っておくと便利。
def readJsonAs[Model <: Product : TypeTag](rdd: RDD[String])(implicit ss: SparkSession): Dataset[Model] = {
val schema = ScalaReflection.schemaFor[Model].dataType.asInstanceOf[StructType]
ss.read.schema(schema).json(rdd).as[Model]
}
val ds = readJsonAs[C](rdd)