3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

「Sparkによる実践データ解析」を実践

Last updated at Posted at 2019-10-02

はじめに

Sparkによる実践データ解析
スクリーンショット 2019-10-03 1.32.20.png

Sparkの勉強でこちらの書籍を読み始めたので、備忘録的に実施内容を綴りたいと思います。
基本的に書籍の内容と同じことを行うため、細かい説明はしておりません。

Sparkとは

Apache Sparkは、複数のマシンからなるクラスタに渡ってプログラムを分散させるためのエンジンを、そのエンジン上でプログラムを書くための洗練されたモデルと組み合わせたオープンンソースのフレームワークです。
(本文より抜粋)

大規模データ処理を高速に行うための分散フレームワークで、 Python, Scala, SQLから利用することができます。本記事では書籍の流れに沿って、Scalaによる実装や、MLlibという機械学習ライブラリを使ってみたいと思います。Scalaどころか、Javaすらまともに触ったことがないですが、習うより慣れろで進めてゆきます。

環境準備

今回は手っ取り早く、かつAWSの勉強の意味も込めて、AWSのEMR Notebookを使いたいと思います。
Amazon EMR > ノートブック > ノートブックの作成 として一瞬でノートブックを作成できます。
スクリーンショット 2019-10-03 1.09.50.png

第2章:ScalaとSparkによるデータ分析の紹介

Scalaを使って基本的な集計を行います。
データとしてカリフォルニア大学アーバイン校のMachine Learning Repositoryに含まれているサンプルデータセットを使います。

クラスタからクライアントへのデータの転送

// csvファイルからRDDを作成
val rawblocks = sc.textFile("s3://s3上ファイルのパス") 

// RDDの先頭の要素を取得
rawblocks.first

// 最初の10行を取得
val head = rawblocks.take(10)
head.foreach(println)

// ヘッダ行を除く関数
def isHeader(line: String): Boolean = {
    line.contains("id_1")
}
//  ヘッダ行を表示
head.filter(isHeader).foreach(println)

// ヘッダ行以外を取得
head.filter(x => ! isHeader(x)).length
// head.filter(!isHeader(_)).length

// RDDに対してフィルタリング
val noheader = rawblocks.filter(x => !isHeader(x))

noheader.first 

データの構造化

レコード中の要素(患者1ID, 患者2ID, マッチスコア, マッチ判定)をタプルにパース

val line = head(5)
val pieces = line.split(',')

// 各要素を型変換
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean

val rawscores = pieces.slice(2,11)
def toDouble(s:String) = {
    if ("?".equals(s)) Double.NaN else s.toDouble
}

// パースされた値をタプルに入れて返す関数
def parse(line:String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2,11).map(toDouble)
    val matched = pieces(11).toBoolean
    (id1, id2, scores, matched)
}
val tup = parse(line)
// タプルの操作
tup._1
tup.productElement(0)
tup.productArity
// ケースクラスを利用して、名前レコードを作成する
case class MatchData(id1: Int, id2:Int, scores: Array[Double], matched: Boolean)

def parse(line:String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2,11).map(toDouble)
    val matched = pieces(11).toBoolean
    MatchData(id1, id2, scores, matched)
}
val md = parse(line)
md.matched
md.id1
md.scores
// 配列headに適用
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
// RDDに適用
// 実際に適用されるのは、RDDに対して何らかの出力呼び出しがあった時
val parsed = noheader.map(line => parse(line))
// パースされた結果をキャッシュしておく
parsed.cache()

集計・ヒストグラムの作成

// ローカルデータに対して集計
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)

val matchCounts = parsed.map(md => md.matched).countByValue

val matchCountsSeq = matchCounts.toSeq //seq型に変更
matchCountsSeq.sortBy(_._1).foreach(println)
  •  結果
    matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
    (false,5728201)
    (true,20931)

まとめ・今後の目標

scalaによるsparkプログラムの実行ができました。
次回はMllibを用いた機械学習処理ができればと思います。

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?