LoginSignup
1
1

More than 5 years have passed since last update.

Sparkで配列型のデータを加工する備忘録(その2)

Posted at

まずはじめに

前回の記事@poad1010 様にご指摘いただきもっと簡単に描けそうだったので、新しい記事として投稿してみました。
結果的には前回がクソコードすぎて、コード量的には半分以下になりました。

後述するおまじないの部分はZeppelinとかで動かしている時には遭遇しませんが、jarにするときとかにコンパイルエラーが起こるので注意すべし

やりたかったこと

idだけを格納した配列があり、それをSpark上でidからvalueを引っ張ってきて、「:」でつなげる処理をしたかった。

具体的なイメージとしては

[1, 3, 5, 7]

といったデータを以下の形式に変更したかった。

りんご:もも:バナナ:パイナップル

環境

  • spark(2.1.0)
  • scala(2.11.8)

やったこと

  • explodeで配列を展開
  • 商品が載っているDatasetを外部結合
  • groupByKeyでキーごとに値をまとめる

具体例

  • ユーザー購入データ
user_id buy_list
1 [1, 2, 3]
2 [2, 4]
  • 商品データ
product_id product_name
1 りんご
2 バナナ
3 レモン
4 みかん

結合コードは以下。

# spark: SparkSession インスタンス
# df: ユーザー購入データのDataset
# procuctDf: 商品データのDataset

# おまじない(後述)
val sparkSession: SparkSession = spark
import sparkSession.implicits._

val master = df.select(df.col("user_id"), explode(df.col("buy_list")).as("buy_list"))
  .join(procuctDf, procuctDf.col("product_id").equalTo(col("buy_list")))
  .select(df.col("user_id"), procuctDf.col("product_name"))
  .as(Encoders.tuple(Encoders.STRING, Encoders.STRING))
  .groupByKey(x => x._1)
  .mapGroups((x, y) => (x,  y.map(z => z._2).mkString(":")))
  .select(col("_1").as("user_id2"), col("_2").as("product_name2"))

master.show()とかすると以下の結果になっているはず

user_id2 product_name2
1 りんご:レモン:バナナ
2 バナナ:みかん

おまじないについて

一番最初はこんなコードを書いてました。

val master = df.select(df.col("user_id"), explode(df.col("buy_list")).as("buy_list"))
  .join(procuctDf, procuctDf.col("product_id").equalTo(col("buy_list")))
  .select(df.col("user_id"), procuctDf.col("product_name"))
  .groupByKey(x => x.getString(0))
  .mapGroups((x, y) => (x, y.map(z => z.getString(1)).mkString(":")))
  .select(col("_1").as("user_id2"), col("_2").as("product_name2"))

このコードでコンパイルすると、

 Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

と言うエラーが。
groupByKeyでRowクラスを使っているのが悪いのかな。

と言うことで、これに対応するには以下のおまじないと

val sparkSession: SparkSession = spark
import sparkSession.implicits._

groupByKeyの前段階で以下の処理を追加する。

as(Encoders.tuple(Encoders.STRING, Encoders.STRING))
1
1
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1