まずはじめに
前回の記事で @poad1010 様にご指摘いただきもっと簡単に描けそうだったので、新しい記事として投稿してみました。
結果的には前回がクソコードすぎて、コード量的には半分以下になりました。
後述するおまじないの部分はZeppelinとかで動かしている時には遭遇しませんが、jarにするときとかにコンパイルエラーが起こるので注意すべし
やりたかったこと
idだけを格納した配列があり、それをSpark上でidからvalueを引っ張ってきて、「:」でつなげる処理をしたかった。
具体的なイメージとしては
[1, 3, 5, 7]
といったデータを以下の形式に変更したかった。
りんご:もも:バナナ:パイナップル
環境
- spark(2.1.0)
- scala(2.11.8)
やったこと
- explodeで配列を展開
- 商品が載っているDatasetを外部結合
- groupByKeyでキーごとに値をまとめる
具体例
- ユーザー購入データ
user_id | buy_list |
---|---|
1 | [1, 2, 3] |
2 | [2, 4] |
- 商品データ
product_id | product_name |
---|---|
1 | りんご |
2 | バナナ |
3 | レモン |
4 | みかん |
結合コードは以下。
# spark: SparkSession インスタンス
# df: ユーザー購入データのDataset
# procuctDf: 商品データのDataset
# おまじない(後述)
val sparkSession: SparkSession = spark
import sparkSession.implicits._
val master = df.select(df.col("user_id"), explode(df.col("buy_list")).as("buy_list"))
.join(procuctDf, procuctDf.col("product_id").equalTo(col("buy_list")))
.select(df.col("user_id"), procuctDf.col("product_name"))
.as(Encoders.tuple(Encoders.STRING, Encoders.STRING))
.groupByKey(x => x._1)
.mapGroups((x, y) => (x, y.map(z => z._2).mkString(":")))
.select(col("_1").as("user_id2"), col("_2").as("product_name2"))
master.show()とかすると以下の結果になっているはず
user_id2 | product_name2 |
---|---|
1 | りんご:レモン:バナナ |
2 | バナナ:みかん |
おまじないについて
一番最初はこんなコードを書いてました。
val master = df.select(df.col("user_id"), explode(df.col("buy_list")).as("buy_list"))
.join(procuctDf, procuctDf.col("product_id").equalTo(col("buy_list")))
.select(df.col("user_id"), procuctDf.col("product_name"))
.groupByKey(x => x.getString(0))
.mapGroups((x, y) => (x, y.map(z => z.getString(1)).mkString(":")))
.select(col("_1").as("user_id2"), col("_2").as("product_name2"))
このコードでコンパイルすると、
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
と言うエラーが。
groupByKeyでRowクラスを使っているのが悪いのかな。
と言うことで、これに対応するには以下のおまじないと
val sparkSession: SparkSession = spark
import sparkSession.implicits._
groupByKeyの前段階で以下の処理を追加する。
as(Encoders.tuple(Encoders.STRING, Encoders.STRING))