もっと綺麗に書けたので、記事を書き直しました
idだけを格納した配列があり、それをSpark上でidからvalueを引っ張ってきて、「:」でつなげる処理をしたかった。
具体的なイメージとしては
[1, 3, 5, 7]
といったデータを以下の形式に変更したかった。
りんご:もも:バナナ:パイナップル
環境
- spark(2.0.0)
- scala(2.11.5)
やったこと
- flatMapValuesで配列を展開
- 商品が載っているRDDを外部結合
- groupByKeyでキーごとに値をまとめる
具体例
- ユーザー購入データ
user_id | buy_list |
---|---|
1 | [1, 2, 3] |
2 | [2, 4] |
- 商品データ
product_id | product_name |
---|---|
1 | りんご |
2 | バナナ |
3 | レモン |
4 | みかん |
商品データをRDDに
val productRDD = productDF
.select("product_id","product_name")
.rdd
.map(x => {
// tupleの一番目はidに!
(x.getString(0), x.getString(1))
})
ユーザー購入データをRDDにして、商品データを外部結合
/**
* 経由駅を連結して表示するため、分解
*/
val userBuylistRDD = userBuylistRDD.select("user_id", "buy_list").rdd.map(x => {
val id = x.getString(0)
val codes = x.getSeq[String](1)
(id, codes)
}).flatMapValues(x => x) // ここで平坦にしているよ
.map(x => (x._2, x._1)) // 外部結合するために、商品IDをtupleの一番目の引数にしているよ
.leftOuterJoin(stationRDDCodeAndName)
.map(x => {
// なんか適当な処理
})
この時点で以下のようなデータ構造となっている
user_id | buy_list |
---|---|
1 | りんご |
1 | レモン |
1 | バナナ |
2 | バナナ |
2 | みかん |
この後にgroupByKeyを使ってキーごとに値をまとめる作業をすれば、当初目的としていた「:」でつなげる処理が完成する
val userBuylistRDD = userBuylistRDD.select("user_id", "buy_list").rdd.map(x => {
val id = x.getString(0)
val codes = x.getSeq[String](1)
(id, codes)
}).flatMapValues(x => x) // ここで平坦にしているよ
.map(x => (x._2, x._1)) // 外部結合するために、商品IDをtupleの一番目の引数にしているよ
.leftOuterJoin(stationRDDCodeAndName)
.map(x => {
//さっきのところを具体的に。
(x._2._1, x._2._2.getOrElse(""))
}).groupByKey().map(x => (x._1, x._2.mkString(":")))
んでこれは最終的に以下のようなデータになる
user_id | buy_list |
---|---|
1 | りんご:レモン:バナナ |
2 | バナナ:みかん |
注意
groupByKeyでまとめる時に配列の順番は変わってるっぽい。
順番を重視するデータならば、少し違ったやり方が必要かも?