はじめに
僕は現在データサイエンスの仕事に携わっています。学生時代はpythonを使って研究をしていましたが、社会人になり業務の中でpysparkに触る機会が増えました。はじめはPandasと本質的に何が違うの?とかお作法が絶妙に違うじゃんとか躓くことがあったので、その経験をもとに記事を書いてみました。この記事の目的は僕のような初心者・初学者の方に少しでもPySparkについて理解してもらうことです。僕自身まだまだ経験や知識が不足しているので間違った情報もあるかもしれませんが、自分なりにPysparkについてpandasとの違いや躓きやすいポイントなどに触れながらまとめてみました。
PandasとPySparkの違い
PandasとPySparkは、両方ともPythonで書かれたデータフレームライブラリですが、それぞれの目的や機能に違いがあります。
Pandasは、オープンソースのPythonライブラリで、構造化表データを扱うために最も使用されています。Pandasは、CSV、JSON、SQLなどのフォーマットからデータをロードすることができ、行と列を含む構造化オブジェクトであるデータフレームを作成します。Pandasは、データ分析、機械学習、データサイエンスプロジェクトなどで多く使われています。Pandasデータフレームは可変であり、遅延評価されず、デフォルトで統計関数がそれぞれのカラムに適用されます。
一方、PySparkは、Apache Sparkの機能を用いてPythonを実行するための、Pythonで記述されたライブラリです。PySparkを用いることで、分散クラスター (マルチノード)あるいはシングルノードでもアプリケーションを並列に実行することができます。Apache Sparkは大規模かつパワフルな分散データ処理、機械学習アプリケーションのための分析処理エンジンです。PySparkは、NumPyを含み多くのデータサイエンスライブラリがPythonで記述されていることから、PySparkはデータサイエンス、機械学習コミュニティで広く利用されています。
以下の表は、PandasとPySparkの主な違いをまとめたものです。
項目 | Pandas | PySpark |
---|---|---|
目的 | 構造化表データを扱うためのPythonライブラリ | 大規模かつパワフルな分散データ処理、機械学習アプリケーションのための分析処理エンジン |
データ処理のスケーラビリティ | 単一のマシン上で動作するため、比較的小規模なデータセットに適している | Apache Sparkという分散データ処理フレームワークのPython APIであり、大規模なデータセットを処理することが可能 |
データの保持方法 | メモリ内のデータフレームを使用する | 分散データフレーム(RDDやDataFrame)を使用してデータを処理し、分散データ処理を容易にするためにパーティション化され、複数のノード上で操作される |
データ処理の速度 | 単一のマシン上で動作するため、比較的小規模なデータセットでは高速だが、大規模なデータセットに対してはパフォーマンスが低下する可能性があり | 複数のノードで分散処理を行うため、大規模なデータセットの処理において高速 |
分散処理 | 不可 | 可能 |
遅延評価 | 不可 | 可能 |
機械学習のサポート | Scikit-learnなどの一般的な機械学習ライブラリと統合しているが、分散処理のサポートはなし | Sparkの機械学習ライブラリであるMLlibを使用して、分散データセット上で機械学習モデルをトレーニングおよび適用することが可能 |
コミュニティのサポート | pandasプロジェクトによってサポートされているが、Sparkのような広範な分散データ処理機能は提供されていない | Apache Sparkプロジェクトの一部であり、大規模なコミュニティとサポートがある |
躓きやすいポイント
次にPandasとPysparkのコードの書き方違いで躓きやすいポイントを実例を挙げて説明します。
1. データの読み込みと保存
Pandasの場合
import pandas as pd
# CSVファイルの読み込み
df = pd.read_csv('example.csv')
# データの保存
df.to_csv('output.csv', index=False)
PySparkの場合
from pyspark.sql import SparkSession
# Sparkセッションの作成
spark = SparkSession.builder.appName('example').getOrCreate()
# CSVファイルの読み込み
df = spark.read.csv('example.csv', header=True, inferSchema=True)
# データの保存
df.write.csv('output.csv', header=True, mode='overwrite')
躓きやすいポイント:
- PySparkでは、データフレームの作成にSparkセッションが必要です。また、CSV読み込み時にスキーマ(データ型)を推測するための
inferSchema=True
を指定する必要があります。 - PySparkでは、データの保存時にmodeパラメータを指定する必要があります。
overwrite
やappend
などが一般的です。
2. カラムの選択と操作
Pandasの場合
# 特定のカラムの選択
selected_df = df[['column1', 'column2']]
# 新しいカラムの作成
df['new_column'] = df['column1'] + df['column2']
PySparkの場合
from pyspark.sql.functions import col
# 特定のカラムの選択
selected_df = df.select('column1', 'column2')
# 新しいカラムの作成
df = df.withColumn('new_column', col('column1') + col('column2'))
躓きやすいポイント:
- PySparkでは、
select
メソッドを使用して特定のカラムを選択します。また、新しいカラムを作成するにはwithColumn
メソッドを使用します。 - PySparkでは、
col
関数を使用してカラムを指定します。これにより、カラム名が文字列でなくてもよくなります。
3. データのフィルタリング
Pandasの場合
# 条件に基づいてフィルタリング
filtered_df = df[df['column'] > 10]
PySparkの場合
# 条件に基づいてフィルタリング
filtered_df = df.filter(col('column') > 10)
躓きやすいポイント:
- PySparkでは、
filter
メソッドを使用してデータをフィルタリングします。条件式内でカラムへのアクセスにcol
関数を使用します。
4. グループ化と集計
Pandasの場合
# グループ化と集計
result_df = df.groupby('group_column').agg({'value_column': 'sum'})
PySparkの場合
from pyspark.sql import functions as F
# グループ化と集計
result_df = df.groupBy('group_column').agg(F.sum('value_column'))
躓きやすいポイント:
- PySparkでは、集計関数を使用する場合、
agg
メソッドに対象の列と集計関数を指定します。pyspark.sql.functions
モジュールを使用して関数を指定します。
集計例
最後に上記の書き方を使って簡単な集計をしてみます。今回はstudents.csvという学生ID,科目,点数が記録されているファイルを使って科目ごとの平均点や合計点を集計してみます。
students.csv
の内容
student_id,subject,score
1,Math,80
2,Math,75
3,Science,90
1,Science,88
2,English,92
3,English,78
1,History,85
2,History,80
3,Math,95
集計コード
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Sparkセッションの作成
spark = SparkSession.builder.appName('example').getOrCreate()
# CSVファイルの読み込み
df = spark.read.csv('students.csv', header=True, inferSchema=True)
# データフレームの中身を表示
print("Original DataFrame:")
df.show()
# グループ化と集計
grouped_df = df.groupBy('subject').agg(
F.count('student_id').alias('num_students'),
F.avg('score').alias('average_score'),
F.sum('score').alias('total_score')
)
# 集計結果の表示
print("Aggregated DataFrame:")
grouped_df.show()
出力結果
+-------+------------+------------------+-----------+
|subject|num_students| average_score|total_score|
+-------+------------+------------------+-----------+
|English| 2| 85.0| 170|
| Math | 3| 83.33333333333333| 250|
|History| 2| 82.5| 165|
|Science| 2| 89.0| 178|
+-------+------------+------------------+-----------+
こんな感じでさくっと集計ができました。
さいごに
冒頭でも述べさせていただきました通り僕自身まだまだPySpark初心者なのでこれからも勉強していかねばなと思ってます。次はPolarsについて勉強して記事をかけたらいいなと思ってます。