Trying out queries with Spark SQL (text file input edition)

Posted at 2014-10-15

Environment

  • Mac OS X Version 10.9.5
  • Scala 2.10.4
  • Spark 1.1.0
  • Spark SQL 1.1.0
  • sbt 0.13.1

Preparation

/root/to/project/path
   |-- build.sbt
   |-- src
   |    |-- main
   |    |    |-- scala
   |    |    |    |-- Stoppable.scala
   |    |-- test
   |    |    |-- resources
   |    |    |    |-- people.txt
   |    |    |-- scala
   |    |    |    |-- SparkSQLFromFileSpec.scala
build.sbt
name := "Spark SQL From File examples"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-sql" % "1.1.0",
  "org.specs2" %% "specs2" % "2.4.1"
)
src/test/resources/people.txt
Michael, 29
Andy, 30
Justin, 19

Implementation

src/test/scala/SparkSQLFromFileSpec.scala
import commons._

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

import org.specs2._

// A case class defines the schema here; you can also use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

class SparkSQLFromFileSpec extends Specification with Stoppable { def is = s2"""

  Spark SQL From File

  FromFile
    read from file                          $readFromFile
  """

  var retReadFromFile: Array[String] = _ 
  using(new SparkContext("local[1]", "SparkSQLFromFileSpec", System.getenv("SPARK_HOME"))) { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("src/test/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    people.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

    // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    retReadFromFile = teenagers.map(t => "%s:%d".format(t(0), t(1))).collect  
  }
  def readFromFile = retReadFromFile must_== Array("Justin:19")
}
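Spark SQL 1.1 also exposes an experimental language-integrated query DSL on SchemaRDD, so the same query can be written without an SQL string. A minimal sketch, assuming the implicit Symbol conversions that `import sqlContext._` brings into scope (per the Spark SQL 1.1 programming guide); it would replace the `sqlContext.sql(...)` call inside the `using` block above (`teenagersDsl` is a name chosen here):

// Equivalent to: SELECT name, age FROM people WHERE age >= 13 AND age <= 19
import sqlContext._
val teenagersDsl = people.where('age >= 13).where('age <= 19).select('name, 'age)
retReadFromFile = teenagersDsl.map(t => "%s:%d".format(t(0), t(1))).collect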
src/main/scala/Stoppable.scala
package commons

import scala.language.reflectiveCalls

trait Stoppable {
  type T = { def stop(): Unit }
  def using[A <: T, B](resource: A)(f: A => B) = try {
    f(resource)
  } finally {
    resource.stop()
  }
}
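Because `T` is a structural type (hence the `reflectiveCalls` import), `using` works with any value that has a `stop(): Unit` method, not just `SparkContext`. A minimal sketch with a hypothetical `Recorder` resource (the class and names are made up for illustration):

import commons._

object StoppableExample extends Stoppable {
  // Hypothetical resource: anything with `def stop(): Unit` satisfies T.
  class Recorder { def stop(): Unit = println("recorder stopped") }

  def main(args: Array[String]): Unit =
    using(new Recorder) { r =>
      println("recording...")
    } // stop() runs even if the block throws
}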

Run

$ sbt '~test-only SparkSQLFromFileSpec'

The leading ~ puts sbt into triggered execution, re-running the spec whenever a source file changes; drop it to run the test just once.
