
Trying out query searches with Spark SQL (text file input edition)


Environment

  • Mac OS X Version 10.9.5
  • Scala 2.10.4
  • Spark 1.1.0
  • Spark SQL 1.1.0
  • sbt 0.13.1

Preparation

/root/to/project/path
   |-- build.sbt
   |-- src
   |    |-- main
   |    |    |-- scala
   |    |    |    |-- Stoppable.scala
   |    |-- test
   |    |    |-- resources
   |    |    |    |-- people.txt
   |    |    |-- scala
   |    |    |    |-- SparkSQLFromFileSpec.scala
build.sbt
name := "Spark SQL From File examples"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature", "-Xelide-below", "ALL")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.apache.spark" %% "spark-sql" % "1.1.0",
  "org.specs2" %% "specs2" % "2.4.1"
)
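
specs2 is only needed by the test sources, so the dependency could optionally be scoped to the test classpath (a minor variation, not required for this example):

  "org.specs2" %% "specs2" % "2.4.1" % "test"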
src/test/resources/people.txt
Michael, 29
Andy, 30
Justin, 19

Implementation

src/test/scala/SparkSQLFromFileSpec.scala
import commons._

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._

import org.specs2._

// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

class SparkSQLFromFileSpec extends Specification with Stoppable { def is = s2"""

  Spark SQL From File

  FromFile
    read from file                          $readFromFile
  """

  var retReadFromFile: Array[String] = _ 
  using(new SparkContext("local[1]", "SparkSQLFromFileSpec", System.getenv("SPARK_HOME"))) { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    // Create an RDD of Person objects and register it as a table.
    val people = sc.textFile("src/test/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
    people.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

    // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    retReadFromFile = teenagers.map(t => "%s:%d".format(t(0), t(1))).collect  
  }
  def readFromFile = retReadFromFile must_== Array("Justin:19")
}
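A note on the row access above: indexing a row with t(0) returns Any. If you prefer typed access, the same mapping could also be written with Row's typed getters (an optional variation, assuming the standard getString/getInt accessors):

    // Same mapping, using Row's typed getters instead of positional apply():
    retReadFromFile = teenagers.map(t => "%s:%d".format(t.getString(0), t.getInt(1))).collect()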
src/main/scala/Stoppable.scala
package commons

import scala.language.reflectiveCalls

trait Stoppable {
  type T = { def stop(): Unit }
  def using[A <: T, B](resource: A)(f: A => B) = try {
    f(resource)
  } finally {
    resource.stop()
  }
}
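
Stoppable is a loan-pattern helper built on a structural type: using accepts any value that exposes a stop(): Unit method (a SparkContext qualifies) and guarantees stop() is called even when the body throws. A small hypothetical usage sketch:

import commons._

object StoppableExample extends Stoppable {
  // Hypothetical resource: anything with a stop(): Unit method matches the structural type T.
  class FakeResource { def stop(): Unit = println("stopped") }

  def main(args: Array[String]): Unit = {
    using(new FakeResource) { r =>
      println("working with the resource")
    } // "stopped" is printed here, even if the block above had thrown
  }
}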

Run

$ sbt '~test-only SparkSQLFromFileSpec'
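
The leading ~ puts sbt into continuous execution, re-running the spec whenever a source file changes. To run it just once, drop the tilde:

$ sbt 'test-only SparkSQLFromFileSpec'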

Reference

http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#inferring-the-schema-using-reflection
