Python
Spark
sparksql

Environment

  • Vagrant CentOS 7.3
  • Python 2.7
  • Spark 2.2.0

Sample data

I used the order history and customer data from this sample dataset:
https://community.tableau.com/docs/DOC-5552
To make it easier to work with, I converted the Excel file to CSV and removed customers who share the same name.
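
For reference, that preprocessing only takes a few lines of pandas. This is a rough sketch; the file name, script name, and column name are assumptions, so adjust them to the actual workbook.

dedup_customer.py
# -*- coding: utf-8 -*-
import pandas as pd

# Hypothetical file name -- not part of the original post.
customer = pd.read_excel("sample_data.xls")

# keep=False drops every row whose customer name appears more than once,
# so the join key in customer.csv stays unique.
customer.drop_duplicates(subset="customer_name", keep=False).to_csv(
    "customer.csv", index=False, encoding="utf-8")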

Implementation

The task: a ranking of 2010 sales by product category and age group.
It exercises a table join, aggregate functions, and a window function.

sample_spark.py
# encoding: utf-8

from __future__ import print_function
from pyspark.sql import SparkSession, SQLContext
import sys

# SparkContext setup.
sc = SparkSession.builder.appName(name="PySparkShell") \
    .master(master="local[*]") \
    .getOrCreate().sparkContext
# Use version 2 of the file output committer (faster task commits).
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

# SQLContext setup.
sql_ctx = SQLContext(sc)

# Logger setup.
log4jLogger = sc._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger("sample_spark")
logger.info("start")


# Read the CSV files & rename columns.
orders = sql_ctx.read.format("com.databricks.spark.csv").options(
      delimiter=",", charset="UTF-8", header=True
    ).load(sys.argv[1]).toDF(
      "order_id", "order_date", "priority", "amount", "sale", "discount_rate",
      "shipping_mode", "profits", "unit_price", "advertising_expense",
      "shipping_expense", "customer_name", "prefecture", "municipal", "region",
      "store_name", "customer_classification", "product_category", "product_sub_category",
      "product_id", "product_name", "product_description", "product_container",
      "product_base_margin", "supplier", "delivery_date", "shipping_date"
    )

customer = sql_ctx.read.format("com.databricks.spark.csv").options(
      delimiter=",", charset="UTF-8", header=True
    ).load(sys.argv[2]).toDF(
      "customer_name", "life_time_revenue", "age_group", "tenure"
    )

# Register temporary views.
orders.createOrReplaceTempView("orders")
customer.createOrReplaceTempView("customer")

# Run the SQL query.
result = sql_ctx.sql("""
    SELECT
        o.product_category
        ,c.age_group
        ,SUM(CAST(o.sale AS INT))
        ,RANK() OVER (
            PARTITION BY o.product_category
            ORDER BY SUM(CAST(o.sale AS INT)) DESC
        )
    FROM
        orders AS o
            JOIN customer AS c
                ON o.customer_name = c.customer_name
    WHERE
        YEAR(CAST(o.order_date AS DATE)) = 2010
    GROUP BY
        o.product_category
        ,c.age_group
    ORDER BY
        o.product_category
        ,c.age_group
""").toDF("カテゴリ", "世代", "売上合計", "順位")  # category / age group / total sales / rank

# Write the result to CSV files.
result.write.mode("overwrite").csv(sys.argv[3])

# Print to stdout.
result.show()

logger.info("end")

Execution

Run command

$ export PYTHONIOENCODING=utf8
$ spark-submit --driver-java-options=-Dlog4j.configuration=file:///home/vagrant/spark-log4.properties sample_spark.py orders.csv customer.csv output/
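
The spark-log4.properties passed to the driver is not shown here; a minimal version, based on Spark's bundled conf/log4j.properties.template, might look like the following (the WARN/INFO levels are just an assumption that keeps Spark's own output quiet while the script's "sample_spark" logger stays visible):

spark-log4.properties
# Console appender, as in Spark's log4j.properties.template.
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Keep INFO output from the logger used in sample_spark.py.
log4j.logger.sample_spark=INFO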

Execution result

+------+---+--------+---+
|  カテゴリ| 世代|    売上合計| 順位|
+------+---+--------+---+
|テクノロジー|20s|39135615|  3|
|テクノロジー|30s|26420070|  5|
|テクノロジー|40s|46309336|  2|
|テクノロジー|50s|37829027|  4|
|テクノロジー|60s|47053734|  1|
|  事務用品|20s|29073938|  2|
|  事務用品|30s|21144412|  4|
|  事務用品|40s|27352827|  3|
|  事務用品|50s|20414653|  5|
|  事務用品|60s|32666069|  1|
|    家具|20s|28766543|  4|
|    家具|30s|27859004|  5|
|    家具|40s|39000277|  1|
|    家具|50s|31579662|  3|
|    家具|60s|33126584|  2|
+------+---+--------+---+
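
Note that result.write outputs a directory of part files rather than a single CSV, so the data written to output/ can be checked with something like:

$ cat output/part-*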

Leaving the input/output plumbing aside, the analysis itself ran as a single SQL statement, which feels a lot simpler than doing the same work by hand with RDDs or DataFrames.
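
For comparison, roughly the same query written against the DataFrame API could look like the sketch below (untested; it reuses the orders and customer DataFrames and the column names from the script above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank age groups by total sales within each product category.
w = Window.partitionBy("product_category").orderBy(F.desc("total_sales"))

df_result = (
    orders.join(customer, "customer_name")
    .where(F.year(F.col("order_date").cast("date")) == 2010)
    .groupBy("product_category", "age_group")
    .agg(F.sum(F.col("sale").cast("int")).alias("total_sales"))
    .withColumn("rank", F.rank().over(w))
    .orderBy("product_category", "age_group")
)
df_result.show()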