LoginSignup
3
2

データサイエンス100本ノックをPySparkでやってみた

Posted at

はじめに

本記事では、「データサイエンス100本」をPySparkを使用して実施する方法について解説します。データサイエンス100本は、実践的なデータ分析のスキルを身に付けるための有名な課題集です。PySparkは、Apache SparkのPython向けAPIであり、大規模なデータセットを分散処理するための強力なツールです。

本記事の目的は、PySparkを使用したデータサイエンス100本ノックの実施方法を詳しく解説し、読者がPySparkを活用してデータ分析のスキルを向上させる手助けをすることです。具体的には、PySparkの基礎知識から始め、データセットの読み込み、クリーニング、変換、集計のタスクをPySparkを使って実際に解決していきます。

PySparkの特徴

PySparkの特徴は以下の通りです。

  • 分散処理: PySparkは、Apache Sparkの分散処理エンジンを活用しており、大規模なデータセットの処理に優れています。複数のマシンやクラスタ上で処理を分散させることで、高速なデータ処理を実現します。
  • データ操作の柔軟性: PySparkは、高レベルのデータ操作インターフェースであるDataFrameを提供しています。DataFrameは、異なるデータ形式やソースからのデータの読み込みや変換を容易に行うことができます。
  • 拡張性: PySparkはPythonをベースとしており、Pythonの豊富なライブラリやエコシステムとの連携が可能です。また、ユーザー定義関数(UDF)の作成やカスタム処理の追加などもサポートしています。
  • パフォーマンスの最適化: PySparkは、データのパーティショニングやクエリの最適化など、パフォーマンスの最適化に関する豊富な機能を提供しています。これにより、大規模データセットでも効率的に処理を行うことができます。

本記事では、PySparkの基礎知識から始めながら、データサイエンティスト100本ノックの各課題をPySparkを使って解決していきます。PySparkの特徴を活かしながら、データ分析の手法を紹介します。パフォーマンスの最適化については本記事では対象外としています。
次章では、データサイエンティスト100本ノックについて詳しく説明します。

データサイエンス100本ノックについて

データサイエンティスト協会が提供している構造化データ(テーブルデータ)についての読み込みやクリーニング、変換、集計の問題が100問収録されている課題集です。

PySparkは、データサイエンス100本ノックにおいて有用なツールの1つです。以下の理由から、PySparkを使用することで効率的なデータ処理と分析が可能となります。

  • スケーラビリティ: PySparkは、分散処理エンジンであるApache Sparkを基にしており、複数のマシンやクラスタで処理を分散することができます。これにより、大容量のデータを高速に処理することが可能です。

  • 柔軟なデータ操作: PySparkは、高レベルのデータ操作インターフェースであるDataFrameを提供しています。DataFrameを使用することで、異なるデータ形式やソースからのデータの読み込みや変換を容易に行うことができます。また、SQLライクなクエリ言語もサポートしており、データの操作や集計を簡潔に記述することができます。

  • 拡張性と豊富な機能: PySparkはPythonをベースとしており、Pythonの豊富なライブラリやエコシステムとの連携が可能です。また、ユーザー定義関数(UDF)の作成やカスタム処理の追加などもサポートされています。これにより、データの前処理や特徴量エンジニアリングにおいて、柔軟かつ高度な処理を行うことができます。

データサイエンス100本ノックで扱うデータは小規模ですが、解説記事でよく扱われているPandasと同様の処理をPySparkで実現することができることを目標としています。

次章では、PySparkの基本的な使い方とデータの読み込みについて詳しく説明します。

PySparkのセットアップと基礎知識

PySparkのセットアップと基礎知識について説明します。
なお、本記事では環境設定を省略するため、Google Colabを利用しています。

まずは、PySparkをインストールします。

!pip install pyspark

その後、Sparkのセッションを立ち上げます。appNameは適当で大丈夫です。

from pyspark.sql import SparkSession 
spark = SparkSession.builder \
                    .master("local[1]") \
                    .appName("IAB") \
                    .getOrCreate()

