LoginSignup
14
10

More than 3 years have passed since last update.

サンプルデータセット

今回はkaggleのデータセット「Brazilian E-Commerce Public Dataset by Olist」をサンプルとして、Azure Databricksを使ったSparkの操作を行っていきます。
このデータはOlist StoreというブラジルのECサイトで行われた2016年から2018年までの約10万件の注文に関するデータが含まれています。
データ量としてはビッグデータというほどに多くありませんが、注文の商品明細やレビューなどが複数のCSVに分かれて保存され、それぞれがIDで紐づけられているため、PySparkやSpark SQLの練習に適しています。

CSVの読み込み

注文ごとの商品の明細情報「olist_order_items_dataset.csv」を使ってデータの読み込みとPySparkの操作を行っていきます。

DataFrameに読み込み

下記スクリプトでCSVをSpark DataFrameとして読み込みます。
読み込むCSVはカラム名を示す行が先頭にあるため、読み込みオプションとして「header="true"」、またカラムのデータ型を自動推定するため「inferSchema="true"」として読み込んでいます。
(※CSV読み込みオプションの詳細はDatabricksドキュメントも参照してください)

order_items_csv = "dbfs:/mnt/my_blob_container/brazilian-ecommerce/olist_order_items_dataset.csv"

df = spark.read\
  .format("csv")\
  .options(header="true", inferSchema="true")\
  .load(order_items_csv)

display(df)

img04-0001.png

スキーマを指定して読み込み

スキーマを指定して読み込みを行う場合は下記のようにします。

※指定できる型はSparkドキュメントも参照

カラムのデータ型指定にinferSchemaを使用した場合、型推定のため1回余計に読み込むことになり、読み込みのパフォーマンスが低下します。
データのスキーマがわかっている場合は、スキーマを指定して読み込むことを推奨します。

from pyspark.sql.types import *

# order_items_csv = "dbfs:/mnt/my_blob_container/brazilian-ecommerce/olist_order_items_dataset.csv"

# スキーマ指定
schema = StructType([
  StructField("order_id", StringType(), False),
  StructField("order_item_id", StringType(), False),
  StructField("product_id", StringType(), False),
  StructField("seller_id", StringType(), False),
  StructField("shipping_limit_date", StringType(), False),
  StructField("price", DoubleType(), False),
  StructField("freight_value", DoubleType(), False),
])

df_spec = spark.read\
  .format("csv")\
  .options(header="true")\
  .load(order_items_csv, schema=schema)

display(df_spec)

img04-0002.png

データ型の確認

display(df.dtypes)

img04-0003.png

PySparkでのDataFrameの基本操作

読み込んだCSVでPySparkの基本操作を実行します。

  • 指定行数抽出して表示
display(df.head(5))

img04-0004.png

  • 全レコード数のカウント
df.count()

img04-0005.png

  • 計算列の追加
# 送料込合計列追加
df = df.withColumn("total_price", df["price"] + df["freight_value"])
display(df)

img04-0006.png

  • カラムを指定して抽出
display(df.select(
  df["product_id"],
  df["shipping_limit_date"].alias("limit_date"), # aliasでカラム名の変更が可能
  df["price"]
))

img04-0007.png

  • 条件でレコード抽出
# 2018/01/01 ~ 2018/01/31のデータを抽出
from datetime import datetime
df_jan = df.filter((df["shipping_limit_date"] >= datetime(2018, 1, 1)) & (df["shipping_limit_date"] < datetime(2018, 2, 1)))
display(df_jan)

img04-0008.png

  • レコードのカウント
# product_idごとの売り上げの個数
df_count = df_jan.groupBy("product_id").count()
display(df_count)

img04-0009.png

  • レコードの集計
# product_idごとの売り上げの合計
df_sum = df_jan.groupBy("product_id").agg({"price": "sum"})
display(df_sum)

img04-0010.png

Spark SQLを使用した操作

DataFrameをTemp Tableに登録することでSpark SQLを使用した集計が可能になります。

  • Temp Tableの登録
df.createOrReplaceTempView("order_items")

