How to Handle Aggregation and Ordering in an RDB and in Spark DataFrames

Posted at 2018-05-15

Overview

I tried doing, with Spark DataFrames, the kind of SQL-based aggregation and ordering I often do in an RDB. These are my notes from that.

The middleware used is listed below.
Both were installed and run on a local Mac OS X machine.

  • PostgreSQL v10.1
  • Apache Spark v2.3.0

For Spark, I launched spark-shell (the Scala REPL) and ran quick checks there.

Preparation

I prepared the data with reference to Chapter 3 of the following book.

[ビッグデータ分析・活用のためのSQLレシピ]
https://book.mynavi.jp/ec/products/detail/id=65863

I haven't read all of it yet, but it's a very good book.

Tables used in PostgreSQL

-- Item review table
create table if not exists item_review
(
  user_id varchar(255)
 ,product_id varchar(255)
 ,score numeric
 ,primary key (user_id,product_id)
);

insert into item_review (user_id, product_id, score)
values
  ('U001','A001',4.0)
 ,('U001','A002',5.0)
 ,('U001','A003',5.0)
 ,('U002','A001',2.0)
 ,('U002','A002',3.0)
 ,('U002','A003',4.0)
 ,('U003','A001',5.0)
 ,('U003','A002',4.0)
 ,('U003','A003',4.0)
;

-- Products table
create table if not exists products (
   product_id varchar(255)
  ,category   varchar(255)
  ,score      numeric
  ,primary key (product_id)
);

insert into products
values
   ('A001', 'action', 94)
  ,('A002', 'action', 81)
  ,('A003', 'action', 78)
  ,('A004', 'action', 64)
  ,('D001', 'drama' , 90)
  ,('D002', 'drama' , 82)
  ,('D003', 'drama' , 78)
  ,('D004', 'drama' , 58)
;

Source CSV files for the Spark DataFrames

/tmp/item_review.csv
user_id,product_id,score
U001,A001,4.0
U001,A002,5.0
U001,A003,5.0
U002,A001,2.0
U002,A002,3.0
U002,A003,4.0
U003,A001,5.0
U003,A002,4.0
U003,A003,4.0
/tmp/products.csv
product_id,category,score
A001,action,94
A002,action,81
A003,action,78
A004,action,64
D001,drama,90
D002,drama,82
D003,drama,78
D004,drama,58

Load the files from spark-shell as follows.

$ spark-shell --master local

scala> val df = spark.read.option("header","true").csv("/tmp/item_review.csv")
df: org.apache.spark.sql.DataFrame = [user_id: string, product_id: string ... 1 more field]

scala> df.show()
+-------+----------+-----+
|user_id|product_id|score|
+-------+----------+-----+
|   U001|      A001|  4.0|
|   U001|      A002|  5.0|
|   U001|      A003|  5.0|
|   U002|      A001|  2.0|
|   U002|      A002|  3.0|
|   U002|      A003|  4.0|
|   U003|      A001|  5.0|
|   U003|      A002|  4.0|
|   U003|      A003|  4.0|
+-------+----------+-----+

scala> val df2 = spark.read.option("header","true").csv("/tmp/products.csv")
df2: org.apache.spark.sql.DataFrame = [product_id: string, category: string ... 1 more field]

scala> df2.show()
+----------+--------+-----+
|product_id|category|score|
+----------+--------+-----+
|      A001|  action|   94|
|      A002|  action|   81|
|      A003|  action|   78|
|      A004|  action|   64|
|      D001|   drama|   90|
|      D002|   drama|   82|
|      D003|   drama|   78|
|      D004|   drama|   58|
+----------+--------+-----+

df and df2 will be used as the input data from here on.
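Note that since the CSVs are read without schema inference, every column, including score, comes in as a string; Spark casts them to numbers where needed in the examples below. If you want numeric columns from the start, something along these lines should also work (a sketch using an illustrative name dfTyped; the outputs below were produced with the plain df/df2 above):

// Re-read with schema inference so that score is parsed as a numeric column.
val dfTyped = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/item_review.csv")

dfTyped.printSchema()  // score should now be double instead of string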

1. Aggregating data

1.1. PostgreSQL

select
  user_id
 ,count(*) as total_count
 ,sum(score) as sum
 ,avg(score) as avg
 ,min(score) as min
 ,max(score) as max
from
  item_review
group by
  user_id
;


 user_id | total_count | sum  |        avg         | min | max
---------+-------------+------+--------------------+-----+-----
 U001    |           3 | 14.0 | 4.6666666666666667 | 4.0 | 5.0
 U002    |           3 |  9.0 | 3.0000000000000000 | 2.0 | 4.0
 U003    |           3 | 13.0 | 4.3333333333333333 | 4.0 | 5.0
(3 rows)

1.2. Spark DataFrame

scala> :paste
// Entering paste mode (ctrl-D to finish)

df.groupBy("user_id")
.agg(
  count("user_id") as "total_count",
  sum("score") as "sum",
  avg("score") as "avg",
  min("score") as "min",
  max("score") as "max")
.show()

// Exiting paste mode, now interpreting.

+-------+-----------+----+-----------------+---+---+
|user_id|total_count| sum|              avg|min|max|
+-------+-----------+----+-----------------+---+---+
|   U002|          3| 9.0|              3.0|2.0|4.0|
|   U003|          3|13.0|4.333333333333333|4.0|5.0|
|   U001|          3|14.0|4.666666666666667|4.0|5.0|
+-------+-----------+----+-----------------+---+---+
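The row order differs from the PostgreSQL result because groupBy does not guarantee any output order. To get the same ordering, an explicit orderBy can be appended; a sketch, run in the same spark-shell session:

// Same aggregation, with an explicit sort on user_id for a stable output order.
df.groupBy("user_id")
  .agg(
    count("user_id") as "total_count",
    sum("score") as "sum",
    avg("score") as "avg",
    min("score") as "min",
    max("score") as "max")
  .orderBy("user_id")
  .show()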

2. Aggregation with window functions

2.1. PostgreSQL

select 
   user_id
  ,product_id
  ,score
   -- Overall average score
  ,avg(score) over() as avg_score
  -- Per-user average score
  ,avg(score) over(partition by user_id) as user_avg_score
  -- Difference between each individual score and the user's average score
  ,score - avg(score) over(partition by user_id) as user_avg_score_diff
from
  item_review
;


 user_id | product_id | score |     avg_score      |   user_avg_score   | user_avg_score_diff
---------+------------+-------+--------------------+--------------------+---------------------
 U001    | A001       |   4.0 | 4.0000000000000000 | 4.6666666666666667 | -0.6666666666666667
 U001    | A002       |   5.0 | 4.0000000000000000 | 4.6666666666666667 |  0.3333333333333333
 U001    | A003       |   5.0 | 4.0000000000000000 | 4.6666666666666667 |  0.3333333333333333
 U002    | A001       |   2.0 | 4.0000000000000000 | 3.0000000000000000 | -1.0000000000000000
 U002    | A002       |   3.0 | 4.0000000000000000 | 3.0000000000000000 |  0.0000000000000000
 U002    | A003       |   4.0 | 4.0000000000000000 | 3.0000000000000000 |  1.0000000000000000
 U003    | A001       |   5.0 | 4.0000000000000000 | 4.3333333333333333 |  0.6666666666666667
 U003    | A002       |   4.0 | 4.0000000000000000 | 4.3333333333333333 | -0.3333333333333333
 U003    | A003       |   4.0 | 4.0000000000000000 | 4.3333333333333333 | -0.3333333333333333
(9 rows)

2.2. Spark DataFrame

// An import is required to use window functions.
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> :paste
// Entering paste mode (ctrl-D to finish)

df.select("user_id","product_id","score")
  .withColumn("avg_score", avg("score").over()) // partitionのキーに指定されていないため警告が出るが結果としては問題ありません。
  .withColumn("user_avg_score", avg("score").over(Window.partitionBy("user_id")))
  .withColumn("user_avg_score_diff", col("score") - col("user_avg_score"))
  .show()

// Exiting paste mode, now interpreting.

2018-05-16 00:26:15 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------+----------+-----+---------+-----------------+--------------------+
|user_id|product_id|score|avg_score|   user_avg_score| user_avg_score_diff|
+-------+----------+-----+---------+-----------------+--------------------+
|   U001|      A001|  4.0|      4.0|4.666666666666667|  -0.666666666666667|
|   U001|      A002|  5.0|      4.0|4.666666666666667| 0.33333333333333304|
|   U001|      A003|  5.0|      4.0|4.666666666666667| 0.33333333333333304|
|   U002|      A001|  2.0|      4.0|              3.0|                -1.0|
|   U002|      A002|  3.0|      4.0|              3.0|                 0.0|
|   U002|      A003|  4.0|      4.0|              3.0|                 1.0|
|   U003|      A001|  5.0|      4.0|4.333333333333333|   0.666666666666667|
|   U003|      A002|  4.0|      4.0|4.333333333333333|-0.33333333333333304|
|   U003|      A003|  4.0|      4.0|4.333333333333333|-0.33333333333333304|
+-------+----------+-----+---------+-----------------+--------------------+
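If you want to avoid the "No Partition Defined" warning caused by the empty over(), one alternative (a sketch, not what produced the output above) is to compute the overall average once with agg and attach it as a literal column:

// Compute the overall average separately and attach it with lit(),
// instead of using avg(...).over() with no partition key.
val overallAvg = df.agg(avg("score")).first().getDouble(0)

df.select("user_id", "product_id", "score")
  .withColumn("avg_score", lit(overallAvg))
  .withColumn("user_avg_score", avg("score").over(Window.partitionBy("user_id")))
  .withColumn("user_avg_score_diff", col("score") - col("user_avg_score"))
  .show()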

3. Ordering with window functions

3.1. PostgreSQL

SELECT
   product_id
  ,score

  -- Assign a unique rank by score
  ,row_number() over(ORDER BY score desc) AS ROW

  -- Assign a rank that allows ties
  ,rank() over(ORDER BY score desc) AS RANK

  -- Assign a rank that allows ties and does not skip the rank after a tie
  ,dense_rank() over(ORDER BY score desc) AS DENSE_RANK

  -- Get values from rows preceding the current row
  ,lag(product_id) over(ORDER BY score desc) AS lag1
  ,lag(product_id,2) over(ORDER BY score desc) AS lag2

  -- Get values from rows following the current row
  ,lead(product_id) over(ORDER BY score desc) AS lead1
  ,lead(product_id,2) over(ORDER BY score desc) AS lead2

FROM
  products
ORDER BY
  ROW
;


 product_id | score | row | rank | dense_rank | lag1 | lag2 | lead1 | lead2
------------+-------+-----+------+------------+------+------+-------+-------
 A001       |    94 |   1 |    1 |          1 |      |      | D001  | D002
 D001       |    90 |   2 |    2 |          2 | A001 |      | D002  | A002
 D002       |    82 |   3 |    3 |          3 | D001 | A001 | A002  | A003
 A002       |    81 |   4 |    4 |          4 | D002 | D001 | A003  | D003
 A003       |    78 |   5 |    5 |          5 | A002 | D002 | D003  | A004
 D003       |    78 |   6 |    5 |          5 | A003 | A002 | A004  | D004
 A004       |    64 |   7 |    7 |          6 | D003 | A003 | D004  |
 D004       |    58 |   8 |    8 |          7 | A004 | D003 |       |
(8 rows)

3.2. Spark DataFrame

// An import is required to use window functions.
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> :paste
// Entering paste mode (ctrl-D to finish)

df2.select("product_id","score") 
  .withColumn("row",row_number().over(Window.orderBy(desc("score"))))
  .withColumn("rank",rank().over(Window.orderBy(desc("score"))))
  .withColumn("dense_rank",dense_rank().over(Window.orderBy(desc("score"))))
 .withColumn("lag1",lag("product_id",1).over(Window.orderBy(desc("score"))))
  .withColumn("lag2",lag("product_id",2).over(Window.orderBy(desc("score"))))
.withColumn("lead1",lead("product_id",1).over(Window.orderBy(desc("score"))))
  .withColumn("lead2",lead("product_id",2).over(Window.orderBy(desc("score"))))
  .orderBy("row")
  .show()

// Exiting paste mode, now interpreting.

2018-05-16 01:40:29 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------+-----+---+----+----------+----+----+-----+-----+
|product_id|score|row|rank|dense_rank|lag1|lag2|lead1|lead2|
+----------+-----+---+----+----------+----+----+-----+-----+
|      A001|   94|  1|   1|         1|null|null| D001| D002|
|      D001|   90|  2|   2|         2|A001|null| D002| A002|
|      D002|   82|  3|   3|         3|D001|A001| A002| A003|
|      A002|   81|  4|   4|         4|D002|D001| A003| D003|
|      A003|   78|  5|   5|         5|A002|D002| D003| A004|
|      D003|   78|  6|   5|         5|A003|A002| A004| D004|
|      A004|   64|  7|   7|         6|D003|A003| D004| null|
|      D004|   58|  8|   8|         7|A004|D003| null| null|
+----------+-----+---+----+----------+----+----+-----+-----+
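Since the same Window.orderBy(desc("score")) spec is repeated for every column, it can be factored out into a single value; this sketch should give the same result:

// Define the shared window spec once and reuse it for each ranking/offset function.
val w = Window.orderBy(desc("score"))

df2.select("product_id", "score")
  .withColumn("row", row_number().over(w))
  .withColumn("rank", rank().over(w))
  .withColumn("dense_rank", dense_rank().over(w))
  .withColumn("lag1", lag("product_id", 1).over(w))
  .withColumn("lag2", lag("product_id", 2).over(w))
  .withColumn("lead1", lead("product_id", 1).over(w))
  .withColumn("lead2", lead("product_id", 2).over(w))
  .orderBy("row")
  .show()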

4. Combining the ORDER BY clause with aggregate functions in window functions

4.1. PostgreSQL

select
   product_id
  ,score

  -- Assign a unique rank in score order
  ,row_number() over(order by score desc) as row

  -- Compute the cumulative score from the top of the ranking
  ,sum(score) over(
    order by score desc rows between unbounded preceding and current row) 
    as cum_score

  -- Compute the average score over 3 rows: the current row plus one row before and one after
  ,avg(score) over(
    order by score desc rows between 1 preceding and 1 following)
    as local_avg

  -- Get the product ID of the top-ranked row
  ,first_value(product_id) over(
    order by score desc rows between unbounded preceding and unbounded following)
    as first_value

  -- Get the product ID of the bottom-ranked row
  ,last_value(product_id) over(
    order by score desc rows between unbounded preceding and unbounded following)
    as last_value

from products
order by row
;


 product_id | score | row | cum_score |      local_avg      | first_value | last_value
------------+-------+-----+-----------+---------------------+-------------+------------
 A001       |    94 |   1 |        94 | 92.0000000000000000 | A001        | D004
 D001       |    90 |   2 |       184 | 88.6666666666666667 | A001        | D004
 D002       |    82 |   3 |       266 | 84.3333333333333333 | A001        | D004
 A002       |    81 |   4 |       347 | 80.3333333333333333 | A001        | D004
 A003       |    78 |   5 |       425 | 79.0000000000000000 | A001        | D004
 D003       |    78 |   6 |       503 | 73.3333333333333333 | A001        | D004
 A004       |    64 |   7 |       567 | 66.6666666666666667 | A001        | D004
 D004       |    58 |   8 |       625 | 61.0000000000000000 | A001        | D004
(8 rows)

4.2. Spark DataFrame

// An import is required to use window functions.
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> :paste
// Entering paste mode (ctrl-D to finish)

df2.select("product_id","score")
  .withColumn("row",row_number().over(Window.orderBy(desc("score"))))
  .withColumn("cum_score", sum("score").over(Window.orderBy(desc("score")).rowsBetween(Long.MinValue,0)))
  .withColumn("local_avg", avg("score").over(Window.orderBy(desc("score")).rowsBetween(-1,1)))
  .withColumn("first_value", first("product_id").over(Window.orderBy(desc("score")).rowsBetween(Long.MinValue, Long.MaxValue)))
  .withColumn("last_value", last("product_id").over(Window.orderBy(desc("score")).rowsBetween(Long.MinValue, Long.MaxValue)))
  .orderBy("row")
  .show()

// Exiting paste mode, now interpreting.

2018-05-17 02:28:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2018-05-17 02:28:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2018-05-17 02:28:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2018-05-17 02:28:32 WARN  WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------+-----+---+---------+-----------------+-----------+----------+
|product_id|score|row|cum_score|        local_avg|first_value|last_value|
+----------+-----+---+---------+-----------------+-----------+----------+
|      A001|   94|  1|     94.0|             92.0|       A001|      D004|
|      D001|   90|  2|    184.0|88.66666666666667|       A001|      D004|
|      D002|   82|  3|    266.0|84.33333333333333|       A001|      D004|
|      A002|   81|  4|    347.0|80.33333333333333|       A001|      D004|
|      A003|   78|  5|    425.0|             79.0|       A001|      D004|
|      D003|   78|  6|    503.0|73.33333333333333|       A001|      D004|
|      A004|   64|  7|    567.0|66.66666666666667|       A001|      D004|
|      D004|   58|  8|    625.0|             61.0|       A001|      D004|
+----------+-----+---+---------+-----------------+-----------+----------+
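Instead of Long.MinValue and Long.MaxValue, the Window object also exposes named frame-boundary constants (Window.unboundedPreceding, Window.currentRow, Window.unboundedFollowing); the following sketch should be equivalent:

// The same frames written with the named boundary constants.
val byScore = Window.orderBy(desc("score"))

df2.select("product_id", "score")
  .withColumn("row", row_number().over(byScore))
  .withColumn("cum_score",
    sum("score").over(byScore.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
  .withColumn("local_avg",
    avg("score").over(byScore.rowsBetween(-1, 1)))
  .withColumn("first_value",
    first("product_id").over(byScore.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
  .withColumn("last_value",
    last("product_id").over(byScore.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
  .orderBy("row")
  .show()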

Some things still need more investigation, so I'll add them later (a small starting sketch follows after this list):

  • Check Spark's SQL execution plan
  • Spark's implementation and execution plan for UNION ALL
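As a starting point for those items: the plan of a DataFrame can be printed with explain(), and DataFrame.union keeps duplicates, i.e. it behaves like UNION ALL. A minimal sketch (the column choice is just for illustration):

// union() has UNION ALL semantics (no deduplication); explain() prints the plan.
val unioned = df.select("product_id").union(df2.select("product_id"))
unioned.explain()
unioned.show()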