環境
- Mac OS X Version 10.9.5
- Scala 2.10.4
- Spark 1.1.0
- Spark SQL 1.1.0
- sbt 0.13.1
準備
/root/to/project/path
|-- build.sbt
|-- src
| |-- main
| | |-- scala
| | | |-- Stoppable.scala
| |-- test
| | |-- scala
| | | |-- SparkSQLFromJSONSpec.scala
build.sbt
// Build definition for the Spark SQL example project.
// Settings are separated by blank lines, which sbt 0.13.1 needs in order to parse a .sbt file.
name := "Spark SQL examples"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-sql" % "1.1.0",
  // specs2 is referenced only by the specification under src/test, so it is
  // limited to the test configuration instead of the compile classpath.
  "org.specs2" %% "specs2" % "2.4.1" % "test"
)
実装
src/test/scala/SparkSQLFromJSONSpec.scala
import commons._
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.specs2._
/**
 * Specification that loads JSON documents into Spark SQL, queries them, and
 * checks the formatted result rows.
 *
 * The Spark job is executed once, while the specification instance is being
 * constructed; the example `readFromJSON` only compares the stored result.
 */
class SparkSQLFromJSONSpec extends Specification with Stoppable { def is = s2"""
SparkSQL From JSON
From JSON
read from json $readFromJSON
"""

  /** Input records, one JSON document per element, each with a nested `address` object. */
  val json = Seq(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"},"datetime":"2014-08-12 19:07:43"}""",
    """{"name":"Mary","address":{"city":"Manhattan","state":"NewYork"},"datetime":"2014-08-14 19:07:43"}""",
    """{"name":"Mike","address":{"city":"Los Angeles","state":"California"},"datetime":"2014-08-11 19:07:43"}"""
  )

  /**
   * Query result formatted as "name, city".
   *
   * Bound directly to the value returned by `using`, so no mutable,
   * null-initialised field is needed; `using` stops the context afterwards.
   */
  val retReadFromJSON: Seq[String] =
    using(new SparkContext("local[1]", "SparkSQLFromJSONSpec", System.getenv("SPARK_HOME"))) { sc =>
      val sqlContext = new SQLContext(sc)
      val rddSql = sqlContext.jsonRDD(sc.parallelize(json))
      rddSql.registerTempTable("people")
      val ret = sqlContext.sql(
        "SELECT name, address.city FROM people WHERE name IN ('Yin', 'Mike') AND datetime < '2014-08-14 00:00:00'")
      // Collect and copy into a strict List while the context is still running.
      ret.collect.map { row =>
        "%s, %s".format(row.getString(0), row.getString(1))
      }.toList
    }

  /** Expects exactly the two rows matching the name and datetime predicates. */
  def readFromJSON = retReadFromJSON must_== Seq(
    "Yin, Columbus",
    "Mike, Los Angeles"
  )
}
src/main/scala/Stoppable.scala
package commons
import scala.language.reflectiveCalls
/**
 * Mixin providing a loan-style helper for resources that expose a `stop()` method.
 */
trait Stoppable {
  /** Structural type matched by any value that has a `stop(): Unit` method. */
  type T = { def stop(): Unit }

  /**
   * Applies `f` to `resource` and then calls `resource.stop()`, whether `f`
   * returned normally or threw.
   *
   * @param resource the value handed to `f` and stopped afterwards
   * @param f        the computation to run with the resource
   * @tparam A       resource type; must conform to the structural type `T`
   * @tparam B       result type of `f`
   * @return whatever `f` returns
   */
  def using[A <: T, B](resource: A)(f: A => B): B =
    try {
      f(resource)
    } finally {
      // `stop()` is reached through the structural type `T`.
      resource.stop()
    }
}
実行
$ sbt '~test-only SparkSQLFromJSONSpec'
参考