img04-0011.png

  • Spark SQLによるカラム抽出
query = """
SELECT
  product_id,
  shipping_limit_date,
  price
FROM order_items
"""

display(spark.sql(query))

img04-0012.png

  • %sqlマジックコマンドを使用する場合
%sql
SELECT
  product_id,
  shipping_limit_date,
  price
FROM order_items

img04-0013.png

DataFrameの結合

データセットに含まれる他のCSVファイルと組み合わせて商品カテゴリごとの売り上げを集計してみます。

各商品の詳細情報「olist_products_dataset.csv」をDataFrameに読み込みます。

products_csv = "dbfs:/mnt/my_blob_container/brazilian-ecommerce/olist_products_dataset.csv"

df_prod = spark.read\
  .format("csv")\
  .options(header="true", inferSchema="true")\
  .load(products_csv)

display(df_prod)

img04-0014.png

# カラム名の変更
df_prod = df_prod.withColumnRenamed("product_id", "product_id_2")

# DataFrame結合
df_join = df.join(df_prod, df["product_id"] == df_prod["product_id_2"])
display(df_join)

img04-0015.png

2018年1月の商品カテゴリごとの売り上げ金額を集計します。
「sum(price)」をソートすると「relogios_presentes」の売り上げが最も高いことがわかりますが、ポルトガル語なので何の商品カテゴリかわかりません。

df_category = df_join.filter((df["shipping_limit_date"] >= datetime(2018, 1, 1)) & (df["shipping_limit_date"] < datetime(2018, 2, 1)))\
  .groupBy("product_category_name")\
  .agg({"price": "sum"})

display(df_category)

img04-0016.png

データセットにカテゴリを英語翻訳したCSVファイルがあるため、これも読み込みます。

translation_csv = "dbfs:/mnt/my_blob_container/brazilian-ecommerce/product_category_name_translation.csv"

df_translate = spark.read\
  .format("csv")\
  .options(header="true", inferSchema="true")\
  .load(translation_csv)

display(df_translate)

img04-0017.png

結合して翻訳します。

df_category = df_category.join(df_translate, df_category["product_category_name"] == df_translate["product_category_name"])\
  .select(df_translate["product_category_name_english"], df_category["sum(price)"])

display(df_category)

img04-0018.png

Databricksではデータの可視化も簡単にできます。
売り上げが最も高いのは「watches_gifts」であることがわかりました。
img04-0019.png

CSVの書き出し

DataFrameを書き出す場合は下記コマンドを使用します。

# DataFrameCSV書き出し
output_path = "/mnt/my_blob_container/brazilian-ecommerce/order_items_with_detail.csv"
df.join.write\
  .format("csv")\
  .options(header="true")\
  .save(output_path)

img04-0020.png

CSVは指定したパスに直接書き出されるのではなく、指定パスのディレクトリが作成され、直下に分割されたCSVファイルとして出力されます。

display(dbutils.fs.ls(output_path))

img04-0021.png

ファイルを1つのCSVとして出力する場合は、HadoopのFileUtil.copyMergeを使用し、上記で出力したファイルをマージして1ファイルにまとめます。

%scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

var sourceDir = "/mnt/blob_container/output.csv"
var mergedFileName = "/mnt/blob_container/output_merge.csv"

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(sourceDir), hdfs, new Path(mergedFileName), true, hadoopConfig, null) 

img04-0022.png

import os

display(dbutils.fs.ls(os.path.dirname(output_path)))

img04-0023.png

Clusterのメモリ量に余裕がある場合は、下記スクリプトで1ファイルにデータを書き出すことができます。
この場合でも「output.csv/part-00000-xxxxxxxxx.csv」のような名称でファイルが出力されるため、出力後必要に応じてファイルの移動を行います。

output_path = "/mnt/my_blob_container/brazilian-ecommerce/order_items_with_detail_rep1.csv"

# DataFrameを1つのCSVに書き出し()
df.repartition(1).write\
  .format("csv")\
  .options(header="true")\
  .save(output_path)

img04-0024.png

display(dbutils.fs.ls(output_path))

img04-0025.png

14
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
10