
Spark SQL Execution Notes

Posted at 2016-02-12

A quick note from when I ran into a few snags switching from spark-shell to a batch job.

Preparation

Environment

A virtual machine created with VirtualBox on a PC behind a proxy.
Hadoop: CDH 5.5.1, running in pseudo-distributed mode.
Spark: the CDH package as well. See the following for installation instructions:
http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_spark_install.html
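
After installing, it may be worth confirming which Spark version the CDH packages actually provide, since the Scala version chosen in the next step has to match the one Spark was built with:

$ spark-submit --version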

Scala

The CDH build of Spark appears to be compiled against Scala 2.10.4, so install the same Scala version. Download it from the link below and add it to your PATH.
Note: if the versions do not match, compilation errors occur.
http://www.scala-lang.org/download/2.10.4.html
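
A minimal sketch of the install, assuming the tarball is extracted under /opt (any location on your PATH works):

$ tar xzf scala-2.10.4.tgz -C /opt
$ echo 'export PATH=/opt/scala-2.10.4/bin:$PATH' >> ~/.bashrc
$ source ~/.bashrc
$ scala -version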

sbt

Download it from the link below, extract it, and place sbt-launch.jar under ~/bin.
http://www.scala-sbt.org/download.html

Create ~/bin/sbt:

#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $JAVA_OPTS $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
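
The script has to be made executable before it can be called as sbt:

$ chmod +x ~/bin/sbt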

Set the proxy information in the JAVA_OPTS environment variable:

export JAVA_OPTS="-Dhttp.proxyHost={proxy server name} -Dhttp.proxyPort={port number} -Dhttps.proxyHost={proxy server name} -Dhttps.proxyPort={port number}"
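
Placing this export in ~/.bashrc (or exporting it in the shell before running sbt) is enough, since the wrapper script above passes $JAVA_OPTS straight through to java.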

To use the sbt-assembly plugin, add the following to project/plugins.sbt:

plugins.sbt
resolvers += "Bintray sbt plugin releases" at "http://dl.bintray.com/sbt/sbt-plugin-releases/"

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

Create the following directories. Place the source under src/main/scala.

src/main/scala/
src/test/scala/
lib
project
target
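
These can be created in one command from the project root:

$ mkdir -p src/main/scala src/test/scala lib project target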

Compiling and running the Spark SQL sample program

test.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

object Test {
  case class TUser(tId: String, attribute: String)
  case class NUser(nId: String)

  def exec1(dirPath: String, sc: SparkContext, sqlContext: SQLContext, nDF: DataFrame) {
    import sqlContext.implicits._

    // Read t.txt (comma-separated: tId,attribute) and convert it to a DataFrame
    val filePath = dirPath + "t.txt"
    val tDF = sc.textFile(filePath).map { record =>
      val splitRecord = record.split(",")
      val tId = splitRecord(0)
      val attribute = splitRecord(1)
      TUser(tId, attribute)
    }.toDF

    // Count the distinct users in each DataFrame
    val tuser = tDF.distinct().count()
    printf("The number of user is %s \n", tuser)

    val nuser = nDF.distinct().count()
    printf("The number of nuser is %s \n", nuser)

    // Inner join on tId = nId and count the distinct matching users
    val tnDF = tDF.join(nDF, tDF("tId") === nDF("nId"), "inner").select($"tId")
    val numtnuser = tnDF.distinct().count()
    printf("The number of tnuser is %s \n", numtnuser)
  }

  def main(args: Array[String]) {
    require(args.length >= 1, "Please specify the input directory path")
    val dirPath = args(0)

    // Master and app name are supplied by spark-submit, so SparkConf stays empty here
    val conf = new SparkConf
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    try {
      import sqlContext.implicits._

      // Read n.txt (one nId per line) and convert it to a DataFrame
      val filePath = dirPath + "n.txt"
      val nDF = sc.textFile(filePath).map { record =>
        val splitRecord = record.split(",")
        val nId = splitRecord(0)
        NUser(nId)
      }.toDF

      exec1(dirPath, sc, sqlContext, nDF)
    } finally {
      sc.stop()
    }
  }
}
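
For reference, the program assumes the input directory contains t.txt (comma-separated tId,attribute) and n.txt (one nId per line). A hypothetical example of preparing such input in HDFS (the file contents here are made up):

$ cat t.txt
u001,premium
u002,free
$ cat n.txt
u001
u003
$ hdfs dfs -mkdir -p /user/yotsu/input
$ hdfs dfs -put t.txt n.txt /user/yotsu/input/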
build.sbt
name := "Test"
version := "0.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.5.0" % "provided", "org.apache.spark" % "spark-sql_2.10" % "1.5.0" % "provided")
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
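
The includeScala = false setting keeps the Scala library out of the assembly jar for the same reason the Spark dependencies are marked "provided": spark-submit already puts Spark and its bundled Scala runtime on the classpath, so packaging them again only bloats the jar.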

Compile

$ sbt assembly

Run the job

$ spark-submit --class Test --name Test target/scala-2.10/Test-assembly-0.1.jar /user/yotsu/input/
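
Without an explicit --master, spark-submit falls back to whatever the cluster configuration defines (or local mode). To run on the pseudo-distributed YARN cluster instead, a master can be added, for example:

$ spark-submit --master yarn-client --class Test --name Test target/scala-2.10/Test-assembly-0.1.jar /user/yotsu/input/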