12
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pandas API on SparkでpandasとSparkの良いところ取り

Last updated at Posted at 2023-11-19

こちらのサンプルを動かしながら、Pandas API on Spark(Koalas)を説明します。

以前にこちらの記事も翻訳してます。

Pandas API on Sparkとは

その前にpandasの話をさせてください。Pythonを使っている人であれば、まず間違いなく使ったことがあるであろうpandas。テーブルデータを取り扱う際には欠かせないものとなっています。ですので、学校、書籍などでデータサイエンスを学ぶ際には必修科目となっていることでしょう。しかし、pandasには以下のような課題があります。

  • メモリに乗り切らない大規模データは処理できません
  • (通常は)並列処理が行えません

一方で、文法はわかりやすく、小規模なデータを取り扱うのであればベストな選択肢と言えます。
Screenshot 2023-11-19 at 19.35.00.png

しかし、年を経るごとに取り扱うデータ量は増加の一途を辿っており、pandasで全てをやり切ることが困難になっているのも事実です。そう言った背景もあり、Apache Sparkなどの並列処理エンジンの人気が高まり、PythonのAPIであるPySparkも広く利用されるようになっています。

しかし、PySparkの文法はpandasとは結構異なっており、初学者にとっては敷居の高いものとなっていました。

pandasデータフレーム PySparkデータフレーム
列指定 df['col'] df['col']
データフレームの可変性 可変 不変
処理実行 即時実行(eagerly) 遅延実行(lazily)
列の加算 df['c'] = df['a'] + df['b'] df = df.withColumn('c', df['a'] + df['b'])
列名の変更 df.columns = ['a','b'] df = df.select(df['c1'].alias('a'),
df['c2'].alias('b'))
df = df.toDF('a', 'b')
値のカウント df['col'].value_counts() df.groupBy(df['col']).count().orderBy('count', ascending=False)

このようなギャップを埋める目的でスタートしたプロジェクトがKoalasです。

Screenshot 2023-11-19 at 19.43.37.png

  • 2019/4/24に発表
  • Apache Spark上でpandas APIを提供することが狙い
    • 馴染み深いAPIで2つのエコシステムを統合
    • 小規模データと大規模データ間のシームレスな移行
  • pandasユーザーのメリット
    • Koalasを用いてpandasコードをスケールアウト
    • PySparkの学習をより簡単に
  • PySparkユーザーのメリット
    • pandasライクの機能でより生産的に

そして、このKoalasプロジェクトはSpark 3.2でSparkに統合されたので、個別にKoalasをインストールしなくてもPandas API on Sparkでpandas APIを活用することができるのです!

pandas PySpark Pandas API on Spark(Koalas)
import pandas as pd
df = pd.read_csv("/path/to/my_data.csv")
df = (spark.read
.option("inferSchema", "true")
.csv("/path/to/my_data.csv"))
import pyspark.pandas as ps
df = ps.read_csv("/path/to/my_data.csv")
df.columns = ['x', 'y', 'z1'] df = df.toDF('x', 'y', 'z1') df.columns = ['x', 'y', 'z1']
df['x2'] = df.x * df.x df = df.withColumn('x2', df.x * df.x) df['x2'] = df.x * df.x

サンプルのウォークスルー

# Pandasのread_json(from pandas import read_json)ではなく、Sparkの分散能力を活用するために、単にpyspark pandasをインポートします
from pyspark.pandas import read_json
pdf = read_json(cloud_storage_path+"/koalas/users")
print(f"pdf is of type {type(pdf)}")
display(pdf)

このデータフレームはSparkデータフレームではなく、Pyspark.pandasデータフレームであることに注意してください。はまりそうなポイントが丁寧にまとめられています。以下のどのデータフレームを操作しているのかに注意を払う必要があります。

  • pandasデータフレーム
  • Sparkデータフレーム
  • Pandas on Sparkデータフレーム
pdf is of type <class 'pyspark.pandas.frame.DataFrame'>

Screenshot 2023-11-19 at 19.54.55.png

pandasデータフレームからPandas on Sparkデータフレームへの変換

from pyspark.pandas import from_pandas
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

# pandasデータフレームをpandas on sparkに変換し、Sparkのスピードと並列実効性の全てを手に入れます
pdf = from_pandas(df)
print(f"pdf is of type {type(pdf)}")
# 標準的なpandasオペレーションを適用します
pdf.mean()
pdf is of type <class 'pyspark.pandas.frame.DataFrame'>
A    0.083945
B   -0.044224
C    0.530140
D    0.577575
dtype: float64

SparkデータフレームからPandas on Sparkデータフレームへの変換

# また、一つの指示でSparkデータフレームをPandas on Sparkデータフレームに変換することができます
# 例として、Unity Catalogからひとつのテーブルを簡単に読み込んで、データサイエンスの分析にpandas APIを活用することができます
users = spark.read.json(cloud_storage_path+"/koalas/users").pandas_api()
print(f"users is of type {type(users)}")
users.describe()

Screenshot 2023-11-19 at 19.59.38.png

データセットの探索

users["age_group"].value_counts(dropna=False).sort_values()
10.0     497
0.0      510
7.0      941
3.0      976
2.0      990
1.0      991
9.0      995
8.0     1005
5.0     1014
6.0     1025
4.0     1056
Name: age_group, dtype: int64

Pandas on Sparkデータフレームに対するSQL実行

from pyspark.pandas import sql
age_group = 2
sql("""SELECT age_group, COUNT(*) AS customer_per_segment FROM {users} 
        where age_group > {age_group}
      GROUP BY age_group ORDER BY age_group """, users=users, age_group=age_group)

Screenshot 2023-11-19 at 20.01.11.png

可視化

Pandas API on Sparkはインタラクティブなチャート生成でplotlyを活用しています。洞察を得るには、.plotメソッドを使用します。

df = users.groupby('gender')['age_group'].value_counts().unstack(0)
df.plot.bar(title="Customer age distribution")

Screenshot 2023-11-19 at 20.02.17.png

まとめ

(私も)慣れ親しんでいるpandasのAPIを活用しながらも、Sparkの強力な並列分散処理能力を活用できる、Pandas API on Spark(Koalas)、是非ご活用ください!ただ、より複雑なことをやる可能性があるのであれば、このAPIを契機にしてPySparkも学ばれることもお勧めします。Databricksであれば、AIアシスタントも支援してくれます!

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

12
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?