以下のようなSession情報が出れば、成功です。

SparkSession - in-memory
SparkContext

Version   v3.3.0
Master    local[1]
AppName   IAB

PySparkの基本的なデータ操作と変換については以下のような内容です。

  • データの読み込みと表示: PySparkでは、様々なデータソースからデータを読み込むことができます。例えば、CSVファイル、JSONファイル、データベース、Parquetファイルなどです。データの読み込みには、spark.readメソッドを使用します。読み込んだデータはDataFrameとして表現され、.show()メソッドを使用して表示することができます。

  • データのフィルタリングと選択: DataFrameでは、SQLライクなクエリを使用してデータのフィルタリングや選択を行うことができます。例えば、filter()メソッドやwhere()メソッドを使用して条件に一致する行を抽出することができます。また、select()メソッドを使用して特定の列を選択することも可能です。

  • データの集計とグループ化: DataFrameでは、データの集計やグループ化を行うためのメソッドが提供されています。例えば、groupBy()メソッドを使用して特定の列でグループ化し、agg()メソッドを使用して集計関数(例: sum、avg、maxなど)を適用することができます。

  • データの変換と追加: DataFrameでは、既存の列に対して変換を行ったり、新たな列を追加することができます。例えば、withColumn()メソッドを使用して既存の列に対して演算や関数を適用し、新たな列を追加することができます。

  • DataFrameとRDDの違い: PySparkでは、DataFrameとRDD(Resilient Distributed Dataset)という2つのデータ構造が使用されます。DataFrameは、データを構造化して表現するための高レベルのAPIであり、スキーマ情報を持った行と列のテーブルとして扱います。一方、RDDは低レベルのAPIであり、分散処理の基本単位となる抽象化されたデータセットです。DataFrameはRDD上に構築されており、RDDよりも高レベルで使いやすく効率的な処理が可能です。

以上がPySparkの基本的なセットアップと基礎知識の説明です。次章では、実際にPySparkを使用してデータセットの読み込みや基本的なデータ操作を行う方法について詳しく解説します。

データサイエンス100本ノックで実践

以下で説明する内容を使って、対象のデータを変更していることが多いため、各課題についての詳細な解説については省略します。一部特殊なものについては解説いたします。

PySparkでよく利用する機能

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

pyspark.sql.types :型変換する際にとても便利
pyspark.sql.functions:sum()やmean()などの計算用以外にもwhereのような条件指定処理を行うのにも使うので、とても便利
pyspark.sql.window :pandasでいうlagなどの処理をするのに利用する。windowの分け方を指定できるので、活用できると便利になる

データの読み込みと表示

まず、データサイエンス100本ノックで扱うデータを読み込みます。
read という機能があるので、こちらを利用します。

df_customer = spark.read.format("csv")\
                   .options(header="true", inferSchema="true")\
                   .load("customer.csv")

df_customer = spark.read.csv("customer.csv")

ほとんどの課題がデータの内容を「10件表示させよ。」 というものなので、データの内容を表示する必要があります。
show という機能があるので、こちらを利用します。

df_receipt.show(10, truncate=False)

データのフィルタリングと選択

ある条件を満たすデータを抽出する課題があるため、条件指定を行う必要があります。
filter という機能があるので、こちらを利用します。

df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .filter(df_receipt["customer_id"]=="CS018205000001").show()

df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
          .filter((F.col("customer_id")=="CS018205000001") & ((F.col("amount")>=1000) & (F.col("amount")<=2000))).show()

データの対象を指定して表示するものがあるため、指定のカラム(列)のみを取得する必要があります。
select という機能があるので、こちらを利用します。

df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"]).show(10)

データの集計とグループ化

データをグループ化して集約する方法としてgroupby という機能があるので、こちらを利用します。
groupby後にデータの集計を行うための多くの組み込み関数を利用することで、データの合計や平均、最大値・最小値の算出が可能になります。

df_receipt.groupBy("customer_id").max("sales_ymd").show(10)

