はじめに。
こんにちは、7coco といいます。
仕事の関係から Apache Spark に爆速で入門する必要に駆られましたので、覚えておくべきだと感じたことをここにメモしておきます。あくまでメモでありかなり省略した書き方をしていますが、大枠を把握したり、一度学習したことを思い出したり、グーグル検索のためのワードを得るための資料として参考になれば幸いです。
学習にあたって参考にしたのは以下の書物です。少々古く、最新バージョンでは動かないサンプルもあるとのことですが、大枠を捉えるのに支障が少ないこと、職場で採用しているのと同じ Scala 言語を採用していることから選びました。
ApacheSpark 入門
https://www.amazon.co.jp/dp/4798142662/
本記事では「基本編」として主に4章〜6章の内容を要約しています。「応用編」を書くのかは未定です。
ざっくりとした前提知識と用語解説
RDD
Spark のデータ処理には「RDD」と呼ばれるデータ構造を利用している。
RDD は大量のデータを要素として保持する分散コレクション。複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションというかたまりに分割されている。パーティションを分散処理の単位として、RDD をパーティションごとに複数マシンで処理することで、大量のデータを扱える。
他には後述のスケジューリングによって遅延評価される、イミュータブルである、永続化が可能であるという特徴を持つ。
RDD の操作
RDD には変換とアクションと呼ばれる2種類の処理が適用できる。
変換は RDD を加工し、新しい RDD を得る処理。 filter, map, flatMap, zip, reduceBykey, join など。
アクションは RDD の内容を元に目的の結果を得る操作。saveAsTextFile, count など。
アクション処理によって実際にジョブが実行され、遅延されていた RDD の生成や変換、そしてアクションが実行される。
エグゼキュータ
Spark アプリケーションを分散処理するプロセスのこと。
クライアントがクラスタにアプリケーションをデプロイするのと同時に、その実行に必要なエグゼキュータのスペックを指定、起動の要求を行う。
ドライバプログラム
Spark アプリケーションのエントリーポイント。記述された RDD の生成や変換ロジックを元にアプリケーションを制御する。
RDD は遅延評価され、実際の処理はドライバプログラム上では実行されず、ジョブ ( 後述 ) としてクラスタ上で実行される。
タスクのスケジューリングと実行
Spark アプリケーションでは、 RDD の生成からアクションの適用までを「ジョブ」という処理単位として扱う。ジョブは、ドライバプログラムに含まれるスケジューラによってエグゼキュータが処理可能な「タスク」という単位に分割され、実行がスケジューリングされる。
エグゼキュータが個々の多数を処理することで、RDD 全体が分散処理されることになる。
Spark アプリケーションの開発と実行
Spark アプリケーションはソースコードをコンパイルしてJARファイルにパッケージングする必要がある。
今回はこれに sbt を用いる。 sbt は、Scala や Java のコードのコンパイルやライブラリの依存関係の管理、パッケージングなどのビルドプロセスを統合的に管理するためのツール。なお、利用するには JRE か JDK がインストールされている必要がある。
以上については、準備ができている前提で話を進める。
sbt はアプリケーションをパッケージングする手段を標準で提供しているが、外部ライブラリを利用している場合それらが同梱されない。この問題を解決するために、 sbt-assembly プラグインを用いる。
こちらについては、後述の方法で用意できるため現段階で準備する必要はない。
sbt プロジェクトのディレクトリ構造
sbt プロジェクトでは、ディクトリの役割が以下のように決まっている。
ディレクトリ | 役割 |
---|---|
src/main/scala | Scala で記述されたプログラムを配置 |
src/test/scala | Scala で記述されたテストプログラムを配置 |
lib | sbt で管理しない依存ライブラリを配置 |
project | sbt 関連の設定ファイルを配置 |
今回は、上記の決まりに沿って spark-simple-app
という名前でプロジェクトを作成する。
mkdir spark-simple-app
cd spark-simple-app
mkdir -p src/main/scala
mkdir project
sbt のプロジェクトファイル
sbt でアプリケーションをビルドするには、 build.sbt
ファイルを作成し、ビルド定義を記述する必要がある。
追加で sbt-assembly のようなプラグインを利用するには、 plugins.sbt
ファイルに記述する。
build.sbt の作成
build.sbt
はプロジェクトのルートディレクトリ直下に作成する。
記述できるビルド定義には様々な項目があるが、一例として以下を紹介する。
キー | 設定内容 |
---|---|
name | プロジェクトの名前 |
version | プロジェクトのバージョン |
scalaVersion | Scala のバージョン |
libraryDependencies | 依存するライブラリ、及びそのライブラリがコンパイル時やパッケージング時などビルドプロセス中のどの段階で依存するか |
assemblyOption in assembly | sbt-assembly プラグインのオプション |
例えば以下のように記述できる。
name := "spark-simple-app"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies ++=
Seq("org.apache.spark" %% "spark-core" % "2.4.4" % "provided",
"joda-time" % "joda-time" % "2.10.5")
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
libraryDependency は、 "<groupID>" % "<artifactID>" % "<version>" % "<configuration>"
の書式で書く。groupID>
, <artifactID>
, <version>
は依存ライブラリを管理するサイトなどで提供されていて、 例えば Maven が管理している場合は http://search.maven.org から検索できる。
<configrations>
はビルドのどのフェーズで依存するかを制御する設定項目。コンパイル時に必要なライブラリや、 sbt-assembly を使って同梱するライブラリには特に指定は要らない。
今回は spark-core が provided
に設定されている。これはコンパイル時にクラスパスに含まれるが、 sbt-assembly を用いたパッケージング時にはJARファイルに含まなくする。
plugins.sbt の作成
今回例に示したように sbt-assembly を利用するには、 plugins.sbt
を project ディレクトリの直下に作成し、次のように記述する。
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
アプリケーションのビルド
サンプルアプリケーションとして、下記のソースコードを spark-simple-app/src/main/scala/com/example/chapter4
配下に SundayCount.scala
という名前で配置する。
package com.example.chapter4
import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.{DateTime, DateTimeConstants}
import org.joda.time.format.DateTimeFormat
object SundayCount {
def main(args: Array[String]): Unit = {
if(args.length < 1){
throw new IllegalArgumentException("コマンドの引数に日付が記録されたファイルへのパスを入力してください")
}
val filePath = args(0)
val conf = new SparkConf
val sc = new SparkContext(conf)
try {
// テキストファイルをロード
val textRDD = sc.textFile(filePath)
// 文字列で表現された日付から DateTIme のインスタンスを生成
val dateTimeRDD = textRDD.map { dateStr =>
val pattern = DateTimeFormat.forPattern("yyyyMMdd")
DateTime.parse(dateStr, pattern)
}
// 日曜日だけ抽出
val sundayRDD = dateTimeRDD.filter{ dateTime =>
dateTime.getDayOfWeek == DateTimeConstants.SUNDAY
}
// sundayRDD に含まれる日曜日の数を数える
val numOfSunday = sundayRDD.count()
println(s"与えられたデータの中に日曜日は${numOfSunday}個含まれていました")
} finally {
sc.stop()
}
}
}
ここまで準備できれば、 プロジェクトのトップディレクトリに移動して、 sbt assembly
コマンドでJARファイルを作成できる ( やや時間がかかるが見守ろう ) 。
ビルドが成功するとプロジェクトのルート配下に target ディレクトリが作成されていて、さらにその下に scala-2.11 ディレクトリが、さらにその中にJARファイルが作られる。今回の場合は spark-simple-app/target/scala-2.11/spark-simple-app-assembly-0.1.jar
のようなファイルが作成される。
Sparkアプリケーションの実行
ここまでにビルドしたアプリケーションを例に、Sparkアプリケーションを実行する。
Sparkアプリケーションの実行には spark-submit
コマンドを利用する。書式は以下の通り。
${SPARK_HOME}/bin/spark-submit \
--master <動作モード> \
--class <main メソッドが実装されているクラス> \
<spark-submit コマンドのオプション> \
アプリケーションのクラスが含まれるJARファイル \
<アプリケーションに渡すオプション>
例えば、任意の場所に以下のような date.txt
を置けば、下記のコマンドでアプリケーションの実行ができる。
20150322
20150331
20150417
20150426
20150506
20150523
20150524
20150712
20150728
20150801
20150830
20150927
park-submit \
--master local \
--class com.example.chapter4.SundayCount \
--name SubdayCount \
./target/scala-2.11/spark-simple-app-assembly-0.1.jar \
/path/to/date.txt
--master
で指定できる動作モードはいくつかあるが、以下にうち三つを紹介する。
local モード
spark-submit
コマンドを実行したクライアント上でプロセスを起動し、その中でエグゼキュータを起動してアプリケーションを実行する。
-
--master local
エグゼキュータに1つだけスレッドを割り当てる -
--master local[*]
エグゼキュータにクライアントに搭載されているCPUコア数分のスレッドを割り当てる -
--master local[<数字>]
エグゼキュータに指定した数のスレッドを割り当てる
yarn-client モード / yarn-cluster モード
これらは YARN で管理されたクラスタ上でアプリケーションを実行するモード。クラスタないの NodeManager 上でエグゼキュータが動作し、それぞれのエグゼキュータによって分散処理が行われる。
yarn-client モードではドライバプログラムが spark-submit
を実行したクライアント上で動作する。
yarn-cluster モードではドライバプログラムがクラスタ内の NodeManager 上で動作する。
基本API
Spark が提供する基本APIによって、以下のことが可能。
- RDD の生成と操作
- RDD の永続化
- 永続化先のストレージの制御 ( メモリ or ディスク or 両方 )
- シリアライズの有無の制御
- レプリケーションの制御
- 共有変数 ( ドライバプログラムやタスクをまたいでアクセスされる変数 ) の利用
- アキュムレータ ( ドライバプログラムからは値の設定と参照、タスクからは値の加算のみを行う共有変数 )
- ブロードキャスト変数 ( 全てのエグゼキュータに一度配布され、参照のみされる共有変数 )
数多くある基本APIのうち一部を紹介し、最後に使用例を書く。
挙動についての詳細な説明や、引数については書いていないので適宜補うこと。あくまでもIDEでの補完機能呼び出しやググるときのワードとして。
SparkContext
アプリケーション全体の実行に関係する情報を集約したもの。
ドライバプログラム内で SparkConf
というアプリケーションの設定を行うためのクラスを利用して生成する。
- textFile()
- テキストファイルから RDD を生成するためのメソッド
- stop()
- アプリケーションを停止する
- broadcast()
- 引数をブロードキャスト変数として配布する
- accumulator()
- アキュムレータを作成する ( Spark のデフォルトでは、Int, Long, Float, Double 型のアキュムレータを作成可 )
RDD
アクション処理を行うメソッドは太文字で表記する。
-
collect()
- RDD に含まれる要素を配列にしてドライバプログラムに返却するアクション。
- filter()
- RDD に含まれる要素のうちフィルタリング条件にマッチした要素なみを残した RDD を生成する
- map()
- RDD の要素一つずつに、要素を引数にとる関数を実行して、その結果を変換後の要素として返す
- reduceByKey()
- 要素が ( キー、 バリュー ) 型となる RDD に含まれる要素を同じキーを持つ物でグループ分けし、バリューを集約処理する
- flatMap()
- ネストしたコレクションをフラットにする
- sortByKey()
- ( キー、 バリュー ) のタプルを要素にもつ RDD に対して、キーの代償に基づいて要素全体をソートする
-
take()
- 引数に与えられた数の要素を RDD の先頭から取り出し、配列にして返す
- join()
- ( K, V ) と ( K, W ) のタプル型の要素を持つ二つの RDD で、キーが同じ要素同士を結合して ( K, ( V, W )) 型の要素を持つ RDD を生成する
-
saveAsTextFile()
- 引数で指定したディレクトリを作成し、その中にパーティションごとのテキストファイルを作成して内容を出力する
-
count()
- RDD に含まれる要素の数を数える
-
sum()
- 数値型の要素を持つ RDD を対象に、内部の要素の合計値を計算する
-
reduce()
- RDD に含まれる要素すべてに集約処理を行う
-
foreach()
- RDD に含まれる要素を T とすると、各要素を引数に T => Unit 型の関数を実行する
- persist()
- 引数で設定された永続化レベルに応じて RDD を永続化する
- cache()
- persist メソッドに MEMORY_ONLY を設定して呼び出す場合の簡略的な記法
使用例
その1. ワードカウンタプログラム
英文の書かれたテキストファイルを任意の場所におき、そのファイルパスを引数に指定すると動作する。
package com.example.chapter5
import java.util.Properties
import org.apache.log4j.PropertyConfigurator
import org.apache.spark.{SparkConf, SparkContext}
object WordCountTop3 {
val props = new Properties()
props.load(getClass.getClassLoader.getResourceAsStream("log4j.properties"))
PropertyConfigurator.configure(props)
def main(args: Array[String]): Unit = {
require(args.length >= 1,
"ドライバプログラムの引数に単語をカウントするファイルへのパスを指定してください")
val conf = new SparkConf
val sc = new SparkContext(conf)
try {
// ( 単語, 出現回数 ) のタプルを作る
val filePath = args(0)
val wordAndCountRDD = sc.textFile(filePath)
.flatMap(_.split("[ ,.]"))
.filter(_.matches("""\p{Alnum}+"""))
.map((_, 1))
.reduceByKey(_ + _)
// 出現回数が多い 3つの単語を見つける
val top3Words = wordAndCountRDD.map {
case (word, count) => (count, word)
}.sortByKey(false).map {
case (count, word) => (word, count)
}.take(3)
top3Words.foreach(println)
} finally {
sc.stop()
}
}
}
その2. アンケート集計プログラム
以下のような csv を任意の場所に置き、第一引数にそのファイルパスを、第二引数に任意のパスを指定すると動作する。
23,F,3
22,F,5
20,M,4
35,F,2
33,F,4
18,M,4
28,M,5
42,M,3
18,M,3
56,F,2
53,M,1
30,F,4
19,F,5
17,F,4
33,M,4
26,F,3
22,F,2
27,M,4
45,F,2
package com.example.chapter5
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object QuestionnaireSummarization {
/**
* アンケート全ての評価の平均値を求めるメソッド
*/
private def computeAllAvg(rdd: RDD[(Int, String, Int)]) = {
val (totalPoint, count) = rdd.map(record => (record._3, 1)).reduce {
case ((intermedPoint, intermedCount), (point, one)) =>
(intermedPoint + point, intermedCount + one)
}
totalPoint / count.toDouble
}
/**
* 世代別の平均値を求めるメソッド
*/
private def computeAgeRangeAvg(rdd: RDD[(Int, String, Int)]) = {
rdd
.map(record => (record._1, (record._3, 1)))
.reduceByKey {
case ((intermedPoint, intermedCount), (point, one)) =>
(intermedPoint + point, intermedCount + one)
}
.map {
case (ageRange, (totalPoint, count)) =>
(ageRange, totalPoint / count.toDouble)
}
.collect
}
/**
* 男女別の評価の平均値を求めるメソッド
*/
private def computeMorFAvg(rdd: RDD[(Int, String, Int)],
numMAcc: LongAccumulator,
totalPointMAcc: LongAccumulator,
numFAcc: LongAccumulator,
totalPointFAcc: LongAccumulator) = {
rdd.foreach {
case (_, maleOrFemale, point) =>
maleOrFemale match {
case "M" =>
numMAcc.add(1)
totalPointMAcc.add(point)
case "F" =>
numFAcc.add(1)
totalPointFAcc.add(point)
}
}
Seq(("Male", totalPointMAcc.value / numMAcc.value.toDouble),
("Female", totalPointFAcc.value / numFAcc.value.toDouble))
}
def exec(args: Array[String]) {
require(args.length >= 2,
"""
|アプリケーションの引数に
|<アンケートのCSVファイルのパス>
|<結果の出力先のパス> を指定してください。
|""".stripMargin)
val conf = new SparkConf
val sc = new SparkContext(conf)
try {
val filePath = args(0)
val questionnaireRDD = sc.textFile(filePath).map { record =>
val splitRecord = record.split(",")
val ageRange = splitRecord(0).toInt / 10 * 10
val maleOrFemale = splitRecord(1)
val point = splitRecord(2).toInt
(ageRange, maleOrFemale, point)
}
questionnaireRDD.cache
val avgAll = computeAllAvg(questionnaireRDD)
val avgAgeRange = computeAgeRangeAvg(questionnaireRDD)
val numMAcc = sc.longAccumulator("Number of M")
val totalPointMAcc = sc.longAccumulator("TotalPoint of M")
val numFAcc = sc.longAccumulator("Number of F")
val totalPointFAcc = sc.longAccumulator("TotalPoint of F")
val avgMorF = computeMorFAvg(
questionnaireRDD,
numMAcc,
totalPointMAcc,
numFAcc,
totalPointFAcc
)
println(s"AVG ALL: $avgAll")
avgAgeRange.foreach{
case (ageRange, avg) =>
println(s"AVG Age Range($ageRange): $avg")
}
avgMorF.foreach {
case (mOrF, avg) =>
println(s"AVG $mOrF: $avg")
}
}finally {
sc.stop()
}
}
}
Spark SQL
Spark SQL はデータセットをよういかつ効率的に取り扱うための手段を提供している。
標準でサポートされている構造化データセットは以下の通り。
- JSON フォーマットのファイル
- Parquet フォーマットのファイル
- ORC フォーマットのファイル
- JDBC をサポートするデータソース
- Hive 互換のテーブル
- SparkSQL 独自のテーブル
- など。
Spark SQL ではドラ馬プログラムから様々な形式のデータセットを統一的に扱うために、 DataFrame
と呼ばれる抽象的なデータ構造を用いる。これは RDBMS のテーブルのように行と名前とデータ型が付与された、列の概念を持つデータ構造。
ドライバプログラムの中で Spark SQL を利用する場合、ユーザーはデータセットや RDD をもとに DataFrame
を生成、操作することで処理を記述する。
また、データセットをテーブルとして扱い、専用のインタラクティブシェルや JDBC を介してクエリ言語でデータ処理を記述することもできる。
DataFrame
には実際のデータは含まれておらず、スキーマ情報やデータ読み込み方法の論理プランが含まれている。 実際のデータはアクション操作が行われた時に得られる。
DataFrame
は、DataFrame API という SQL 句に似たメソッドを使う方法か DataFrame
を一時テーブルに登録し、そのテーブルに対してクエリを発行する方法で操作できる。このような操作は基本APIと似ているが、RDD ベースの操作と比べると、可読性が高くなる、オプティマイザが働くといった利点がある。
データセットから DataFrame
をテーブルを生成するために、Spark SQL は Data Sources API を採用している。
Data Sources API の規約にそってデータセットから DataFrame
やテーブルとの対応づけを定義したものは「プロバイダ」と呼ばれ、データセットに対する読み書きや Spark SQL のデータ型との対応づけなどを定義する。
ドライバプログラムから Spark SQL を利用するvばあい、Spark SQL の初期化処理として SQLContext を作成する必要がある。
以上のように、Spark SQL では DataFrame, テーブル形式のデータ構造、そして Data Sources API によってデータセットを処理する統一の方法をユーザーに提供している。
Spark SQL を利用する上で便利なメソッド
アクション処理を行うメソッドは太文字で表記する。
RDD
- toDF()
- RDD から DataFrame を生成する
SqlContext
- sql()
- クエリを発行する
- read()
- DataFrameReader を取得する
- cacheTable()
- テーブルをエグゼキュータにキャッシュする
- uncacheTable()
- テーブルのキャッシュを解除
DataFrame
- printSchema()
- DataFrame のスキーマ情報を表示する
- rdd()
- DataFrame から RDD を生成する
- registerTempTable()
- DataFrame を一時テーブルに登録する
-
show()
- DataFrame が表すデータセットの内容を表示する
- select()
- DataFrame の列を選択する
- where()
- 行のフィルタリングをする
- orderBy()
- 行をソートする
- agg()
- 集約関数を利用する
- groupBy()
- 行をグループ化する
- join()
- DataFrame 同士を結合する
- write()
- DataFrameWtiter を取得する
- chache()
- DataFrame をエグゼキュータにキャッシュする
- unpersist()
- DataFrame のキャッシュを解除
DataFrameWriter
- format()
- ファイル形式のフォーマットを指定する
- save()
- DataFrame の内容をファイルシステム上に書き出す
- saveAsTable()
- DataFrame の内容をテーブルに書き出す
- mode()
- セーブモードを設定する ( セーブモードは以下の通り )
- ErrorExists 例外を発生させる ( デフォルト )
- Append 既既存のデータセットに追記する
- Overwrite 既存のデータセットを上書きする
- Ignore 既存のデータに変更を加えない
- セーブモードを設定する ( セーブモードは以下の通り )
- partitionBy()
- DataFrame が表すデータセットをパーティショニングして書き出せる
DataFrameReader
- format()
- ファイル形式のフォーマットを指定する
- load()
- ファイルから DataFrame を取得する
- table()
- テーブルから DataFrame を取得する
- schema()
- スキーマ情報を設定する