概要
- 以下のページのSparkのtutorialをやる
- Sparkって?
- ScalaとPythonで試してみる
- Sparkの内部はScalaで書かれているが、PythonやJavaのAPIも提供されている
- Pythonを使う場合はPysparkを使う
環境構築
- macOS Mojave, 10.14.3
- spark
- 公式サイトからinstallする
- https://spark.apache.org/downloads.html
- 古いバージョンを入れたい場合はArchived Releasesから選ぶ
- 今回は訳あって古いspark2.2.0を入れた
- .bashrcとかにSPARK_HOMEをちゃちゃっと設定
export SPARK_HOME='/usr/local/bin/spark-2.2.0-bin-hadoop2.7'
installすとときにwithout-hadoop
を選んだ場合は別にhadoopを入れてHADOOP_HOMEとかの設定をする必要があるみたい。
PySparkを使う場合は
pip pyspark
とかで入れる。sparkとバージョンがずれてるとうまく動かなかったりするので、バージョンを指定したい場合は
pip pyspark-2.2.1
とかやって指定する。
Scalaで動かしてみる
ディレクトリ構成はこんな感じ。
中身はこれから説明する。
$ tree
.
├── SimpleApp.scala
├── build.sbt
├── input.txt
└── run.sh
SimpleApp.scalaはtutorialとほぼ同じ。inputだけちょっと変えてある。
text fileを読み込んで、"a"と"p"がいくつ含まれるかを数える。
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "input.txt" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numPs = logData.filter(line => line.contains("p")).count()
println(s"Lines with a: $numAs, Lines with p: $numPs")
spark.stop()
}
}
input.txtの中は適当な文章
this is a pen
this is an apple
apple pen
pen pen
sbt package
を実行すると、build.sbtの内容をベースにtarget以下にjar fileを生成してくれる。(targetもこの時勝手に作られる)
build.sbtの例
name := "simple"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
この場合、生成されるjar fileはtarget/scala-2.11/simple_2.11-1.0.jar
になる。
run.shは実行用のスクリプト。
spark-submit \
--class SimpleApp \
--master local[4] \
--conf spark.driver.host=localhost \
target/scala-2.11/simple_2.11-1.0.jar
sh run.sh
を実行すると結果がずらずら出てくるが、
Lines with a: 3, Lines with p: 4
が含まれていればおっけー。
inputの分のうちaが含まれているのが3行、pが含まれているのが4行という意味。
spark.driver.host=localhost
の部分はtutorialには含まれていなかったが、これを書かないと自分の環境だと
Exception in thread "main" java.lang.AssertionError: assertion failed: Expected hostname
が出ちゃったので追加した。
Pythonで動かしてみる
Pythonの場合はもう少し簡単。
ファイル構成は以下
$ tree
.
├── SimpleApp.py
├── input.txt
└── run.sh
from pyspark.sql import SparkSession
logFile = "input.txt" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numPs = logData.filter(logData.value.contains('p')).count()
print("Lines with a: %i, lines with p: %i" % (numAs, numPs))
spark.stop()
spark-submit \
--master local[4] \
--conf spark.driver.host=localhost \
SimpleApp.py
input.txtは同じ。pipでPyparkが入ってればスクリプトを実行しなくても
python SimpleApp.py
だけで実行できるっぽい。
簡単な説明
今回はSparkSessionを用いてsparkを実行している。
SparkContextを使っても同じようなことができる。
SparkContextとSparkSessionの違いがよくわからずに調べたら、SparkSessionは内部にSpark Contextを内包していて、SparkContextよりも後に登場した書き方みたいだ。
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
でappNameを指定してsparkインスタンスを生成する。
logData = spark.read.text(logFile).cache()
でファイルを読み込む。cache()はメモリ内に永続化する設定のようだ。(今回のコードだとなしでも動いた)
今回はtext fileを読み込んでいるが、csvファイルやsqlのテーブルのような構造化データでも読み込むことができる。
読み込まれたデータはrdd
によって並列化されて計算される。
rddにはmapやfileterなどいろんな関数が用意されている。
filterは文字通りフィルタリングするためのメソッド。
今回はファイルを1行ずつ読み取って、count()を使って特定の文字列を含むものがいくつあるかを数えている。
numAs = logData.filter(logData.value.contains('a')).count()
SparkSessionはhttpサーバとして起動しているみたいなので、最後にstop()
で止める。
spark.stop()
まとめ
- sparkのtutorialをやった
- 説明に間違いなどがあれば随時訂正、更新予定