More than 1 year has passed since last update.


環境


  • vagrant CentOS 7.3

  • python 2.7

  • spark 2.2.0


サンプルデータ

こちらのサンプルデータから受注履歴と顧客情報を使わせていただきました。

https://community.tableau.com/docs/DOC-5552

使いやすいようにexcel -> csv 変換と同姓同名を除外してます。


実装

お題:2010年カテゴリ別世代別売上額ランキング

テーブル結合、グループ関数、ウインドウ関数を使ってみました。


sample_spark.py

# encoding: utf-8


from __future__ import print_function
from pyspark.sql import SparkSession, SQLContext
import sys

# SparkContext設定.
sc = SparkSession.builder.appName(name="PySparkShell") \
.master(master="local[*]") \
.getOrCreate().sparkContext
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

# SQLContext設定.
sql_ctx = SQLContext(sc)

# ロガー設定.
log4jLogger = sc._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger("sample_spark")
logger.info("開始")

# CSV読み込み & 列名変更.
orders = sql_ctx.read.format("com.databricks.spark.csv").options(
delimiter=",", charset="UTF-8", header=True
).load(sys.argv[1]).toDF(
"order_id", "order_date", "priority", "amount", "sale", "discount_rate",
"shipping_mode", "profits", "unit_price", "advertising_expense",
"shipping_expense", "customer_name", "prefecture", "municipal", "region",
"store_name", "customer_classification", "product_category", "product_sub_category",
"product_id", "product_name", "product_description", "product_container",
"product_base_margin", "supplier", "delivery_date", "shipping_date"
)

customer = sql_ctx.read.format("com.databricks.spark.csv").options(
delimiter=",", charset="UTF-8", header=True
).load(sys.argv[2]).toDF(
"customer_name", "life_time_revenue", "age_group", "tenure"
)

# ビュー登録.
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")

# SQL実行.
result = sql_ctx.sql(
"SELECT"
+ " o.product_category"
+ " ,c.age_group"
+ " ,SUM(cast("
+ " sale AS INT"
+ " ))"
+ " ,rank() OVER ("
+ " PARTITION BY"
+ " o.product_category"
+ " ORDER BY"
+ " SUM(cast("
+ " sale AS INT"
+ " )) DESC"
+ " )"
+ " FROM"
+ " orders AS o"
+ " JOIN customer AS c"
+ " ON o.customer_name = c.customer_name"
+ " WHERE"
+ " year("
+ " cast("
+ " o.order_date AS DATE"
+ " )"
+ " ) = 2010"
+ " GROUP BY"
+ " o.product_category"
+ " ,c.age_group"
+ " ORDER BY"
+ " o.product_category"
+ " ,c.age_group"
).toDF("カテゴリ", "世代", "売上合計", "順位")

# ファイル出力.
result.write.mode("overwrite").csv(sys.argv[3])

# 標準出力.
result.show()

logger.info("終了")



実行

実行コマンド

$ export PYTHONIOENCODING=utf8

$ spark-submit --driver-java-options=-Dlog4j.configuration=file:///home/vagrant/spark-log4.properties sample_spark.py orders.csv customer.csv output/

実行結果

+------+---+--------+---+

| カテゴリ| 世代| 売上合計| 順位|
+------+---+--------+---+
|テクノロジー|20s|39135615| 3|
|テクノロジー|30s|26420070| 5|
|テクノロジー|40s|46309336| 2|
|テクノロジー|50s|37829027| 4|
|テクノロジー|60s|47053734| 1|
| 事務用品|20s|29073938| 2|
| 事務用品|30s|21144412| 4|
| 事務用品|40s|27352827| 3|
| 事務用品|50s|20414653| 5|
| 事務用品|60s|32666069| 1|
| 家具|20s|28766543| 4|
| 家具|30s|27859004| 5|
| 家具|40s|39000277| 1|
| 家具|50s|31579662| 3|
| 家具|60s|33126584| 2|
+------+---+--------+---+

入出力はともかく分析本体部分がSQL文だけで実行出来ました。RDDやDataFrameをあれこれするよりだいぶ楽かと思います。