やりたいこと
Spark のデータフレームを使ってデータを整形した縦持ちになっているデータをマップ形式に集約して横持ちにしたい
こんな感じ
Column1(String) | Column2(String) | Column3(Int) |
---|---|---|
a | x | 2 |
a | y | 3 |
b | z | 1 |
a | x | 5 |
このようなデータを集約して、
Column1(String) | Column2(Map[String, Int]) |
---|---|
a | Map(x->7, y->3) |
b | Map(z->1) |
前提
既にデータをファイルやHDFSなどから取り出して、DataFrameとして扱える状態になっている
- DataFrameは、
Spark 1.3.0 +
の機能です。
方法
SparkのDataFrameを使った集約
def summarizeAsMap(df: DataFrame): RDD[(String, Map[String, Long])] = {
// 最初は、 RDD[((String, String), Long)] のデータ構造にreduceする
val firstReduce = df.rdd.map(row => row.getString(0) -> row.getString(1) -> row.getLong(2)).reduceByKey(_ + _)
// 次にuidをkey, Mapをvalueとするreduce
// reduceされたデータ構造 RDD[(String, Map(String, Long))]
val mapRdd = firstReduce.map(e => e._1._1 -> Map(e._1._2 -> e._2)).reduceByKey(_++_)
return mapRdd
}
- 3行目の
firstReduce
では、DataFrame
を一度RDD
形式に変換して、一行の要素毎で扱える形式に展開してます-
reduceByKey
で集計を行うため、row
の要素をそれぞれrow.getString
やrow.getLong
などで型を定義 -
row => row.getString(0) -> row.getString(1) -> row.getLong(2)
はrowを展開してタプル形式で保持 -
reduceByKey
の結果、RDD[((String, String), Long)]
のデータ構造に変換(してるはず。。。)
-
- 6行目の
mapRdd
では、集約した結果をMap
形式で保持しているRDD
データに変換-
reduceByKey
で集計を行うため、firstReduce
の要素をColumn1, (Map(Column2->Column3),Map(Column2->Column3), ...)
Column1とMapの配列のデータ形式化 -
row => row.getString(0) -> row.getString(1) -> row.getLong(2)
はrowを展開してタプル形式で保持 -
reduceByKey(_++_)
の結果、Mapの結果が更新される- Map同士の更新なので、
Key
が重複する場合Value
は加算される
- Map同士の更新なので、
-
- 結果は
RDD
形式で返るので、戻り値に対してtoDF
を使ってカラム名を再定義してください。
感想
Scalaわかりにくい気がしてます。
Scalaの特徴の一つである柔軟さが、時間が空いた時にソースを見直すときに与える負荷がどの程度になるかがコワイっす。
わかりやすいソースを最初から書けばいいというのは当たり前ですが、どんなにわかりやすく書いたつもりでも、時間が経つにつれわかりにくくなっていくものだと思ってます。
特に、Scalaを習い始めたばっかに書いたソースなので。。。
DataFrameについては、RやPandasで提供されているものに近いので、接しやすい方が多いかもしれません。
そして、Sparkは速いっす。