LoginSignup
1

More than 5 years have passed since last update.

SparkでJSONからDatasetを作成する際にスキーマを設定する

Posted at

SparkでDatasetを作成する際、JSONを読み込んでからモデル定義をcase classで指定すると、整数が標準でLongになっているといったことが原因で例外が発生することがある。

import org.apache.spark.sql.SparkSession

val ss = SparkSession.builder.getOrCreate
val rdd = sc.parallelize(Seq("""{"i":1}"""))

val ds = ss.read.json(rdd)
ds.schema
// => org.apache.spark.sql.types.StructType = StructType(StructField(i,LongType,true))

case class C(i: Int)
ds.as[C]
// => org.apache.spark.sql.AnalysisException: Cannot up cast `i` from bigint to int as it may truncate
//    The type path of the target object is:
//    - field (class: "scala.Int", name: "i")
//    - root class: "C"
//    You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;
//    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:2016)
//    ...

case class D(i: Long)
ds.as[D]
// => org.apache.spark.sql.Dataset[D] = [i: bigint]

次のように予めスキーマを設定しておくと、最初からモデル定義に従ってJSONを読み込んでくれる。

case class C(i: Int)
val schema = ScalaReflection.schemaFor[C].dataType.asInstanceOf[StructType]

val ds = ss.read.schema(schema).json(rdd)
ds.schema
// => org.apache.spark.sql.types.StructType = StructType(StructField(i,IntegerType,false))

ds.as[C]
// => org.apache.spark.sql.Dataset[C] = [i: int]

モデル定義ごとにスキーマを作るのが面倒なら、次のようなメソッドを作っておくと便利。

def readJsonAs[Model <: Product : TypeTag](rdd: RDD[String])(implicit ss: SparkSession): Dataset[Model] = {
  val schema = ScalaReflection.schemaFor[Model].dataType.asInstanceOf[StructType]
  ss.read.schema(schema).json(rdd).as[Model]
}

val ds = readJsonAs[C](rdd)

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1