df_receipt.groupBy("customer_id").agg({"sales_ymd":"min"}).show(10)

データの変換と追加

既存のデータフレームに新しい列を追加したり、不要な列を削除したりすることができます。
withColumndropを使用して列の追加・変換と削除が可能になります。

「販売価格 - 費用」から利益を計算し、列として追加する

df_product.withColumn("unit_profit", F.col("unit_price") - F.col("unit_cost")).show(10)

カラムの型を変更する

df_customer.withColumn("birth_day", F.to_date("birth_day")

カラムを削除する

df_customer.drop(F.col("gender"))

一部問題の解説

P-020:

レシート明細データフレーム(df_receipt)に対し、1件あたりの売上金額(amount)が高い順にランクを付与し、先頭10件を抽出せよ。項目は顧客ID(customer_id)、売上金額(amount)、付与したランクを表示させること。なお、売上金額(amount)が等しい場合でも別順位を付与すること。

P-020以外にもWindowを利用する場面がありますが、例として紹介します。

df_receipt.withColumn("row_number", F.row_number().over(Window.orderBy(F.col("amount").desc())))\
          .select(["customer_id", "amount", "row_number"]).show(10)

Windowは .partitionBy.orderBy を利用して活用します。

  • partitionBy:データの単位をどこで切るのか。例:日単位でデータを使う場合はpartitionBy("日付カラム")になります。
  • orderBy:昇順、降順の対象となるカラムはどれか。例:日単位で1時間前のデータを使う場合はpartitionBy("日付カラム").orderBy("時間カラム")になります。

P-046:

顧客データフレーム(df_customer)の申し込み日(application_date)はYYYYMMD形式の文字列型でデータを保有している。これを日付型(dateやdatetime)に変換し、顧客ID(customer_id)とともに抽出せよ。データは10件を抽出すれば良い。

回答としては以下になります。
直接to_date メソッドを使って変換することができない点が注意点になります。

df_customer.withColumn("application_date", F.col('application_date').cast(StringType()))\
           .withColumn("application_date", F.to_date(F.unix_timestamp(F.col("application_date"), "yyyyMMdd").cast("timestamp")))\
           .select(["customer_id", "application_date"]).show(10)

P-054:

顧客データデータフレーム(df_customer)の住所(address)は、埼玉県、千葉県、東京都、神奈川県のいずれかとなっている。都道府県毎にコード値を作成し、顧客ID、住所とともに抽出せよ。値は埼玉県を11、千葉県を12、東京都を13、神奈川県を14とすること。結果は10件表示させれば良い。

df_customer.withColumn("prefecture_cd", F.when(F.substring("address", 1, 3)=="埼玉県", "11")\
                                         .when(F.substring("address", 1, 3)=="千葉県", "12")\
                                         .when(F.substring("address", 1, 3)=="東京都", "13")\
                                         .when(F.substring("address", 1, 3)=="神奈川", "14")).show(10)

列追加の際にwhen を利用することである条件の場合の値を入れることが出来ます。
whenの条件に当てはまらない場合、otherwiseを利用することで値を入れることが可能です。(otherwiseがない場合はNull値になります。)

応用編

PySparkでは、カスタム機能の実装や拡張が可能です。(P-028などで利用しています。)
以下に、PySparkのUDF(ユーザー定義関数)の作成方法と使用例、そしてPySparkの拡張機能やカスタムライブラリの利用方法を紹介します。

  • PySparkのUDFの作成方法と使用例:PySparkでは、UDFを作成してDataFrameの列に適用することができます。
  • UDFを作成するには、Pythonの関数を定義してudf()メソッドを使用します。

具体的な手順は以下の通りです。

1. Pythonの関数を定義する

@F.udf(DoubleType())
def udf_median(values_list):
    med = np.median(values_list)
    return float(med)

2. DataFrameにUDFを適用する

df_receipt.groupBy("store_cd").agg(udf_median(F.collect_list("amount")).alias("median_amount")).show(5)
3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2