環境
- Mac OS X Version 10.9.5
- Scala 2.10.4
- Spark 1.1.0
- Spark SQL 1.1.0
- sbt 0.13.1
準備
/path/to/project/root
|-- build.sbt
|-- src
| |-- main
| | |-- scala
| | | |-- Stoppable.scala
| |-- test
| | |-- resources
| | | |-- people.txt
| | |-- scala
| | | |-- SparkSQLFromFileSpec.scala
build.sbt
// sbt 0.13.1 requires every setting in a .sbt file to be separated by a blank line.
name := "Spark SQL From File examples"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-sql"  % "1.1.0",
  // specs2 is only used by the sources under src/test, so keep it off the main classpath.
  "org.specs2"       %% "specs2"     % "2.4.1" % "test"
)
src/test/resources/people.txt
Michael, 29
Andy, 30
Justin, 19
実装
src/test/scala/SparkSQLFromFileSpec.scala
import commons._
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.specs2._
/**
 * One record of `people.txt` (a name and an age per line).
 *
 * Spark SQL can infer a table schema from any class implementing `Product`, such as a
 * case class; the constructor parameter names become the column names queried below.
 */
case class Person(
  name: String,
  age: Int
)
class SparkSQLFromFileSpec extends Specification with Stoppable { def is = s2"""
Spark SQL From File
FromFile
read from file $readFromFile
"""

  /**
   * The teenagers found in people.txt, each formatted as "name:age".
   *
   * Lazily evaluated, so the SparkContext is created only when the example actually runs.
   * A failure in the Spark job is then reported as an error of the `readFromFile` example
   * instead of aborting construction of the whole specification.
   */
  lazy val retReadFromFile: Array[String] =
    using(new SparkContext("local[1]", "SparkSQLFromFileSpec", System.getenv("SPARK_HOME"))) { sc =>
      val sqlContext = new SQLContext(sc)
      // Implicit conversion RDD[Person] => SchemaRDD, needed for registerTempTable below.
      import sqlContext.createSchemaRDD

      // Create an RDD of Person objects and register it as a table.
      val people = sc.textFile("src/test/resources/people.txt")
        .map(_.split(","))
        .map(p => Person(p(0), p(1).trim.toInt))
      people.registerTempTable("people")

      // SQL statements can be run by using the sql methods provided by sqlContext.
      val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

      // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
      // Columns are accessed by ordinal, through the typed getters rather than as Any.
      teenagers.map(row => s"${row.getString(0)}:${row.getInt(1)}").collect()
    }

  /** Only Justin (19) lies in the 13..19 range; List equality is structural and prints readably on failure. */
  def readFromFile = retReadFromFile.toList must_== List("Justin:19")
}
src/main/scala/Stoppable.scala
package commons
import scala.language.reflectiveCalls
trait Stoppable {
  /** Any resource exposing a no-argument `stop(): Unit` method, e.g. a `SparkContext`. */
  type T = { def stop(): Unit }

  /**
   * Loan pattern: applies `f` to `resource` and always calls `resource.stop()` exactly once afterwards.
   *
   * If `f` throws, that exception is rethrown after stopping the resource; a non-fatal exception
   * raised by `stop()` at that point is discarded so it cannot mask the original failure.
   * If `f` completes normally and `stop()` throws, the exception from `stop()` propagates.
   *
   * @param resource the resource lent to `f`
   * @param f        the work to perform with the resource
   * @return the value produced by `f`
   */
  def using[A <: T, B](resource: A)(f: A => B): B = {
    import scala.util.control.NonFatal

    val result =
      try f(resource)
      catch {
        // Catch everything (including control-flow throwables) only to release the resource, then rethrow.
        case primary: Throwable =>
          try resource.stop()
          catch { case NonFatal(_) => () }
          throw primary
      }
    resource.stop()
    result
  }
}
実行
$ sbt '~test-only SparkSQLFromFileSpec'
参考