
More than 5 years have passed since last update.

Getting Started with Spark


From installation to running

1. Install Scala

brew install scala

2. Clone Spark

git clone https://github.com/apache/spark.git

3. Move into the spark directory

cd spark 

4. Check out version 1.3.1 (on a new local branch named version2)

git checkout -b version2 v1.3.1

5. Build with Maven, skipping the tests

mvn -DskipTests clean package

6. Create test data in work/logs/test.txt
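The contents of test.txt are not shown here. As a minimal sketch, assuming it is run from the spark directory (where spark-shell is started in step 7), the program below writes a few hypothetical lines to work/logs/test.txt. The object name MakeTestData and the sample lines are made up for illustration; with these lines, two of them contain "user1", so steps 8 to 10 would also print 2.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._

object MakeTestData {
  // Hypothetical sample lines; replace them with your own test data
  val sampleLines: Seq[String] = Seq(
    "user1 viewed index.html",
    "user2 viewed index.html",
    "user1 viewed 1.html"
  )

  def main(args: Array[String]): Unit = {
    // Relative path, resolved against the current working directory
    val path = Paths.get("work/logs/test.txt")
    // Create work/logs if it does not exist yet
    Files.createDirectories(path.getParent)
    // Write one sample entry per line (overwrites an existing file)
    Files.write(path, sampleLines.asJava, StandardCharsets.UTF_8)
    println(s"wrote ${sampleLines.size} lines to $path")
  }
}
```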

7. Run spark-shell

macbook:spark user$ ./bin/spark-shell 

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

8. Create an RDD from a local file

scala> val file = sc.textFile("work/logs/test.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

9. Filter the lines that contain "user1"

scala> val filter = file.filter(_.contains("user1"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23

10. Count the matching lines and print the result

scala>  println(filter.count)
2
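In spark-shell, filter is a transformation, so step 9 only records what to do; the file is actually read and filtered when the action count runs in step 10. As a rough analogy that runs without Spark, the plain-Scala sketch below uses a collection view, which is also lazy, and counts how often the predicate is called. The object name LazyFilterSketch and the sample lines are hypothetical, and a view is only a stand-in for an RDD, not a model of how Spark schedules jobs.

```scala
object LazyFilterSketch {
  // Hypothetical stand-in for work/logs/test.txt (the real file is not shown)
  val lines: Seq[String] = Seq("user1 login", "user2 login", "user1 logout")

  /** Returns (predicate calls before counting, matching lines, predicate calls after). */
  def run(word: String): (Int, Int, Int) = {
    var checks = 0
    // Like RDD.filter, a view's filter is lazy: defining it runs nothing yet
    val filtered = lines.view.filter { line => checks += 1; line.contains(word) }
    val before = checks
    // Like the RDD action count, size forces the filter to run over every line
    val matched = filtered.size
    (before, matched, checks)
  }

  def main(args: Array[String]): Unit = {
    val (before, matched, after) = run("user1")
    println(s"checks before count: $before")
    println(s"matching lines: $matched")
    println(s"checks after count: $after")
  }
}
```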

Creating a standalone application and running it with spark-submit

1. Add shell aliases for the installed Spark commands

echo 'alias spark-shell="~/work/spark/bin/spark-shell"' >> ~/.bash_aliases
echo 'alias spark-submit="~/work/spark/bin/spark-submit"' >> ~/.bash_aliases
source ~/.bash_aliases

2. Install sbt

brew install sbt

3. Set up the directory layout

$ mkdir ~/work
$ cd ~/work
$ mkdir spark
$ mkdir spark/src
$ mkdir spark/src/main
$ mkdir spark/src/main/scala
$ mkdir spark/project
$ mkdir spark/log
$ vi spark/log/access_log #prepare an access_log of your own
$ mkdir spark/target

4. Create build.sbt ※1

build.sbt
//build-version
version := "0.1" 

//scalaVersion
scalaVersion := "2.11.7" 

5. Declare the library dependencies (still in build.sbt)
libraryDependencies ++= Seq(
   "org.xerial" % "sqlite-jdbc" % "3.7.2",
   "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"
   //,"org.apache.spark" %% "spark-streaming" % "1.3.1"
   //,"org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)

//※3 avoid errors from duplicate files
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
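sbt-assembly tries the cases of assemblyMergeStrategy from top to bottom and applies the first one that matches a jar entry's path. To check which rule a given path would hit without running sbt, here is a plain-Scala model of the case order above. It assumes a simplified PathList extractor that splits the path on "/" (a hypothetical local stand-in, not the plugin's own code), and the merge strategies are represented as plain strings.

```scala
object MergeStrategySketch {
  // Simplified, hypothetical stand-in for sbt-assembly's PathList extractor
  object PathList {
    def unapplySeq(path: String): Option[Seq[String]] = {
      val parts = path.split("/").toSeq
      if (parts.isEmpty) None else Some(parts)
    }
  }

  // Same case order as in build.sbt; the first matching case wins
  def strategyFor(path: String): String = path match {
    case PathList("javax", "servlet", xs @ _*)               => "first"
    case PathList(ps @ _*) if ps.last endsWith ".properties" => "first"
    case PathList(ps @ _*) if ps.last endsWith ".xml"        => "first"
    case PathList(ps @ _*) if ps.last endsWith ".types"      => "first"
    case PathList(ps @ _*) if ps.last endsWith ".class"      => "first"
    case "application.conf"                                  => "concat"
    case "unwanted.txt"                                      => "discard"
    case _                                                   => "default (oldStrategy)"
  }

  def main(args: Array[String]): Unit =
    Seq("javax/servlet/http/HttpServlet.class", "application.conf", "META-INF/MANIFEST.MF")
      .foreach(p => println(s"$p -> ${strategyFor(p)}"))
}
```

One consequence of this ordering: every duplicated .class file falls to MergeStrategy.first, which keeps the first copy found. That makes the build succeed, but it can also hide a genuine version conflict between two dependencies.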

6. Create project/plugin.sbt

project/plugin.sbt
logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

7. Parse access_log and count the matching lines (Example.scala)

Example.scala
import org.apache.spark.{SparkConf, SparkContext}
//import org.apache.spark.streaming.{Time, Seconds, StreamingContext}

object ExampleApp {
  def main(args: Array[String]): Unit = {
    //the access_log prepared earlier (relative to the directory spark-submit is run from)
    val logFile = "log/access_log"
    //set the AppName (in my run, the AppName picked up at spark-submit time appeared to be the file name)
    val conf = new SparkConf().setAppName("Example Application")
    //create the SparkContext
    val sc = new SparkContext(conf)
    //read access_log into at least 2 partitions and mark it for caching so the counts below reuse it
    val logData = sc.textFile(logFile, 2).cache()
    //count the lines that contain each link
    val index = logData.filter(line => line.contains("index.html")).count()
    val html1 = logData.filter(line => line.contains("1.html")).count()
    val html2 = logData.filter(line => line.contains("2.html")).count()
    val gif1  = logData.filter(line => line.contains("1.gif")).count()
    val gif2  = logData.filter(line => line.contains("2.gif")).count()
    //print the results
    println("Lines with index.html: %s".format(index))
    println("Lines with 1.html: %s".format(html1))
    println("Lines with 2.html: %s".format(html2))
    println("Lines with 1.gif:  %s".format(gif1))
    println("Lines with 2.gif:  %s".format(gif2))
    //shut down the SparkContext
    sc.stop()
  }
}
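Example.scala calls count() five times, so Spark runs five separate jobs over logData; cache() is what keeps the file from being read from disk each time. An alternative is a single pass that emits, for each line, every pattern it contains and then counts the emitted values. On an RDD this could be written as flatMap followed by countByValue(), which returns a Map to the driver and is reasonable here because there are only five keys. The sketch below shows the same logic on a plain Scala Seq so it runs without Spark; the object name LinkCountSketch and the sample lines are hypothetical. Because each pattern is emitted at most once per line, the counts mean the same thing as in Example.scala: the number of lines containing each pattern.

```scala
object LinkCountSketch {
  // The same five substrings that Example.scala counts
  val patterns: Seq[String] = Seq("index.html", "1.html", "2.html", "1.gif", "2.gif")

  def countAll(lines: Seq[String]): Map[String, Long] = {
    // One pass: emit every pattern that a line contains (at most once per line)
    val hits = lines.flatMap(line => patterns.filter(p => line.contains(p)))
    // Count each emitted pattern (the role countByValue() plays on an RDD)
    val counted = hits.groupBy(identity).map { case (p, hs) => p -> hs.size.toLong }
    // Patterns that never matched still get an explicit 0
    patterns.map(p => p -> counted.getOrElse(p, 0L)).toMap
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical access_log lines; the real log is not shown in this article
    val sample = Seq(
      "GET /index.html HTTP/1.1",
      "GET /1.html HTTP/1.1",
      "GET /img/1.gif HTTP/1.1",
      "GET /img/2.gif HTTP/1.1",
      "GET /img/2.gif HTTP/1.1"
    )
    val counts = countAll(sample)
    patterns.foreach(p => println("Lines with %s: %s".format(p, counts(p))))
  }
}
```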

8. Build the jar ※1

sbt assembly

9. Run spark-submit

spark-submit --class ExampleApp target/scala-2.11/scala-assembly-0.1.jar

10. Output (results only)

Lines with index.html: 141882
Lines with 1.html: 19996
Lines with 2.html: 18567
Lines with 1.gif:  20996
Lines with 2.gif:  52393

11. Notes
※1

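In build.sbt, write settings such as name, version, scalaVersion, and libraryDependencies with a blank line between each one, and end the file with a blank line as well (sbt versions before 0.13.7 require this).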

※2

Depending on the sbt version, this line seems to be required.
With the version used here, including it caused an error.

※3

My initial setup did not have this setting, and duplicate files from the dependency libraries caused errors when building the jar.
