はじめに
本記事では、「データサイエンス100本」をPySparkを使用して実施する方法について解説します。データサイエンス100本は、実践的なデータ分析のスキルを身に付けるための有名な課題集です。PySparkは、Apache SparkのPython向けAPIであり、大規模なデータセットを分散処理するための強力なツールです。
本記事の目的は、PySparkを使用したデータサイエンス100本ノックの実施方法を詳しく解説し、読者がPySparkを活用してデータ分析のスキルを向上させる手助けをすることです。具体的には、PySparkの基礎知識から始め、データセットの読み込み、クリーニング、変換、集計のタスクをPySparkを使って実際に解決していきます。
PySparkの特徴
PySparkの特徴は以下の通りです。
- 分散処理: PySparkは、Apache Sparkの分散処理エンジンを活用しており、大規模なデータセットの処理に優れています。複数のマシンやクラスタ上で処理を分散させることで、高速なデータ処理を実現します。
- データ操作の柔軟性: PySparkは、高レベルのデータ操作インターフェースであるDataFrameを提供しています。DataFrameは、異なるデータ形式やソースからのデータの読み込みや変換を容易に行うことができます。
- 拡張性: PySparkはPythonをベースとしており、Pythonの豊富なライブラリやエコシステムとの連携が可能です。また、ユーザー定義関数(UDF)の作成やカスタム処理の追加などもサポートしています。
- パフォーマンスの最適化: PySparkは、データのパーティショニングやクエリの最適化など、パフォーマンスの最適化に関する豊富な機能を提供しています。これにより、大規模データセットでも効率的に処理を行うことができます。
本記事では、PySparkの基礎知識から始めながら、データサイエンティスト100本ノックの各課題をPySparkを使って解決していきます。PySparkの特徴を活かしながら、データ分析の手法を紹介します。パフォーマンスの最適化については本記事では対象外としています。
次章では、データサイエンティスト100本ノックについて詳しく説明します。
データサイエンス100本ノックについて
データサイエンティスト協会が提供している構造化データ(テーブルデータ)についての読み込みやクリーニング、変換、集計の問題が100問収録されている課題集です。
PySparkは、データサイエンス100本ノックにおいて有用なツールの1つです。以下の理由から、PySparkを使用することで効率的なデータ処理と分析が可能となります。
-
スケーラビリティ: PySparkは、分散処理エンジンであるApache Sparkを基にしており、複数のマシンやクラスタで処理を分散することができます。これにより、大容量のデータを高速に処理することが可能です。
-
柔軟なデータ操作: PySparkは、高レベルのデータ操作インターフェースであるDataFrameを提供しています。DataFrameを使用することで、異なるデータ形式やソースからのデータの読み込みや変換を容易に行うことができます。また、SQLライクなクエリ言語もサポートしており、データの操作や集計を簡潔に記述することができます。
-
拡張性と豊富な機能: PySparkはPythonをベースとしており、Pythonの豊富なライブラリやエコシステムとの連携が可能です。また、ユーザー定義関数(UDF)の作成やカスタム処理の追加などもサポートされています。これにより、データの前処理や特徴量エンジニアリングにおいて、柔軟かつ高度な処理を行うことができます。
データサイエンス100本ノックで扱うデータは小規模ですが、解説記事でよく扱われているPandasと同様の処理をPySparkで実現することができることを目標としています。
次章では、PySparkの基本的な使い方とデータの読み込みについて詳しく説明します。
PySparkのセットアップと基礎知識
PySparkのセットアップと基礎知識について説明します。
なお、本記事では環境設定を省略するため、Google Colabを利用しています。
まずは、PySparkをインストールします。
!pip install pyspark
その後、Sparkのセッションを立ち上げます。appNameは適当で大丈夫です。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[1]") \
.appName("IAB") \
.getOrCreate()
以下のようなSession情報が出れば、成功です。
SparkSession - in-memory
SparkContext
Version v3.3.0
Master local[1]
AppName IAB
PySparkの基本的なデータ操作と変換については以下のような内容です。
-
データの読み込みと表示: PySparkでは、様々なデータソースからデータを読み込むことができます。例えば、CSVファイル、JSONファイル、データベース、Parquetファイルなどです。データの読み込みには、spark.readメソッドを使用します。読み込んだデータはDataFrameとして表現され、.show()メソッドを使用して表示することができます。
-
データのフィルタリングと選択: DataFrameでは、SQLライクなクエリを使用してデータのフィルタリングや選択を行うことができます。例えば、filter()メソッドやwhere()メソッドを使用して条件に一致する行を抽出することができます。また、select()メソッドを使用して特定の列を選択することも可能です。
-
データの集計とグループ化: DataFrameでは、データの集計やグループ化を行うためのメソッドが提供されています。例えば、groupBy()メソッドを使用して特定の列でグループ化し、agg()メソッドを使用して集計関数(例: sum、avg、maxなど)を適用することができます。
-
データの変換と追加: DataFrameでは、既存の列に対して変換を行ったり、新たな列を追加することができます。例えば、withColumn()メソッドを使用して既存の列に対して演算や関数を適用し、新たな列を追加することができます。
-
DataFrameとRDDの違い: PySparkでは、DataFrameとRDD(Resilient Distributed Dataset)という2つのデータ構造が使用されます。DataFrameは、データを構造化して表現するための高レベルのAPIであり、スキーマ情報を持った行と列のテーブルとして扱います。一方、RDDは低レベルのAPIであり、分散処理の基本単位となる抽象化されたデータセットです。DataFrameはRDD上に構築されており、RDDよりも高レベルで使いやすく効率的な処理が可能です。
以上がPySparkの基本的なセットアップと基礎知識の説明です。次章では、実際にPySparkを使用してデータセットの読み込みや基本的なデータ操作を行う方法について詳しく解説します。
データサイエンス100本ノックで実践
以下で説明する内容を使って、対象のデータを変更していることが多いため、各課題についての詳細な解説については省略します。一部特殊なものについては解説いたします。
PySparkでよく利用する機能
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window
pyspark.sql.types :型変換する際にとても便利
pyspark.sql.functions:sum()やmean()などの計算用以外にもwhereのような条件指定処理を行うのにも使うので、とても便利
pyspark.sql.window :pandasでいうlagなどの処理をするのに利用する。windowの分け方を指定できるので、活用できると便利になる
データの読み込みと表示
まず、データサイエンス100本ノックで扱うデータを読み込みます。
read という機能があるので、こちらを利用します。
df_customer = spark.read.format("csv")\
.options(header="true", inferSchema="true")\
.load("customer.csv")
や
df_customer = spark.read.csv("customer.csv")
ほとんどの課題がデータの内容を「10件表示させよ。」 というものなので、データの内容を表示する必要があります。
show という機能があるので、こちらを利用します。
df_receipt.show(10, truncate=False)
データのフィルタリングと選択
ある条件を満たすデータを抽出する課題があるため、条件指定を行う必要があります。
filter という機能があるので、こちらを利用します。
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
.filter(df_receipt["customer_id"]=="CS018205000001").show()
や
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"])\
.filter((F.col("customer_id")=="CS018205000001") & ((F.col("amount")>=1000) & (F.col("amount")<=2000))).show()
データの対象を指定して表示するものがあるため、指定のカラム(列)のみを取得する必要があります。
select という機能があるので、こちらを利用します。
df_receipt.select(["sales_ymd", "customer_id", "product_cd", "amount"]).show(10)
データの集計とグループ化
データをグループ化して集約する方法としてgroupby という機能があるので、こちらを利用します。
groupby後にデータの集計を行うための多くの組み込み関数を利用することで、データの合計や平均、最大値・最小値の算出が可能になります。
df_receipt.groupBy("customer_id").max("sales_ymd").show(10)
や
df_receipt.groupBy("customer_id").agg({"sales_ymd":"min"}).show(10)
データの変換と追加
既存のデータフレームに新しい列を追加したり、不要な列を削除したりすることができます。
withColumn やdropを使用して列の追加・変換と削除が可能になります。
「販売価格 - 費用」から利益を計算し、列として追加する
df_product.withColumn("unit_profit", F.col("unit_price") - F.col("unit_cost")).show(10)
カラムの型を変更する
df_customer.withColumn("birth_day", F.to_date("birth_day")
カラムを削除する
df_customer.drop(F.col("gender"))
一部問題の解説
P-020:
レシート明細データフレーム(df_receipt)に対し、1件あたりの売上金額(amount)が高い順にランクを付与し、先頭10件を抽出せよ。項目は顧客ID(customer_id)、売上金額(amount)、付与したランクを表示させること。なお、売上金額(amount)が等しい場合でも別順位を付与すること。
P-020以外にもWindowを利用する場面がありますが、例として紹介します。
df_receipt.withColumn("row_number", F.row_number().over(Window.orderBy(F.col("amount").desc())))\
.select(["customer_id", "amount", "row_number"]).show(10)
Windowは .partitionBy と .orderBy を利用して活用します。
- partitionBy:データの単位をどこで切るのか。例:日単位でデータを使う場合は
partitionBy("日付カラム")
になります。 - orderBy:昇順、降順の対象となるカラムはどれか。例:日単位で1時間前のデータを使う場合は
partitionBy("日付カラム").orderBy("時間カラム")
になります。
P-046:
顧客データフレーム(df_customer)の申し込み日(application_date)はYYYYMMD形式の文字列型でデータを保有している。これを日付型(dateやdatetime)に変換し、顧客ID(customer_id)とともに抽出せよ。データは10件を抽出すれば良い。
回答としては以下になります。
直接to_date メソッドを使って変換することができない点が注意点になります。
df_customer.withColumn("application_date", F.col('application_date').cast(StringType()))\
.withColumn("application_date", F.to_date(F.unix_timestamp(F.col("application_date"), "yyyyMMdd").cast("timestamp")))\
.select(["customer_id", "application_date"]).show(10)
P-054:
顧客データデータフレーム(df_customer)の住所(address)は、埼玉県、千葉県、東京都、神奈川県のいずれかとなっている。都道府県毎にコード値を作成し、顧客ID、住所とともに抽出せよ。値は埼玉県を11、千葉県を12、東京都を13、神奈川県を14とすること。結果は10件表示させれば良い。
df_customer.withColumn("prefecture_cd", F.when(F.substring("address", 1, 3)=="埼玉県", "11")\
.when(F.substring("address", 1, 3)=="千葉県", "12")\
.when(F.substring("address", 1, 3)=="東京都", "13")\
.when(F.substring("address", 1, 3)=="神奈川", "14")).show(10)
列追加の際にwhen を利用することである条件の場合の値を入れることが出来ます。
whenの条件に当てはまらない場合、otherwiseを利用することで値を入れることが可能です。(otherwiseがない場合はNull値になります。)
応用編
PySparkでは、カスタム機能の実装や拡張が可能です。(P-028などで利用しています。)
以下に、PySparkのUDF(ユーザー定義関数)の作成方法と使用例、そしてPySparkの拡張機能やカスタムライブラリの利用方法を紹介します。
- PySparkのUDFの作成方法と使用例:PySparkでは、UDFを作成してDataFrameの列に適用することができます。
- UDFを作成するには、Pythonの関数を定義してudf()メソッドを使用します。
具体的な手順は以下の通りです。
1. Pythonの関数を定義する
@F.udf(DoubleType())
def udf_median(values_list):
med = np.median(values_list)
return float(med)
2. DataFrameにUDFを適用する
df_receipt.groupBy("store_cd").agg(udf_median(F.collect_list("amount")).alias("median_amount")).show(5)