Spark SQL DataFrame (v1.5.0/1.5.1) bug workarounds


Caveats for DataFrames in Spark 1.5.1


A case that returns incorrect results

// Assumes a Spark 1.5.x spark-shell session (sc, sqlContext, and its implicits are already in scope)
val eventTableColumns = Seq[String](
  "entityType"
  , "entityId"
  , "targetEntityType"
  , "targetEntityId"
  , "properties"
  , "eventTime")

// Column mapping: entityType = e(5), entityId = e(0), targetEntityType = "item", targetEntityId = e(1)
val eventDF = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
  .map { e =>
    (
      e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
    )
  }.toDF(eventTableColumns:_*)

eventDF.filter($"entityType" === "user").select("entityId").distinct.count
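
Based on the column mapping above (entityType comes from e(5), entityId from e(0)), the same count can be computed directly on the RDD, bypassing the DataFrame filter entirely. This cross-check is my own addition, not part of the original post, but on 1.5.0/1.5.1 its result can disagree with the DataFrame count above:

// Cross-check: count distinct user IDs straight from the CSV, without going through DataFrame filter()
val expectedCount = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
  .filter(e => e(5) == "user").map(_(0)).distinct.count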


Workaround ①


  • Use isin instead of ===

eventDF.filter($"entityType" isin lit("user")).select("entityId").distinct.count
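
As a side note (not in the original post), isin takes a varargs list, so the same workaround also covers filters on several literal values; plain Scala values work here as well as lit(...):

// isin accepts multiple values, so more than one entity type can be matched in a single filter
eventDF.filter($"entityType".isin("user", "item")).select("entityId").distinct.count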


Workaround ②


  • Use a case class (instead of a tuple) when building the DataFrame

case class Event(entityType: String, entityId: String,
  targetEntityType: String, targetEntityId: String,
  properties: String, eventTime: String)

val eventDF2 = sc.textFile("/tmp/events_s.csv").map(_.split(",")).filter(_.size >= 6)
  .map { e =>
    Event(
      e(5), e(0), "item", e(1), s"""{"rating": ${e(3).trim.toDouble}}""", e(3)
    )
  }.toDF()

eventDF2.filter($"entityType" === "user").select("entityId").distinct.count
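
As a quick sanity check (my addition, not part of the gist), printing the schema confirms that the case class produced the expected column names and string types:

// Should list entityType, entityId, targetEntityType, targetEntityId, properties, eventTime, all as string
eventDF2.printSchema()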


Workaround ③


  • Set spark.sql.inMemoryColumnarStorage.partitionPruning to false

    sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")

eventDF.filter($"entityType" === "user").select("entityId").distinct.count
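
Since this SET changes a session-wide option, it may be worth restoring the default (true) once the affected query has finished; a minimal sketch:

// Re-enable in-memory columnar partition pruning (the default) after running the workaround query
sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=true")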


The full sample code is available here:

https://gist.github.com/schon/b49a616eeb0b78fb5f47