6
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Udemy】データエンジニアリングのためのPySpark入門を受講してみた

Posted at

はじめに

現在実務でデータ分析基盤を構築するプロジェクトに携わっており、
その中でも特に重要な役割を担ってるのが、共通前処理(EMR)を改修する機会があった。
処理を書いていく中でsparkについての知見が不足していると感じる機会が多かったため、
本講座を受講しようと思った。
※本投稿は、今回受講した講座の備忘録になります。

前提知識

・遅延評価
・sparkのアクション・トランスフォーメーションがあるということ
 →アクション処理をして初めて動き出す
・分散処理しているということ
・エグゼキューター(子)とドライバー(親)

講座の内容

分散処理の基礎

MapReduce(Hadoop)

Map→Shaffle→Reduce

MAPフェーズ

keyとValueに分ける

Shuffleフェーズ

同じkeyのものを集める

Reduceフェーズ

keyごとに値を合算する

■MapReduceの欠点
①煩雑さ ②I/Oバウンド
→上記を解決するためにHiveとSparkが誕生

■HiveによってSQLライクに処理できるようになった

■sparkはファイルへの書き込みの代わりにインメモリに
データを保持するため、I/Oバウンドが解消
→ただし、メモリ上に保持するためにメモリ容量をたくさん用意しておく必要あり!
※OOM(アウトオブメモリ)を引き起こす可能性があるため

PySpark

ドライバーとエグゼキュータ

image.png

トランスフォーメーションとアクションについて

image.png

PySparkの基礎

※知らなかったもの、重要なもののみピックアップ
■型変換(withColumn)

from pyspark.sql import functions as F
sdf= sdf.withColumn('snap_CA',F.col('snap_CA').cast('int'))

■条件で抽出(where/filter)

# where
sdf.where((F.col('y')>0)&(F.col('y')<3)).show(n=5)

# filter
sdf.filter((F.col('y')>0)&(F.col('y')<3)).show(n=5)

# filter × filter
sdf.filter(F.col('y')>0).filter(F.col('y')<3).show(n=5)

# where × where
sdf.where(F.col('y')>0).where(F.col('y')<3).show(n=5)

■結合(join~on~how)

# 内部結合
sdf_join = sdf_y_sample.join(sdf_x_sample,on=['unique_id','ds'],how='inner')

# 左外部結合
sdf_join_left = sdf_y_sample.join(sdf_x_sample,on=['unique_id','ds'],how='left')

■条件分岐(when)

#条件分岐
sdf = sdf.withColumn('dauy_cat',F.when(F.dayofweek('ds')\
.isin(1,7),'Holiday')\
.otherwise('Weekday'))

■欠測値の取り扱い

# nanをNoneに置き換え
sdf_x = sdf_x.withColumn('event_name_1',F.when(F.col('event_name_1')=='nan',None).otherwise(F.col('event_name_1')))\
             .withColumn('event_name_2',F.when(F.col('event_name_2')=='nan',None).otherwise(F.col('event_name_2')))
             
# NoneをNo Eventで埋める(fillna)
sdf_x_filled = sdf_x.fillna({'event_name_1':'No Event'}).show()           

# Noneをdropする(dropna)
sdf_drop = sdf_x.dropna(subset=['event_name_1','event_name_2']).show()

■SparkSQL

# ビューを作成することでSQLを使ってクエリ可能
sdf.createOrReplaceTempView('daily_sales')
sdf_daily = spark.sql("""
SELECT
  unique_id
  ,ds
  ,SUM(y) AS total‗value
FROM
  daily_sales
GROUP BY
unique_id ,ds
"""
)

■UDFについて

from pyspark.sql.functions import pandas_udf,PandasUDFType
from pyspark.sql import types as T

# 関数の作成
@pandas_udf(T.DoubleType())
def has_sales(y:pd.Series) ->pd.Series:
  return (y>0).astype(int)

# 作った関数の呼び出し
sdf = sdf.withColumn('has_sales',has_sales('y'))

■SparkのDF→PandasのDFへの変換

pdf = sdf.toPandas()

※ただし、ドライバーに全データを集めることになるため、データ量を絞り込んでから
どうしてもPandasDFにないといけない場合のみ行うのが基本(Excel出力やグラフ可など)

まとめ

手を動かしながらPySparkの記載方法を学ぶいことができた点は非常によかったが、
sparkについては既存の知識以上のものはなかった。
他の講座も受講していく予定なので、また感想(備忘録)を残したいと思う。

6
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?