3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

pandasの処理をPysparkで

Last updated at Posted at 2019-12-18

1.図

Apache Arrowのサイトに載っている図。

参考
image.png

図をみるとpandasがApache Arrowを経由してsparkというものにつながっている。
調べてみるとarrrowはいちいちcsvなどでデータ処理ツール間を経由させずに高速にデータを動かすことができる中間体みたいなものを提供し、sparkは分散処理に強いツールということでした。
特許データは集まるとかなり重いので、これはぜひ使ってみたい。ということで試してみることに

2.colaboratoryで実現

早速colaboratoryで実施できるようにしよう、ということで、ここを参照させてもらいつつ作成。

# 各ライブラリインストール
!pip install pyarrow
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# 環境変数設定
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

# findsparkで環境設定
import findspark
findspark.init("/content/spark-2.4.4-bin-hadoop2.7")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

↑これを実行しておけばたぶんcolaboratoryでsparkが使えて、かつpyarrowも使える。
 (spark.conf.set("spark.sql.execution.arrow.enabled", "true")の部分)

 読み込み

spark_df = spark.read.csv('~.csv', header=True,encoding="sjis")

encoding="sjis"を使えば日本語も大丈夫問題なく読み込んでくれる。cp932じゃないのね。
が、parquetでsaveしようとするとエラーが出るのでなんとかしないと。調べると列名が問題のよう。

spark_df.write.save("test.parquet", format="parquet")

少し試し

たとえば

from pyspark.sql.functions import desc
spark_df.groupby("発明の名称").count().sort(desc("count")).show()

image.png

おお、ちゃんと出てくれる!SQLを叩いた後の結果のようでpandasとは違うんだなあと実感。

3.実験

ここを見ると結構ある。pandasの操作とsparkの操作を対比。
pandasっぽくできる部分と気を付けないといけない部分に注意。

import pandas as pd
spark = SparkSession.builder.master("local[*]").getOrCreate()
として

操作 pandas spark
csv読み込み pd.read_csv() spark.read.csv
excel読み込み pd.read_excel()
表示させる df spark_df.show()
.show()をつけないと表示されないので焦る。
先頭5行 df.head(5) spark_df.show(5)
substrの代わりに[0:4]もあり。
列追加(一部抽出で) df['newcolname']=df['colname'].str[0,4] spark_df.withColumn('newcolname', spark_df['colname'].substr(0,4)).show(5)
rename df.rename(columns={~~:**}) spark_df.withColumnRenamed('~~', '**')
groupby df.groupby() from pyspark.sql.functions import desc
spark_df.groupby('~').count().sort(desc("count")).show()
groupby⇒agg df.groupby(FUN) spark_df.groupby("A").agg(FUN).show()
列抽出 df[['A','B','D']] spark_df.select('A','B','D').show()
sort df.sort_values(by='A') spark_df.sort("A")
crosstab pd.crosstab(df['A'],df['B']) spark_df.crosstab('A','B').show()
UDF df.apply(FUN) from pyspark.sql.functions import pandas_udf, PandasUDFType
:pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
v = pdf.v
return pdf.assign(v=(v - v.mean()) / v.std())
df.groupby("id").apply(normalize).show() (作成中・・・)
非常に多かったので、pandasで出来ることは大体実現できるのではという感触。
user defined function (UDF)も色々あったので追加しないと。
日付系もあるようなので、整理して後日追加予定。

2020/5/7追加

pandasとpysparkの間を取り持つライブラリを発見。その名もコアラ。
pandaslikeな操作感でpysparkを操作できる!
koalas

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?