Overview
This is a memo from trying out the kinds of aggregation and ranking queries usually written in SQL on an RDB, this time with Spark DataFrames.
The middleware used is listed below.
Both are installed on a local macOS machine.
- PostgreSQL v10.1
- Apache Spark v2.3.0
Spark was used through spark-shell (the Scala REPL) for quick interactive checks.
Preparation
The data was prepared by following chapter 3 of the book below.
[ビッグデータ分析・活用のためのSQLレシピ]
https://book.mynavi.jp/ec/products/detail/id=65863
I haven't finished it yet, but it is a very good book.
Tables used in PostgreSQL
-- Item review table
create table if not exists item_review
(
user_id varchar(255)
,product_id varchar(255)
,score numeric
,primary key (user_id,product_id)
);
insert into item_review (user_id, product_id, score)
values
('U001','A001',4.0)
,('U001','A002',5.0)
,('U001','A003',5.0)
,('U002','A001',2.0)
,('U002','A002',3.0)
,('U002','A003',4.0)
,('U003','A001',5.0)
,('U003','A002',4.0)
,('U003','A003',4.0)
;
-- Products table
create table if not exists products (
product_id varchar(255)
,category varchar(255)
,score numeric
,primary key (product_id)
);
insert into products
values
('A001', 'action', 94)
,('A002', 'action', 81)
,('A003', 'action', 78)
,('A004', 'action', 64)
,('D001', 'drama' , 90)
,('D002', 'drama' , 82)
,('D003', 'drama' , 78)
,('D004', 'drama' , 58)
;
CSV files used to create the Spark DataFrames
/tmp/item_review.csv
user_id,product_id,score
U001,A001,4.0
U001,A002,5.0
U001,A003,5.0
U002,A001,2.0
U002,A002,3.0
U002,A003,4.0
U003,A001,5.0
U003,A002,4.0
U003,A003,4.0
/tmp/products.csv
product_id,category,score
A001,action,94
A002,action,81
A003,action,78
A004,action,64
D001,drama,90
D002,drama,82
D003,drama,78
D004,drama,58
The files are loaded from spark-shell as follows.
$ spark-shell --master local
scala> val df = spark.read.option("header","true").csv("/tmp/item_review.csv")
df: org.apache.spark.sql.DataFrame = [user_id: string, product_id: string ... 1 more field]
scala> df.show()
+-------+----------+-----+
|user_id|product_id|score|
+-------+----------+-----+
| U001| A001| 4.0|
| U001| A002| 5.0|
| U001| A003| 5.0|
| U002| A001| 2.0|
| U002| A002| 3.0|
| U002| A003| 4.0|
| U003| A001| 5.0|
| U003| A002| 4.0|
| U003| A003| 4.0|
+-------+----------+-----+
scala> val df2 = spark.read.option("header","true").csv("/tmp/products.csv")
df2: org.apache.spark.sql.DataFrame = [product_id: string, category: string ... 1 more field]
scala> df2.show()
+----------+--------+-----+
|product_id|category|score|
+----------+--------+-----+
| A001| action| 94|
| A002| action| 81|
| A003| action| 78|
| A004| action| 64|
| D001| drama| 90|
| D002| drama| 82|
| D003| drama| 78|
| D004| drama| 58|
+----------+--------+-----+
df and df2 serve as the input data from here on.
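One caveat: since the CSVs are read with only the header option, every column, including score, is typed as string, and the aggregations below rely on Spark's implicit casts. If explicit numeric types are preferred, Spark's standard inferSchema option can be used. A minimal sketch (dfTyped is just a name I picked for illustration):

// Sketch: let Spark infer column types; score becomes double instead of string.
val dfTyped = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // costs an extra pass over the file
  .csv("/tmp/item_review.csv")
dfTyped.printSchema()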
1. Aggregating data
1.1. PostgreSQL
select
user_id
,count(*) as total_count
,sum(score) as sum
,avg(score) as avg
,min(score) as min
,max(score) as max
from
item_review
group by
user_id
;
user_id | total_count | sum | avg | min | max
---------+-------------+------+--------------------+-----+-----
U001 | 3 | 14.0 | 4.6666666666666667 | 4.0 | 5.0
U002 | 3 | 9.0 | 3.0000000000000000 | 2.0 | 4.0
U003 | 3 | 13.0 | 4.3333333333333333 | 4.0 | 5.0
(3 rows)
1.2. Spark DataFrame
scala> :paste
// Entering paste mode (ctrl-D to finish)
df.groupBy("user_id")
.agg(
count("user_id") as "total_count",
sum("score") as "sum",
avg("score") as "avg",
min("score") as "min",
max("score") as "max")
.show()
// Exiting paste mode, now interpreting.
+-------+-----------+----+-----------------+---+---+
|user_id|total_count| sum| avg|min|max|
+-------+-----------+----+-----------------+---+---+
| U002| 3| 9.0| 3.0|2.0|4.0|
| U003| 3|13.0|4.333333333333333|4.0|5.0|
| U001| 3|14.0|4.666666666666667|4.0|5.0|
+-------+-----------+----+-----------------+---+---+
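For comparison, the same aggregation can be run as literal SQL by registering the DataFrame as a temporary view. A sketch using the standard createOrReplaceTempView API (the view name item_review is my choice):

// Sketch: the same GROUP BY aggregation via Spark SQL instead of the DataFrame API.
df.createOrReplaceTempView("item_review")
spark.sql("""
  select
  user_id
  ,count(*) as total_count
  ,sum(score) as sum
  ,avg(score) as avg
  ,min(score) as min
  ,max(score) as max
  from item_review
  group by user_id
""").show()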
2. Aggregation with window functions
2.1. PostgreSQL
select
user_id
,product_id
,score
-- overall average score
,avg(score) over() as avg_score
-- per-user average score
,avg(score) over(partition by user_id) as user_avg_score
-- difference between each score and the user's average score
,score - avg(score) over(partition by user_id) as user_avg_score_diff
from
item_review
;
user_id | product_id | score | avg_score | user_avg_score | user_avg_score_diff
---------+------------+-------+--------------------+--------------------+---------------------
U001 | A001 | 4.0 | 4.0000000000000000 | 4.6666666666666667 | -0.6666666666666667
U001 | A002 | 5.0 | 4.0000000000000000 | 4.6666666666666667 | 0.3333333333333333
U001 | A003 | 5.0 | 4.0000000000000000 | 4.6666666666666667 | 0.3333333333333333
U002 | A001 | 2.0 | 4.0000000000000000 | 3.0000000000000000 | -1.0000000000000000
U002 | A002 | 3.0 | 4.0000000000000000 | 3.0000000000000000 | 0.0000000000000000
U002 | A003 | 4.0 | 4.0000000000000000 | 3.0000000000000000 | 1.0000000000000000
U003 | A001 | 5.0 | 4.0000000000000000 | 4.3333333333333333 | 0.6666666666666667
U003 | A002 | 4.0 | 4.0000000000000000 | 4.3333333333333333 | -0.3333333333333333
U003 | A003 | 4.0 | 4.0000000000000000 | 4.3333333333333333 | -0.3333333333333333
(9 rows)
2.2. Spark DataFrame
// An import is required to use window functions.
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> :paste
// Entering paste mode (ctrl-D to finish)
df.select("user_id","product_id","score")
.withColumn("avg_score", avg("score").over()) // partitionのキーに指定されていないため警告が出るが結果としては問題ありません。
.withColumn("user_avg_score", avg("score").over(Window.partitionBy("user_id")))
.withColumn("user_avg_score_diff", col("score") - col("user_avg_score"))
.show()
// Exiting paste mode, now interpreting.
2018-05-16 00:26:15 WARN WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------+----------+-----+---------+-----------------+--------------------+
|user_id|product_id|score|avg_score| user_avg_score| user_avg_score_diff|
+-------+----------+-----+---------+-----------------+--------------------+
| U001| A001| 4.0| 4.0|4.666666666666667| -0.666666666666667|
| U001| A002| 5.0| 4.0|4.666666666666667| 0.33333333333333304|
| U001| A003| 5.0| 4.0|4.666666666666667| 0.33333333333333304|
| U002| A001| 2.0| 4.0| 3.0| -1.0|
| U002| A002| 3.0| 4.0| 3.0| 0.0|
| U002| A003| 4.0| 4.0| 3.0| 1.0|
| U003| A001| 5.0| 4.0|4.333333333333333| 0.666666666666667|
| U003| A002| 4.0| 4.0|4.333333333333333|-0.33333333333333304|
| U003| A003| 4.0| 4.0|4.333333333333333|-0.33333333333333304|
+-------+----------+-----+---------+-----------------+--------------------+
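The WindowExec warning above is raised because over() without a partition forces every row into a single partition. For a global aggregate like the overall average, one way to avoid that is to compute the aggregate separately and attach it with lit(). This is my own sketch, not from the original run (overallAvg is a name I picked):

// Sketch: compute the overall average once, then attach it as a literal column,
// avoiding an unpartitioned window that funnels all rows into one partition.
val overallAvg = df.agg(avg("score")).first().getDouble(0)
df.select("user_id", "product_id", "score")
  .withColumn("avg_score", lit(overallAvg))
  .withColumn("user_avg_score", avg("score").over(Window.partitionBy("user_id")))
  .withColumn("user_avg_score_diff", col("score") - col("user_avg_score"))
  .show()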
3. Ranking with window functions
3.1. PostgreSQL
select
product_id
,score
-- assign a unique rank by score
,row_number() over(order by score desc) as row
-- assign a rank that allows ties
,rank() over(order by score desc) as rank
-- assign a rank that allows ties without skipping the ranks that follow them
,dense_rank() over(order by score desc) as dense_rank
-- fetch a value from a row before the current one
,lag(product_id) over(order by score desc) as lag1
,lag(product_id,2) over(order by score desc) as lag2
-- fetch a value from a row after the current one
,lead(product_id) over(order by score desc) as lead1
,lead(product_id,2) over(order by score desc) as lead2
from
products
order by
row
;
product_id | score | row | rank | dense_rank | lag1 | lag2 | lead1 | lead2
------------+-------+-----+------+------------+------+------+-------+-------
A001 | 94 | 1 | 1 | 1 | | | D001 | D002
D001 | 90 | 2 | 2 | 2 | A001 | | D002 | A002
D002 | 82 | 3 | 3 | 3 | D001 | A001 | A002 | A003
A002 | 81 | 4 | 4 | 4 | D002 | D001 | A003 | D003
A003 | 78 | 5 | 5 | 5 | A002 | D002 | D003 | A004
D003 | 78 | 6 | 5 | 5 | A003 | A002 | A004 | D004
A004 | 64 | 7 | 7 | 6 | D003 | A003 | D004 |
D004 | 58 | 8 | 8 | 7 | A004 | D003 | |
(8 rows)
3.2. Spark DataFrame
// An import is required to use window functions.
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> :paste
// Entering paste mode (ctrl-D to finish)
df2.select("product_id","score")
.withColumn("row",row_number().over(Window.orderBy(desc("score"))))
.withColumn("rank",rank().over(Window.orderBy(desc("score"))))
.withColumn("dense_rank",dense_rank().over(Window.orderBy(desc("score"))))
.withColumn("lag1",lag("product_id",1).over(Window.orderBy(desc("score"))))
.withColumn("lag2",lag("product_id",2).over(Window.orderBy(desc("score"))))
.withColumn("lead1",lead("product_id",1).over(Window.orderBy(desc("score"))))
.withColumn("lead2",lead("product_id",2).over(Window.orderBy(desc("score"))))
.orderBy("row")
.show()
// Exiting paste mode, now interpreting.
2018-05-16 01:40:29 WARN WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------+-----+---+----+----------+----+----+-----+-----+
|product_id|score|row|rank|dense_rank|lag1|lag2|lead1|lead2|
+----------+-----+---+----+----------+----+----+-----+-----+
| A001| 94| 1| 1| 1|null|null| D001| D002|
| D001| 90| 2| 2| 2|A001|null| D002| A002|
| D002| 82| 3| 3| 3|D001|A001| A002| A003|
| A002| 81| 4| 4| 4|D002|D001| A003| D003|
| A003| 78| 5| 5| 5|A002|D002| D003| A004|
| D003| 78| 6| 5| 5|A003|A002| A004| D004|
| A004| 64| 7| 7| 6|D003|A003| D004| null|
| D004| 58| 8| 8| 7|A004|D003| null| null|
+----------+-----+---+----+----------+----+----+-----+-----+
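Every column above shares the same ordering, so the window specification can be factored out into a single value. This refactoring is my own sketch, not from the original run (byScoreDesc is a name I picked):

// Sketch: define the shared window spec once instead of repeating it per column.
val byScoreDesc = Window.orderBy(desc("score"))
df2.select("product_id","score")
  .withColumn("row", row_number().over(byScoreDesc))
  .withColumn("rank", rank().over(byScoreDesc))
  .withColumn("dense_rank", dense_rank().over(byScoreDesc))
  .withColumn("lag1", lag("product_id",1).over(byScoreDesc))
  .withColumn("lag2", lag("product_id",2).over(byScoreDesc))
  .withColumn("lead1", lead("product_id",1).over(byScoreDesc))
  .withColumn("lead2", lead("product_id",2).over(byScoreDesc))
  .orderBy("row")
  .show()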
4. Combining ORDER BY and aggregate functions in a window
4.1. PostgreSQL
select
product_id
,score
-- assign a unique rank by score
,row_number() over(order by score desc) as row
-- cumulative score from the top of the ranking
,sum(score) over(
order by score desc rows between unbounded preceding and current row)
as cum_score
-- average score over the current row plus one row before and one after (3 rows)
,avg(score) over(
order by score desc rows between 1 preceding and 1 following)
as local_avg
-- product ID ranked first
,first_value(product_id) over(
order by score desc rows between unbounded preceding and unbounded following)
as first_value
-- product ID ranked last
,last_value(product_id) over(
order by score desc rows between unbounded preceding and unbounded following)
as last_value
from products
order by row
;
product_id | score | row | cum_score | local_avg | first_value | last_value
------------+-------+-----+-----------+---------------------+-------------+------------
A001 | 94 | 1 | 94 | 92.0000000000000000 | A001 | D004
D001 | 90 | 2 | 184 | 88.6666666666666667 | A001 | D004
D002 | 82 | 3 | 266 | 84.3333333333333333 | A001 | D004
A002 | 81 | 4 | 347 | 80.3333333333333333 | A001 | D004
A003 | 78 | 5 | 425 | 79.0000000000000000 | A001 | D004
D003 | 78 | 6 | 503 | 73.3333333333333333 | A001 | D004
A004 | 64 | 7 | 567 | 66.6666666666666667 | A001 | D004
D004 | 58 | 8 | 625 | 61.0000000000000000 | A001 | D004
(8 rows)
4.2. Spark DataFrame
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> :paste
// Entering paste mode (ctrl-D to finish)
df2.select("product_id","score")
.withColumn("row",row_number().over(Window.orderBy(desc("score"))))
.withColumn("cum_score", sum("score").over(Window.orderBy(desc("score")).rowsBetween(Long.MinValue,0)))
.withColumn("local_avg", avg("score").over(Window.orderBy(desc("score")).rowsBetween(-1,1)))
.withColumn("first_value", first("product_id").over(Window.orderBy(desc("score")).rowsBetween(Long.MinValue, Long.MaxValue)))
.withColumn("last_value", last("product_id").over(Window.orderBy(desc("score")).rowsBetween(Long.MinValue, Long.MaxValue)))
.orderBy("row")
.show()
// Exiting paste mode, now interpreting.
2018-05-17 02:28:32 WARN WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2018-05-17 02:28:32 WARN WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2018-05-17 02:28:32 WARN WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2018-05-17 02:28:32 WARN WindowExec:66 - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------+-----+---+---------+-----------------+-----------+----------+
|product_id|score|row|cum_score| local_avg|first_value|last_value|
+----------+-----+---+---------+-----------------+-----------+----------+
| A001| 94| 1| 94.0| 92.0| A001| D004|
| D001| 90| 2| 184.0|88.66666666666667| A001| D004|
| D002| 82| 3| 266.0|84.33333333333333| A001| D004|
| A002| 81| 4| 347.0|80.33333333333333| A001| D004|
| A003| 78| 5| 425.0| 79.0| A001| D004|
| D003| 78| 6| 503.0|73.33333333333333| A001| D004|
| A004| 64| 7| 567.0|66.66666666666667| A001| D004|
| D004| 58| 8| 625.0| 61.0| A001| D004|
+----------+-----+---+---------+-----------------+-----------+----------+
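Instead of the raw Long.MinValue and Long.MaxValue boundaries, Spark 2.1+ also exposes the named constants Window.unboundedPreceding, Window.currentRow, and Window.unboundedFollowing, which read much closer to the SQL frame clauses. A sketch of the same query with them (byScore is a name I picked):

// Sketch: the same frames expressed with Spark's named boundary constants.
val byScore = Window.orderBy(desc("score"))
df2.select("product_id","score")
  .withColumn("row", row_number().over(byScore))
  .withColumn("cum_score",
    sum("score").over(byScore.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
  .withColumn("local_avg", avg("score").over(byScore.rowsBetween(-1, 1)))
  .withColumn("first_value",
    first("product_id").over(byScore.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
  .withColumn("last_value",
    last("product_id").over(byScore.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)))
  .orderBy("row")
  .show()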
Things I have not investigated enough yet; I will add them later:
- Checking Spark's SQL execution plans (a starting point is sketched below)
- Spark's implementation and execution plan for UNION ALL
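For the first item, a DataFrame's logical and physical plans can be printed with explain(). A minimal sketch using the aggregation from section 1.2:

// Sketch: print the plans for the GROUP BY aggregation from section 1.2.
df.groupBy("user_id")
  .agg(count("user_id") as "total_count", avg("score") as "avg")
  .explain(true) // true additionally prints the parsed/analyzed/optimized logical plans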