LoginSignup
0
4

More than 3 years have passed since last update.

【Python, Scala】Apache Sparkのtutorialをやる

Last updated at Posted at 2020-07-18

概要

環境構築

  • macOS Mojave, 10.14.3
  • spark
    • 公式サイトからinstallする
    • https://spark.apache.org/downloads.html
    • 古いバージョンを入れたい場合はArchived Releasesから選ぶ
    • 今回は訳あって古いspark2.2.0を入れた
  • .bashrcとかにSPARK_HOMEをちゃちゃっと設定
export SPARK_HOME='/usr/local/bin/spark-2.2.0-bin-hadoop2.7'

installすとときにwithout-hadoopを選んだ場合は別にhadoopを入れてHADOOP_HOMEとかの設定をする必要があるみたい。
PySparkを使う場合は

pip pyspark

とかで入れる。sparkとバージョンがずれてるとうまく動かなかったりするので、バージョンを指定したい場合は

pip pyspark-2.2.1

とかやって指定する。

Scalaで動かしてみる

ディレクトリ構成はこんな感じ。
中身はこれから説明する。

$ tree
.
├── SimpleApp.scala
├── build.sbt
├── input.txt
└── run.sh

SimpleApp.scalaはtutorialとほぼ同じ。inputだけちょっと変えてある。
text fileを読み込んで、"a"と"p"がいくつ含まれるかを数える。

SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "input.txt" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numPs = logData.filter(line => line.contains("p")).count()
    println(s"Lines with a: $numAs, Lines with p: $numPs")
    spark.stop()
  }
}

input.txtの中は適当な文章

this is a pen
this is an apple
apple pen
pen pen
sbt package

を実行すると、build.sbtの内容をベースにtarget以下にjar fileを生成してくれる。(targetもこの時勝手に作られる)
build.sbtの例

build.sbt
name := "simple"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"

この場合、生成されるjar fileはtarget/scala-2.11/simple_2.11-1.0.jarになる。

run.shは実行用のスクリプト。

run.sh
spark-submit \
 --class SimpleApp \
 --master local[4] \
 --conf spark.driver.host=localhost \
 target/scala-2.11/simple_2.11-1.0.jar 

sh run.shを実行すると結果がずらずら出てくるが、

Lines with a: 3, Lines with p: 4

が含まれていればおっけー。
inputの分のうちaが含まれているのが3行、pが含まれているのが4行という意味。
spark.driver.host=localhostの部分はtutorialには含まれていなかったが、これを書かないと自分の環境だと

Error
Exception in thread "main" java.lang.AssertionError: assertion failed: Expected hostname

が出ちゃったので追加した。

Pythonで動かしてみる

Pythonの場合はもう少し簡単。
ファイル構成は以下

$ tree
.
├── SimpleApp.py
├── input.txt
└── run.sh

SimpleApp.py
from pyspark.sql import SparkSession

logFile = "input.txt"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numPs = logData.filter(logData.value.contains('p')).count()

print("Lines with a: %i, lines with p: %i" % (numAs, numPs))

spark.stop()

run.sh
spark-submit \
  --master local[4] \
  --conf spark.driver.host=localhost \
  SimpleApp.py

input.txtは同じ。pipでPyparkが入ってればスクリプトを実行しなくても

python SimpleApp.py

だけで実行できるっぽい。

簡単な説明

今回はSparkSessionを用いてsparkを実行している。
SparkContextを使っても同じようなことができる。
SparkContextとSparkSessionの違いがよくわからずに調べたら、SparkSessionは内部にSpark Contextを内包していて、SparkContextよりも後に登場した書き方みたいだ。

spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

でappNameを指定してsparkインスタンスを生成する。

logData = spark.read.text(logFile).cache()

でファイルを読み込む。cache()はメモリ内に永続化する設定のようだ。(今回のコードだとなしでも動いた)
今回はtext fileを読み込んでいるが、csvファイルやsqlのテーブルのような構造化データでも読み込むことができる。

読み込まれたデータはrddによって並列化されて計算される。
rddにはmapやfileterなどいろんな関数が用意されている。

filterは文字通りフィルタリングするためのメソッド。
今回はファイルを1行ずつ読み取って、count()を使って特定の文字列を含むものがいくつあるかを数えている。

numAs = logData.filter(logData.value.contains('a')).count()

SparkSessionはhttpサーバとして起動しているみたいなので、最後にstop()で止める。

spark.stop()

まとめ

  • sparkのtutorialをやった
  • 説明に間違いなどがあれば随時訂正、更新予定
0
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
4