Pandas vs PySpark DataFrame With Examples - Spark by {Examples}の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
PandasとPySparkデータフレームの違いを、定義、機能、利点、作成方法、互いに変換する方法のサンプルを通じて学びましょう。
Pandasとは?
Pandasは分析のために構造化表データを取り扱うために最も使用されているオープンソースのPythonライブラリの一つです。Pandasライブラリは、データ分析、機械学習、データサイエンスプロジェクトなどで多く使われています。
Pandasは、CSV、JSON、SQLなどのフォーマットからデータをロードすることができ、(SQLテーブルと同じように)行と列を含む構造化オブジェクトであるデータフレームを作成します。
分散処理をサポートしていないので、増大するデータをサポートするために追加の馬力を必要とした際には、常にリソースを追加する必要があります。
Pandasデータフレームは可変であり、遅延評価されず、デフォルトで統計関数がそれぞれのカラムに適用されます。pandasに関しては、pandas DataFrame Tutorial For Beginners Guideで学習することができます。
Pandasデータフレームの例
PythonでPandasライブラリを使用するためには、import pandas as pd
を用いてインポートする必要があります。
listからPandasデータフレームを作成するサンプルを示します。
import pandas as pd
data = [["James","","Smith",30,"M",60000],
["Michael","Rose","",50,"M",70000],
["Robert","","Williams",42,"",400000],
["Maria","Anne","Jones",38,"F",500000],
["Jen","Mary","Brown",45,None,0]]
columns=['First Name','Middle Name','Last Name','Age','Gender','Salary']
# Create the pandas DataFrame
pandasDF=pd.DataFrame(data=data, columns=columns)
# print dataframe.
print(pandasDF)
以下のデータをコンソールに出力します。Pandasは全てのデータフレームにインデックスのシーケンス番号を追加することに注意してください。
Pandasの変換処理
Pandasデータフレームに実行できる変換処理のいくつかを以下に示します。デフォルトでそれぞれのカラムに対して統計関数が適用されることに注意してください。どのカラムに対して統計関数を適用したいのかを明示的に指定する必要はありません。count()
関数であって、(null/None値を無視して)それぞれのカラムのカウントを返します。
- df.count() – それぞれのカラムのカウントを返します(カウントには非null値のものだけが含まれます)。
- df.corr() – データフレーム内のカラムの相関を返します。
- df.head(n) – 上からn行を返します。
- df.max() – それぞれのカラムの最大値を返します。
- df.mean() – それぞれのカラムの平均値を返します。
- df.median() – それぞれのカラムの中央値を返します。
- df.min() – それぞれのカラムの最小値を返します。
- df.std() – それぞれのカラムの標準偏差を返します。
- df.tail(n) – 最後のn行を返します。
print(pandasDF.count())
First Name 5
Middle Name 5
Last Name 5
Age 5
Gender 4
Salary 5
print(pandasDF.max())
First Name Robert
Middle Name Rose
Last Name Williams
Age 50
Salary 500000
print(pandasDF.mean())
Age 41.0
Salary 206000.0
PySparkとは?
とても簡単に言うと、Pandasはシングルマシンでオペレーションを実行し、PySparkは複数台のマシンで処理を実行します。より大きなデータセットを取り扱う機械学習アプリケーションに取り組んでいるのであれば、Pandasよりもはるかに高速(100倍)オペレーションを実行できるPySparkが最適です。
PySparkは、NumPyを含み多くのデータサイエンスライブラリがPytohnで記述されていることから、PySparkはデータサイエンス、機械学習コミュニティで広く利用されています。また、Tensorflowも大規模データセットを効率的に処理できることから利用されています。PySparkはWalmart、Trivago、Sanofi、Runtasticなどの多くの企業で活用されています。
PySparkは、Apache Sparkの機能を用いてPythonを実行するための、Pythonで記述されたライブラリです。PySparkを用いることで、分散クラスター(マルチノード)あるいはシングルノードでもアプリケーションを並列に実行することができます。
Apache Sparkは大規模かつパワフルな分散データ処理、機械学習アプリケーションのための分析処理エンジンです。
ソース: https://databricks.com/
Sparkは基本的にScalaで記述されており、以降業界での導入が進んだことでPy4Jを用いてPython向けAPI PySparkがリースされました。Py4JはPySpark内でインテグレーションされており、Pythonが動的にJVMオブジェクトとやりとりすることを可能にしているので、PySparkを実行するにはPython、Apache SparkとJavaをインストールする必要があります。
さらに、開発においては、PySparkアプリケーションを実行するために、Spyder IDEやJupyter notebookのような有用なツールを数多く備えている(機械学習コミュニティで広く使われている)Anacondaディストリビューションを使うこともできます。
PySparkの機能
- インメモリの計算処理
- 並列化による分散処理
- さまざまなクラスターマネージャ(Spark、Yarn、Mesosなど)で利用可能
- フォールトトレラント
- イミュータブル
- 遅延評価
- キャッシュ & 永続化
- データフレームを使う際のビルトインの最適化処理
- ANSI SQLのサポート
PySparkの利点
- PySparkは、分散によってデータを効率的に処理できる汎用、インメモリ、分散処理エンジンです。
- PySparkで実行するアプリケーションは、従来システムより100倍高速です。
- データ取り込みパイプラインにおいてPySparkを用いることで非常に大きな効果を得ることができます。
- PySparkを用いることで、Hadoop HDFS、AWS S3など数多くのファイルシステムのデータを処理することができます。
- また、PySparkはストリーミングやKafkaを用いてリアルタイムデータを処理することに使うことができます。
- PySparkのストリーミングを用いることで、ファイルシステムやソケットからのストリームからファイルをストリーミングすることができます。
- PySparkはネイティブで機械学習、グラフライブラリを持っています。
PySparkのモジュールとパッケージ
- PySpark RDD (pyspark.RDD)
- PySpark DataFrame と SQL (pyspark.sql)
- PySpark Streaming (pyspark.streaming)
- PySpark MLib (pyspark.ml, pyspark.mllib)
- PySpark GraphFrames (GraphFrames)
- PySpark Resource (pyspark.resource) PySpark 3.0で新たに追加
PySparkデータフレームの例
PySparkデータフレームはイミュータブル(不変:作成すると変更できません)、フォールトトレンラントであり、変換処理は遅延評価されます(アクションが呼び出されるまで実行されません)。PySparkデータフレームはクラスターで分散され(PySparkデータフレームのデータはクラスターの異なるマシンに格納されることを意味します)、PySparkのいかなるオペレーションは全てのマシン上で並列に実行されます。
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
.appName('SparkByExamples.com') \
.getOrCreate()
data = [("James","","Smith",30,"M",60000),
("Michael","Rose","",50,"M",70000),
("Robert","","Williams",42,"",400000),
("Maria","Anne","Jones",38,"F",500000),
("Jen","Mary","Brown",45,"F",0)]
columns = ["first_name","middle_name","last_name","Age","gender","salary"]
pysparkDF = spark.createDataFrame(data = data, schema = columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)
CSVファイルを読み込みます。
#Read a CSV file
df = spark.read.csv("/tmp/resources/zipcodes.csv")
PySparkの変換処理
PySparkの変換処理はLazyであり、アクションが呼び出されるまでは実行されないことを意味します。
from pyspark.sql.functions import mean, col, max
#Example 1
df2=pysparkDF.select(mean("age"),mean("salary"))
.show()
#Example 2
pysparkDF.groupBy("gender") \
.agg(mean("age"),mean("salary"),max("salary")) \
.show()
PySparkのSQL互換性
PySparkは変換処理を実行するためのSQLクエリーをサポートしています。必要なのは、PySparkデータフレームからテーブル/ビューを作成することです。
pysparkDF.createOrReplaceTempView("Employee")
spark.sql("select * from Employee where salary > 100000").show()
#Prints result
+----------+-----------+---------+---+------+------+
|first_name|middle_name|last_name|Age|gender|salary|
+----------+-----------+---------+---+------+------+
| Robert| | Williams| 42| |400000|
| Maria| Anne| Jones| 38| F|500000|
+----------+-----------+---------+---+------+------+
spark.sql("select mean(age),mean(salary) from Employee").show()
#Prints result
+---------+------------+
|mean(age)|mean(salary)|
+---------+------------+
| 41.0| 206000.0|
+---------+------------+
PandasからPySparkデータフレームの作成
複数のマシン上の全てのコアで並列実行を行うことでPySparkはPandasよりも高速にオペレーションを実行できるので、多くの場合、より良い性能を得るためにPandasデータフレームをPySpark(PythonによるSpark)に変換する必要が出てきます。これがPandasデータフレームとPySparkデータフレームの大きな違いの一つです。
#Create PySpark DataFrame from Pandas
pysparkDF2 = spark.createDataFrame(pandasDF)
pysparkDF2.printSchema()
pysparkDF2.show()
PySparkデータフレームからPandasの作成
Sparkde変換処理が完了した後で、toPandas()
メソッドを用いることで簡単にPandasに戻すことができます。
注意
toPandas()
メソッドは、データをSparkドライバーのメモリーに集めるアクションなので、大規模データセットを取り扱っている際には注意を払う必要があります。収集したデータがSparkドライバーのメモリーに収まらない際にはOutOfMemoryException
に遭遇することになります。
#Convert PySpark to Pandas
pandasDF = pysparkDF.toPandas()
print(pandasDF)
PythonとJVM間の転送のためにApache Arrowを活用
Apache SparkはPythonとJVMの間でデータを転送する際に、インメモリの列指向フォーマットであるApache Arrowを使用します。デフォルトでは無効化されているので、Apache Arrowを使うには有効化する必要があります。また、pip install pyspark[sql]
を用いるか、Apache Arrow for Pythonから直接ダウンロードして、全てのSparkクラスターノードにApache Arrow(PyArrow)をインストールする必要があります。
spark.conf.set("spark.sql.execution.arrow.enabled","true")
上の文を使用するには、Sparkと互換性のあるApache Arrowをインストールする必要があります。Apache Arrowをインストールしていない場合には以下のエラーが発生します。
\apps\Anaconda3\lib\site-packages\pyspark\sql\pandas\conversion.py:289: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
PyArrow >= 0.15.1 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
エラーが起きた場合には、Sparkは自動でArrowの最適化実装を使わない処理にフォールバックします。この挙動はspark.sql.execution.arrow.pyspark.fallback.enabled
で制御することができます。
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
注意
Apache Arrowは現在、MapType、TimestampTypeのArrayType、ネストされたStructType以外の全てのSpark SQLデータタイプをサポートしています。
PandasかPySparkかをどのように決めればいいのか
PandasではなくPySparkを選択すべき検討事項を以下に示します。
- お使いのデータが膨大かつ毎年増加し続けており、処理時間を改善したい。
- フォールトトレラントが必要
- ANSI SQL互換性
- 使用する言語(SparkはPython、Scala、Java、Rをサポートしています)
- 機械学習の機能が必要
- Parquet、Avro、Hive、Casandra、Snowflakeなどからデータを読み込みたい。
- データをストリーミングし、リアルタイムで処理したい。
結論
本書では、PandasとPySparkデータフレームの違い、機能、それぞれの作成方法、互いの変換方法を非常にハイレベルにカバーしました。
学びを楽しんでください!!