1.図
Apache Arrowのサイトに載っている図。
図をみるとpandasがApache Arrowを経由してsparkというものにつながっている。
調べてみるとarrrowはいちいちcsvなどでデータ処理ツール間を経由させずに高速にデータを動かすことができる中間体みたいなものを提供し、sparkは分散処理に強いツールということでした。
特許データは集まるとかなり重いので、これはぜひ使ってみたい。ということで試してみることに
2.colaboratoryで実現
早速colaboratoryで実施できるようにしよう、ということで、ここを参照させてもらいつつ作成。
# 各ライブラリインストール
!pip install pyarrow
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
# 環境変数設定
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
# findsparkで環境設定
import findspark
findspark.init("/content/spark-2.4.4-bin-hadoop2.7")
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
↑これを実行しておけばたぶんcolaboratoryでsparkが使えて、かつpyarrowも使える。
(spark.conf.set("spark.sql.execution.arrow.enabled", "true")の部分)
読み込み
spark_df = spark.read.csv('~.csv', header=True,encoding="sjis")
encoding="sjis"を使えば日本語も大丈夫問題なく読み込んでくれる。cp932じゃないのね。
が、parquetでsaveしようとするとエラーが出るのでなんとかしないと。調べると列名が問題のよう。
spark_df.write.save("test.parquet", format="parquet")
少し試し
たとえば
from pyspark.sql.functions import desc
spark_df.groupby("発明の名称").count().sort(desc("count")).show()
おお、ちゃんと出てくれる!SQLを叩いた後の結果のようでpandasとは違うんだなあと実感。
3.実験
ここを見ると結構ある。pandasの操作とsparkの操作を対比。
pandasっぽくできる部分と気を付けないといけない部分に注意。
import pandas as pd
spark = SparkSession.builder.master("local[*]").getOrCreate()
として
操作 | pandas | spark |
---|---|---|
csv読み込み | pd.read_csv() | spark.read.csv |
excel読み込み | pd.read_excel() | 無 |
表示させる | df | spark_df.show() .show()をつけないと表示されないので焦る。 |
先頭5行 | df.head(5) | spark_df.show(5) substrの代わりに[0:4]もあり。 |
列追加(一部抽出で) | df['newcolname']=df['colname'].str[0,4] | spark_df.withColumn('newcolname', spark_df['colname'].substr(0,4)).show(5) |
rename | df.rename(columns={~~:**}) | spark_df.withColumnRenamed('~~', '**') |
groupby | df.groupby() | from pyspark.sql.functions import desc spark_df.groupby('~').count().sort(desc("count")).show() |
groupby⇒agg | df.groupby(FUN) | spark_df.groupby("A").agg(FUN).show() |
列抽出 | df[['A','B','D']] | spark_df.select('A','B','D').show() |
sort | df.sort_values(by='A') | spark_df.sort("A") |
crosstab | pd.crosstab(df['A'],df['B']) | spark_df.crosstab('A','B').show() |
UDF | df.apply(FUN) | from pyspark.sql.functions import pandas_udf, PandasUDFType :pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) def normalize(pdf): v = pdf.v return pdf.assign(v=(v - v.mean()) / v.std()) df.groupby("id").apply(normalize).show() (作成中・・・) |
非常に多かったので、pandasで出来ることは大体実現できるのではという感触。 | ||
user defined function (UDF)も色々あったので追加しないと。 | ||
日付系もあるようなので、整理して後日追加予定。 |
2020/5/7追加
pandasとpysparkの間を取り持つライブラリを発見。その名もコアラ。
pandaslikeな操作感でpysparkを操作できる!
koalas