はじめに
現在実務でデータ分析基盤を構築するプロジェクトに携わっており、
その中でも特に重要な役割を担ってるのが、共通前処理(EMR)を改修する機会があった。
処理を書いていく中でsparkについての知見が不足していると感じる機会が多かったため、
本講座を受講しようと思った。
※本投稿は、今回受講した講座の備忘録になります。
前提知識
・遅延評価
・sparkのアクション・トランスフォーメーションがあるということ
→アクション処理をして初めて動き出す
・分散処理しているということ
・エグゼキューター(子)とドライバー(親)
講座の内容
分散処理の基礎
MapReduce(Hadoop)
Map→Shaffle→Reduce
MAPフェーズ
keyとValueに分ける
Shuffleフェーズ
同じkeyのものを集める
Reduceフェーズ
keyごとに値を合算する
■MapReduceの欠点
①煩雑さ ②I/Oバウンド
→上記を解決するためにHiveとSparkが誕生
■HiveによってSQLライクに処理できるようになった
■sparkはファイルへの書き込みの代わりにインメモリに
データを保持するため、I/Oバウンドが解消
→ただし、メモリ上に保持するためにメモリ容量をたくさん用意しておく必要あり!
※OOM(アウトオブメモリ)を引き起こす可能性があるため
PySpark
ドライバーとエグゼキュータ
トランスフォーメーションとアクションについて
PySparkの基礎
※知らなかったもの、重要なもののみピックアップ
■型変換(withColumn)
from pyspark.sql import functions as F
sdf= sdf.withColumn('snap_CA',F.col('snap_CA').cast('int'))
■条件で抽出(where/filter)
# where
sdf.where((F.col('y')>0)&(F.col('y')<3)).show(n=5)
# filter
sdf.filter((F.col('y')>0)&(F.col('y')<3)).show(n=5)
# filter × filter
sdf.filter(F.col('y')>0).filter(F.col('y')<3).show(n=5)
# where × where
sdf.where(F.col('y')>0).where(F.col('y')<3).show(n=5)
■結合(join~on~how)
# 内部結合
sdf_join = sdf_y_sample.join(sdf_x_sample,on=['unique_id','ds'],how='inner')
# 左外部結合
sdf_join_left = sdf_y_sample.join(sdf_x_sample,on=['unique_id','ds'],how='left')
■条件分岐(when)
#条件分岐
sdf = sdf.withColumn('dauy_cat',F.when(F.dayofweek('ds')\
.isin(1,7),'Holiday')\
.otherwise('Weekday'))
■欠測値の取り扱い
# nanをNoneに置き換え
sdf_x = sdf_x.withColumn('event_name_1',F.when(F.col('event_name_1')=='nan',None).otherwise(F.col('event_name_1')))\
.withColumn('event_name_2',F.when(F.col('event_name_2')=='nan',None).otherwise(F.col('event_name_2')))
# NoneをNo Eventで埋める(fillna)
sdf_x_filled = sdf_x.fillna({'event_name_1':'No Event'}).show()
# Noneをdropする(dropna)
sdf_drop = sdf_x.dropna(subset=['event_name_1','event_name_2']).show()
■SparkSQL
# ビューを作成することでSQLを使ってクエリ可能
sdf.createOrReplaceTempView('daily_sales')
sdf_daily = spark.sql("""
SELECT
unique_id
,ds
,SUM(y) AS total‗value
FROM
daily_sales
GROUP BY
unique_id ,ds
"""
)
■UDFについて
from pyspark.sql.functions import pandas_udf,PandasUDFType
from pyspark.sql import types as T
# 関数の作成
@pandas_udf(T.DoubleType())
def has_sales(y:pd.Series) ->pd.Series:
return (y>0).astype(int)
# 作った関数の呼び出し
sdf = sdf.withColumn('has_sales',has_sales('y'))
■SparkのDF→PandasのDFへの変換
pdf = sdf.toPandas()
※ただし、ドライバーに全データを集めることになるため、データ量を絞り込んでから
どうしてもPandasDFにないといけない場合のみ行うのが基本(Excel出力やグラフ可など)
まとめ
手を動かしながらPySparkの記載方法を学ぶいことができた点は非常によかったが、
sparkについては既存の知識以上のものはなかった。
他の講座も受講していく予定なので、また感想(備忘録)を残したいと思う。

