Edited at

Jupyter NotebookでのpySparkコードサンプル


TL;DR

Jupyter Notebookで、pySparkで、データ前処理して、機械学習ライブラリを通して、評価値を出すところのコード例です。適当なところをコピペしたりクックブックのように使ってください。細かいところはAPIリファレンスを参照願います。

https://spark.apache.org/docs/latest/api/python/index.html

筆者がなにか誤解しているとか、見つけられてないとか、もっと良いやり方があるかもしれません。有識者の突っ込み歓迎します。間違い探し遊びにどうでしょうか?


前提

Jupyter 環境で、pySparkなカーネルに接続していて、pyspark.sql.session.SparkSession であるようなspark があることが前提です。notebook環境であれば自然にそうなると思います。


バージョン情報など


  • JupyterLab 0.35.4

  • python 3.6.6

  • pySpark 2.4.0


タブ区切りファイルを読み込んで、DataFrameを生成する。

/tmp/dokosoko/test.tsv においてある、5カラムあるTSVファイルを読む場合。必要なカラムが先頭部分にある場合は、不要カラム以降についてはStructType.add()しなくてもよさそうです。


tsvを読む例.py

from pyspark.sql.types import *

fields = StructType()\
.add("nantoka_id", LongType(), False)\
.add("kantoka_name", StringType(), False)\
.add("dousita_value", FloatType(), False)\
.add("hogehoge_date", TimestampType(), True)\
.add("fugafuga_date", TimestampType(), True)

df = spark.read.format('csv')\
.option("delimiter", "\t")\
.schema(fields).load('/tmp/dokosoko/test.tsv')



行の選択・列の射影

# 選択

# 指定した期間内のレコードを取り出す例。

import datetime
from dateutil.relativedelta import relativedelta

string_today = '2019-04-05 10:00:00'
today = datetime.datetime.strptime(string_today, '%Y-%m-%d %H:%M:%S')
sevenDayBefore = today + relativedelta(days=-7)

df = df .filter((sevenDayBefore < df.entry_dt ) & (df.entry_dt < today ))

# 射影
df = df .select('nantoka_id', 'fugafuga_date')

選択の際の条件を & | などで繋ぐ場合カッコが必要なので注意。列順序を変えるときも射影で取り直せば良い。

参考: https://stackoverflow.com/questions/42912156/python-pyspark-data-frame-rearrange-columns


列の削除

df = df.drop('nantoka_id')

必要な列を選択するのと、不要な列を削除するのとどちらが速いかは未確認。(あまり変わらないかも?)


列のリネーム

df = df.withColumnRenamed('research_id', 'r_research_id')


列を何らかの処理で更新する(あるいは新しい列を作り処理結果を入れる。)

withColumn が使える。更新したい列名を指定できる。新しい名前の場合はそのような列が作られる。


pySparkのライブラリの関数を使う場合


日付の差を新カラムに入れる例.py

from pyspark.sql.functions import datediff

df = df.withColumn("dateDiff", datediff(df.update_dt , df.entry_dt ))



自前で処理を書く場合。


簡単な分岐の例

when() が使える。

from pyspark.sql.functions import when 

df = df.withColumn("count",when( 360 < col("count") , float(1)).otherwise( col("count") / 360))

otherwise()をつけないと、null が入ることになるので注意。


複雑なロジックがあるときの例

一旦udf (User Defined Function) を作って利用する。

def calcNantoka(param1, param2 ):

# ここになにか複雑な処理があると思ってください。
return param1

from pyspark.sql.functions import udf

# IntegerType() というように戻り値の型を指定する。
calcNantoka_udf = udf(calcNantoka, IntegerType())

df = df.withColumn("new_nantoka",calcNantoka_udf (df["hogehoge"], df["fugafuga"] ))

Pythonのデコレータを使うともっと端的に書けるようですけど未確認です。


group byしてなにかする例

agg で、なにか集約して新しい列にする。


件数と最大値を取る例.py


gb = df.groupby(['nantoka_id'])

import pyspark.sql.functions as F

# 行数をカウント、最終の日付を取得。
summaryDf = gb.agg(F.count('*').alias('hogehoge_count') ,F.max('update_dt').alias('last_event_date') )



条件付きで件数を取る例.py

gb = df.groupby(['nantoka_id'])

# 条件付きの行数カウントは一工夫必要(らしい。)
cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
response_when = gb.agg(
cnt_cond(F.col('days_diff') < 1 ).alias('quick_num'),
cnt_cond(F.col('days_diff') < 2 ).alias('normal_num'),
cnt_cond(F.col('days_diff') < 3 ).alias('late_num')
)


参考:https://stackoverflow.com/questions/49021972/pyspark-count-rows-on-condition


複数のDataFrameを特定の列の値でjoinする場合

df = df.join(target, df.nantoka_id == target.nantoka_id_t ,"left")

joinする列の名前を違うようにしておかないと怒られる。予めリネームしておくと良い。

注:aliasでなんとかできるかもしれない。(未確認)


列のデータの相関係数を測ってみる。

一旦一つのカラムに、配列として置き直してからCorrelation.corr()を呼ぶ。

呼んだ後、結果のDFの1行目1列目の値を取得して表示する。

from pyspark.ml.feature import VectorAssembler

from pyspark.ml.stat import Correlation

# features カラムに置き直す。
assembler = VectorAssembler(
inputCols=["hoge_value","fuga_value","some_count","dousita_num"],
outputCol="features")

output = assembler.transform(df)
output.printSchema()
pearsonCorr = Correlation.corr(output, column='features').collect()[0][0]
print(str(pearsonCorr).replace('nan', 'NaN'))


GeneralizedLinearRegression.fit() 向けのデータを作って呼ぶ。

相関係数の計算のように、一つのカラムに配列として置き直してからfitを呼ぶ。その際正規化とか色々あるから、Pipelineを通すのがセオリーのようになる。XY に分けるのとは違うので面食らう向きもあるかもしれない。1

from pyspark.ml import Pipeline

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.regression import GeneralizedLinearRegression

# 入力DataFrameの、inputColのカラムの値を、1個のVectorに詰め直し、新たに features_not_scaled というカラムを作って features_not_scaled というカラム名にして入れる
assembler = VectorAssembler(
inputCols=["hoge_value","fuga_value","some_count","dousita_num"],
outputCol="features_not_scaled")

# features_not_scaled のvectorの内容を正規化して、 features カラムを作ってそこに入れる。
mmScaler = MinMaxScaler(inputCol="features_not_scaled", outputCol="features")

glr = GeneralizedLinearRegression()

pipeline = Pipeline(stages=[assembler, mmScaler, glr])
pipelineModel = pipeline.fit(train)


RegressionEvaluator を使う。

DataFrameの2個のカラムについて、差を元にした以下のような計算を行える。

予想値と正解値との差の総合的な評価に使える。


  • 決定係数 


    • R2



  • 平均絶対誤差


    • Mean Absolute Error (mae)



  • 平均二乗誤差


    • Mean Squared Error (mse)



  • 平均平方二乗誤差


    • Root Mean Square Error (rmse)




from pyspark.ml.evaluation import RegressionEvaluator

#デフォルトでは、予測値が prediction カラム、 正解値が label カラムである。
evaluator = RegressionEvaluator()
evaluator.evaluate(resultDataFrame)

print(evaluator.evaluate(resultDataFrame, {evaluator.metricName: "r2"}))
print(evaluator.evaluate(resultDataFrame, {evaluator.metricName: "mae"}))


参考URL

本家APIドキュメント

https://spark.apache.org/docs/latest/api/python/index.html





  1. 僕もです。:grinning: