Edited at

SparkSQLでクエリ検索してみる。(JSON入力編)

More than 5 years have passed since last update.


環境


  • Mac OS X Version 10.9.5

  • Scala 2.10.4

  • Spark 1.1.0

  • Spark SQL 1.1.0

  • sbt 0.13.1


準備

/path/to/project/root

|-- build.sbt
|-- src
| |-- main
| | |-- scala
| | | |-- Stoppable.scala
| |-- test
| | |-- scala
| | | |-- SparkSQLFromJSONSpec.scala


build.sbt

name := "Spark SQL examples"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

// specs2 is only used by the code under src/test, so it is scoped to the "test"
// configuration and kept off the main/runtime classpath.
// (sbt 0.13.1 requires the blank lines between these settings.)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-sql"  % "1.1.0",
  "org.specs2"       %% "specs2"     % "2.4.1" % "test"
)



実装


src/test/scala/SparkSQLFromJSONSpec.scala

import commons._

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

import org.specs2._

/**
 * Acceptance spec: loads a small JSON dataset into Spark SQL and checks the
 * result of a filtered query over it.
 *
 * The Spark job runs once, while the spec instance is being constructed; the
 * example itself only inspects the collected rows.
 */
class SparkSQLFromJSONSpec extends Specification with Stoppable { def is = s2"""

  SparkSQL From JSON

  From JSON
    read from json $readFromJSON

  """

  /** Input records, one JSON document per element, fed to `SQLContext.jsonRDD`. */
  val json = Seq(
    """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"},"datetime":"2014-08-12 19:07:43"}""",
    """{"name":"Mary","address":{"city":"Manhattan","state":"NewYork"},"datetime":"2014-08-14 19:07:43"}""",
    """{"name":"Mike","address":{"city":"Los Angeles","state":"California"},"datetime":"2014-08-11 19:07:43"}"""
  )

  /**
   * Query results formatted as "name, city".
   *
   * This is a `val` computed from `using` rather than a null-initialised `var`
   * assigned as a side effect, so the example can never observe an unset value.
   * The SparkContext is stopped by `using` even if the query fails.
   */
  val retReadFromJSON: Seq[String] =
    using(new SparkContext("local[1]", "SparkSQLFromJSONSpec", System.getenv("SPARK_HOME"))) { sc =>
      val sqlContext = new SQLContext(sc)
      val people = sqlContext.jsonRDD(sc.parallelize(json))
      people.registerTempTable("people")
      // The `datetime` values are JSON strings, so this presumably compares them as
      // strings; the fixed-width "yyyy-MM-dd HH:mm:ss" layout keeps that order chronological.
      val rows = sqlContext.sql(
        "SELECT name, address.city FROM people WHERE name IN ('Yin', 'Mike') AND datetime < '2014-08-14 00:00:00'")
      rows.collect().map(row => s"${row.getString(0)}, ${row.getString(1)}").toList
    }

  /** The query has no ORDER BY, so the check must not depend on row order. */
  def readFromJSON = retReadFromJSON must containTheSameElementsAs(Seq(
    "Yin, Columbus",
    "Mike, Los Angeles"
  ))
}



src/main/scala/Stoppable.scala

package commons                                                                                                                                                                   

import scala.language.reflectiveCalls

/**
 * Loan-pattern helper for resources that are released by calling `stop()`
 * rather than `close()` (the spec in this project uses it with `SparkContext`).
 */
trait Stoppable {
  import scala.util.control.NonFatal

  /** Any value with a side-effecting `stop()` method; the call is made reflectively. */
  type T = { def stop(): Unit }

  /**
   * Applies `f` to `resource` and always calls `resource.stop()` afterwards.
   *
   * If `f` throws and `stop()` then also throws a non-fatal exception, the
   * exception from `f` is rethrown with the `stop()` failure attached via
   * `Throwable.addSuppressed` (Java 7+), instead of being masked by it.
   *
   * @param resource the resource to use and then stop
   * @param f        the work to perform with the resource
   * @return the result of `f`
   */
  def using[A <: T, B](resource: A)(f: A => B): B = {
    // Remember the body's failure so that a failing stop() cannot replace it.
    var primary: Throwable = null
    try f(resource)
    catch {
      case t: Throwable =>
        primary = t
        throw t
    } finally {
      if (primary == null) resource.stop()
      else
        try resource.stop()
        catch {
          case NonFatal(s) if s ne primary => primary.addSuppressed(s)
        }
    }
  }
}



実行

$ sbt '~test-only SparkSQLFromJSONSpec'


参考

http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets