Environment

  • Vagrant (CentOS 7.3)
  • Python 2.7
  • Spark 2.2.0

Sample data

I used the order history and customer data from the sample dataset here:
https://community.tableau.com/docs/DOC-5552
To make it easier to work with, I converted the Excel file to CSV and removed customers who share the same name, as sketched below.
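
The preprocessing itself is easy to script. Here is a minimal sketch of it in pandas; the file, sheet, and column names ("sample_superstore.xls", "orders", "customer", "customer_name") are assumptions for illustration, not the actual ones used.

prepare_data.py
# encoding: utf-8
# Sketch of the preprocessing described above (file/sheet/column names are
# assumptions): convert the Excel workbook to CSV and drop customers whose
# names collide, so the join key used later stays unique.
import pandas as pd

orders = pd.read_excel("sample_superstore.xls", "orders")
customer = pd.read_excel("sample_superstore.xls", "customer")

# Remove every customer name that appears more than once.
customer = customer.drop_duplicates(subset="customer_name", keep=False)

orders.to_csv("orders.csv", index=False, encoding="utf-8")
customer.to_csv("customer.csv", index=False, encoding="utf-8")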

Implementation

Task: a 2010 sales ranking by product category and age group.
The query uses a table join, aggregate functions, and a window function.

sample_spark.py
# encoding: utf-8

from __future__ import print_function
from pyspark.sql import SparkSession, SQLContext
import sys

# Set up the SparkContext.
sc = SparkSession.builder.appName(name="PySparkShell") \
    .master(master="local[*]") \
    .getOrCreate().sparkContext
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

# Set up the SQLContext.
sql_ctx = SQLContext(sc)

# Set up the logger via the JVM log4j bridge.
log4jLogger = sc._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger("sample_spark")
logger.info("開始")


# Read the CSV files and rename the columns.
orders = sql_ctx.read.format("com.databricks.spark.csv").options(
      delimiter=",", charset="UTF-8", header=True
    ).load(sys.argv[1]).toDF(
      "order_id", "order_date", "priority", "amount", "sale", "discount_rate",
      "shipping_mode", "profits", "unit_price", "advertising_expense",
      "shipping_expense", "customer_name", "prefecture", "municipal", "region",
      "store_name", "customer_classification", "product_category", "product_sub_category",
      "product_id", "product_name", "product_description", "product_container",
      "product_base_margin", "supplier", "delivery_date", "shipping_date"
    )

customer = sql_ctx.read.format("com.databricks.spark.csv").options(
      delimiter=",", charset="UTF-8", header=True
    ).load(sys.argv[2]).toDF(
      "customer_name", "life_time_revenue", "age_group", "tenure"
    )

# Register temporary views.
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")

# Run the SQL query.
result = sql_ctx.sql("""
    SELECT
        o.product_category
        ,c.age_group
        ,SUM(cast(sale AS INT))
        ,rank() OVER (
            PARTITION BY o.product_category
            ORDER BY SUM(cast(sale AS INT)) DESC
        )
    FROM
        orders AS o
        JOIN customer AS c
            ON o.customer_name = c.customer_name
    WHERE
        year(cast(o.order_date AS DATE)) = 2010
    GROUP BY
        o.product_category
        ,c.age_group
    ORDER BY
        o.product_category
        ,c.age_group
""").toDF("カテゴリ", "世代", "売上合計", "順位")

# Write the result to files.
result.write.mode("overwrite").csv(sys.argv[3])

# Print the result to stdout.
result.show()

logger.info("終了")

Execution

Command

$ export PYTHONIOENCODING=utf8
$ spark-submit --driver-java-options=-Dlog4j.configuration=file:///home/vagrant/spark-log4.properties sample_spark.py orders.csv customer.csv output/
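
For reference, the file passed via -Dlog4j.configuration is an ordinary log4j 1.x properties file. A minimal sketch might look like the following (the appender layout and log levels are my assumptions, not the exact file used here); it keeps Spark's own logging at WARN while letting the "sample_spark" logger used in the script print at INFO.

spark-log4.properties
# Console appender; Spark internals at WARN, the application logger at INFO.
# (This is an illustrative sketch, not the exact file referenced above.)
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
log4j.logger.sample_spark=INFO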

Output

+------+---+--------+---+
|  カテゴリ| 世代|    売上合計| 順位|
+------+---+--------+---+
|テクノロジー|20s|39135615|  3|
|テクノロジー|30s|26420070|  5|
|テクノロジー|40s|46309336|  2|
|テクノロジー|50s|37829027|  4|
|テクノロジー|60s|47053734|  1|
|  事務用品|20s|29073938|  2|
|  事務用品|30s|21144412|  4|
|  事務用品|40s|27352827|  3|
|  事務用品|50s|20414653|  5|
|  事務用品|60s|32666069|  1|
|    家具|20s|28766543|  4|
|    家具|30s|27859004|  5|
|    家具|40s|39000277|  1|
|    家具|50s|31579662|  3|
|    家具|60s|33126584|  2|
+------+---+--------+---+

Aside from the I/O, the core of the analysis could be expressed as a single SQL statement, which I find considerably easier than shuffling data around with RDDs or the DataFrame API.
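
For comparison, here is a rough sketch of the same query written with the DataFrame API, operating on the orders and customer DataFrames from the script above (this is my own sketch, not part of the original script; the column names sales_total and rank are mine):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank age groups within each product category by their summed sales.
w = Window.partitionBy("product_category").orderBy(F.desc("sales_total"))

df_result = (
    orders.join(customer, "customer_name")
    .where(F.year(orders.order_date.cast("date")) == 2010)
    .groupBy("product_category", "age_group")
    .agg(F.sum(orders.sale.cast("int")).alias("sales_total"))
    .withColumn("rank", F.rank().over(w))
    .orderBy("product_category", "age_group")
)
df_result.show()

It produces the same ranking, but the SQL version maps more directly onto how the question is phrased.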
