はじめに
Sparkの勉強でこちらの書籍を読み始めたので、備忘録的に実施内容を綴りたいと思います。
基本的に書籍の内容と同じことを行うため、細かい説明はしておりません。
Sparkとは
Apache Sparkは、複数のマシンからなるクラスタに渡ってプログラムを分散させるためのエンジンを、そのエンジン上でプログラムを書くための洗練されたモデルと組み合わせたオープンンソースのフレームワークです。
(本文より抜粋)
大規模データ処理を高速に行うための分散フレームワークで、 Python, Scala, SQLから利用することができます。本記事では書籍の流れに沿って、Scalaによる実装や、MLlibという機械学習ライブラリを使ってみたいと思います。Scalaどころか、Javaすらまともに触ったことがないですが、習うより慣れろで進めてゆきます。
環境準備
今回は手っ取り早く、かつAWSの勉強の意味も込めて、AWSのEMR Notebookを使いたいと思います。
Amazon EMR > ノートブック > ノートブックの作成 として一瞬でノートブックを作成できます。
第2章:ScalaとSparkによるデータ分析の紹介
Scalaを使って基本的な集計を行います。
データとしてカリフォルニア大学アーバイン校のMachine Learning Repositoryに含まれているサンプルデータセットを使います。
クラスタからクライアントへのデータの転送
// csvファイルからRDDを作成
val rawblocks = sc.textFile("s3://s3上ファイルのパス")
// RDDの先頭の要素を取得
rawblocks.first
// 最初の10行を取得
val head = rawblocks.take(10)
head.foreach(println)
// ヘッダ行を除く関数
def isHeader(line: String): Boolean = {
line.contains("id_1")
}
// ヘッダ行を表示
head.filter(isHeader).foreach(println)
// ヘッダ行以外を取得
head.filter(x => ! isHeader(x)).length
// head.filter(!isHeader(_)).length
// RDDに対してフィルタリング
val noheader = rawblocks.filter(x => !isHeader(x))
noheader.first
データの構造化
レコード中の要素(患者1ID, 患者2ID, マッチスコア, マッチ判定)をタプルにパース
val line = head(5)
val pieces = line.split(',')
// 各要素を型変換
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean
val rawscores = pieces.slice(2,11)
def toDouble(s:String) = {
if ("?".equals(s)) Double.NaN else s.toDouble
}
// パースされた値をタプルに入れて返す関数
def parse(line:String) = {
val pieces = line.split(',')
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val scores = pieces.slice(2,11).map(toDouble)
val matched = pieces(11).toBoolean
(id1, id2, scores, matched)
}
val tup = parse(line)
// タプルの操作
tup._1
tup.productElement(0)
tup.productArity
// ケースクラスを利用して、名前レコードを作成する
case class MatchData(id1: Int, id2:Int, scores: Array[Double], matched: Boolean)
def parse(line:String) = {
val pieces = line.split(',')
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val scores = pieces.slice(2,11).map(toDouble)
val matched = pieces(11).toBoolean
MatchData(id1, id2, scores, matched)
}
val md = parse(line)
md.matched
md.id1
md.scores
// 配列headに適用
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
// RDDに適用
// 実際に適用されるのは、RDDに対して何らかの出力呼び出しがあった時
val parsed = noheader.map(line => parse(line))
// パースされた結果をキャッシュしておく
parsed.cache()
集計・ヒストグラムの作成
// ローカルデータに対して集計
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)
val matchCounts = parsed.map(md => md.matched).countByValue
val matchCountsSeq = matchCounts.toSeq //seq型に変更
matchCountsSeq.sortBy(_._1).foreach(println)
- 結果
matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
(false,5728201)
(true,20931)
まとめ・今後の目標
scalaによるsparkプログラムの実行ができました。
次回はMllibを用いた機械学習処理ができればと思います。