3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

pysparkでデータ加工するための自分用チートシート

Last updated at Posted at 2022-03-15

案件でビッグデータを加工するジョブをそれなりに作成してきました。
その中で使ってきたpysparkの備忘録です。
こういう記事は他にもたくさんあるけれど、自分用にメモ書きしていきます。

■ファイルの読み込み

# ライブラリのインポート
#functionsモジュールをインポート
import pyspark.sql.functions as F
# windowモジュールをインポート
from pyspark.sql.window import Window

★ファイルの操作

■ファイルの読み込み

# 読み込みたいファイルが格納されているディレクトリパス(単一ファイル指定可)
dir_path = '(ディレクトリパス)'
# 読み込み形式(csv or parquet or json or jdbc)
file_type = 'csv'
# headerの有無(true or false)
necessity= 'true'
### 読み込み
df = spark.read.format(file_type).option('header', necessity).load(dir_path)
### テーブルから読み込み
df = spark.read.format('jdbc').option('url', '(jdbcのurl)').option('dbtable', '(SQL文)').option('user', '(DBのユーザ名)').option('password', '(DBのパスワード)')

■ファイルの書き出し

# 書き出し先ディレクトリパス
dir_path = '(ディレクトリパス)'
# 書き出し形式(csv or parquet or json)
file_type = 'csv'
# 書き出しモード(overwrite or append)
write_mode = 'overwrite'
# headerの有無(true or false)
necessity= 'true'
# パーティション数(出力ファイル数)を変更してから書き込むことも可能
df = df.repartition(file_num)
# 処理は早いがパーティション数を減らすときのみ使用可能
df = df.coalesce(file_num)
※1ファイルが100MBになるように設定すると早いらしい
# パーティションカラムを設定すると、出力時のディレクトリ構成を設定できる
df = df.partitionBy("col1", "col2")
※出力時のパス:dir_path/col1=値/col2=値/***.parquet
### 書き込み
df.write.format(file_type).option('header', necessity).mode(write_mode).save(dir_path)
※.option('emptyValue', '')とすることで空文字を明示的に空文字として出力できる

★データの操作

■新規カラムの作成

# 新規作成カラム名(既存と同じものにすると更新される)
new_col_name = 'new_col'
# 参照するカラム
old_col_name = 'old_col'
### カラムを追加
df = df.withColumn( new_col_name, col(old_col_name) )
### カラム名の変換
df = df.withColumnRenamed( old_col_name, new_col_name )

■フィルタリング

df = df.filter( (条件式) )
※whereでも同じ挙動になる
※戻り値がbooleanなら使える
※df = df.filter( ~(条件式) )とすることでnotになる
※not検索ではnullは表示されないので個別にisNull()の条件を絡めて表示する

■列削除

df = df.drop('hoge', 'fuga')

■指定列の抽出

# 列指定
col_list = ['hoge', 'fuga']
### 列抽出
select_df = df.select( col_list )
### リスト化
col_data_list = df.select(col_list[0]).rdd.flatMap(lambda x: x).collect()

■キャスト

# 型指定
cast_kind = 'string'
### キャスト
df = df.withColumn( new_col_name, F.col(old_col_name).cast(cast_kind) )

■正規表現(戻り値:true or false)

# 正規表現パターン
pattern = '^\d+$'
### 正規表現
df = df.withColumn( new_col_name, F.col(old_col_name).rlike(pattern) )

■リストの内容含む(戻り値:true or false)

# 含んでいてほしいデータ群
ok_list = ['hoge', 'fuga']
### 対象データが含まれていればtrueを返す
df = df.withColumn( new_col_name, F.col(old_col_name).isin(ok_list) )

■null値の扱い

### nullをチェック(戻り値:true or false)
# nullのレコードを検索
df = df.withColumn( new_col_name, F.col(old_col_name).isNull() )
# nullではないレコードを検索
df = df.withColumn( new_col_name, F.col(old_col_name).isNotNull() )
### null値の補完
# nullの置換が必要なカラムを指定
target_col = 'col_name'
df = df.fillna('hoge', subset=target_col)
※カラム指定なしなら、全カラムが対象になる
# 辞書形式で複数条件の補完可能
fillna_dict = { "target_col1": 'hoge1', "target_col2": 'hoge2'}
df = df.fillna(fillna_dict)
### nullを含むレコードを削除
# nullのチェックが必要なカラムを指定
target_col = 'col_name'
df = df.dropna(subset=target_col)
※カラム指定なしなら、全カラムが対象になる

■テーブル名の付与(ジョインする前によく使う)

# テーブル名の定義
table_name = 'hoge'
### エイリアスの付与
df = df.alias(table_name)
# カラム名単体にも使える
target_col = 'hoge'
new_col_name = 'fuga'
df = df.select(target_col).alias(new_col_name)

■ジョイン

### 横結合
# ジョインキー
join_key = 'col_name'
※異なるカラムどうしでジョインする場合はSQLのように書く
# 結合方式(inner, cross, outer, left, right, left_anti)
how = 'left'
※left_antiジョインは、null値を同値として扱ってくれないので注意
df = df1.join(df2, join_key, how)
### 縦結合(ただのunionは事故るので列名参照する)
df = df1.unionByName(df2)

■集計

### カウント
df = df.groupBy(col_name).count()
### 最大値
df = df.groupBy(col_name).agg({col_name : 'max'})
### 最小値
df = df.groupBy(col_name).agg({col_name : 'min'})
### 平均値
df = df.groupBy(col_name).agg({col_name : 'ave'})

■ソート

# 昇順
df = df.sort(col_name, ascending=True)
# 降順
df = df.sort(col_name, ascending=False)
# 昇順
df = df.orderBy(col('col_name').asc())
# 降順
df = df.orderBy(col('col_name').desc())

■データ加工のお供"pyspark.sql.functions"
公式:https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html

### 列を複製
df = df.withColumn( new_col_name, F.col(old_col_name) )
### 同じ値で埋めた列を作成
df = df.withColumn( new_col_name, F.lit(1) )
### 条件によって値を変化させた列を作成
df = df.withColumn(new_col_name, 
                   F.when( (条件式) ,
                          (trueの場合の値)
                         ).otherwise( (falseの場合の値) ) 
                   )
※whenはチェーン可→F.when().when().otherwise()
### 文字列の連結
df = df.withColumn(new_col_name, 
                   F.concat( F.col(old_col_name), F.lit('hoge') )
                  )
### 文字列の置換
df = df.withColumn(new_col_name, 
                   F.translate( old_col_name, (置換対象文字列), (置換後文字列) )
                  )
### 文字列の置換(正規表現)
pattern = '^\d+$'
df = df.withColumn(new_col_name, 
                   F.regexp_replace( old_col_name, pattern, (置換後文字列) )
                  )
### 絶対値に変換
df = df.withColumn( new_col_name, F.abs( F.col(old_col_name) ) )
### 文字列形式の変換(例として、数字を0埋め3桁変換)
pattern = '%03d'
df = df.withColumn( new_col_name, F.format_string(pattern, F.lit(5)) )
### 文字列の抽出(2~4文字目を取得)
df = df.withColumn( new_col_name, F.substring(F.col(old_col_name), 2, 4) )
### 文字列の大文字化
df = df.withColumn( new_col_name, F.upper(F.col(old_col_name)) )
### 文字列の小文字化
df = df.withColumn( new_col_name, F.lower(F.col(old_col_name)) )
### 文字列の長さを取得
df = df.withColumn( new_col_name, F.length(F.col(old_col_name)) )
### 文字列を分割する(例として"/"区切り)
pattern = '/'
df = df.withColumn( new_col_name, F.split(F.col(old_col_name), pattern) )
### 文字列を逆さにする
df = df.withColumn( new_col_name, F.reverse(F.col(old_col_name)) )

##### window関数(集計関数)を使う #####
# 順位付け対象カラム名
col_name = 'hoge'
# 集積関数の定義
myWindow = Window.orderBy(col_name)
# パーティションあり
partition_col = 'fuga'
myWindow = Window.partitionBy(partition_col).orderBy(col_name)
### 順位付け
df = df.withColumn( new_col_name, F.rank().over(myWindow) )
# denseRank:同率ランクがある場合の順位飛ばしあり
# rank:同率ランクがある場合の順位飛ばしなし
# rowNumber:順位の重複なし

##### 日付系 #####
# 日付形式
pattern = 'yyyyMMdd'
### 指定の日付形式の文字列からdate型へ変更
df = df.withColumn( new_col_name, F.to_date( F.col(old_col_name), pattern ) )
### 指定の日付形式の文字列からtimestamp型へ変更
df = df.withColumn( new_col_name, F.to_timestamp( F.col(old_col_name), pattern ) )
### 指定した形式のdate型へ変更
df = df.withColumn( new_col_name, F.date_format( F.col(old_col_name), pattern ) )
### 日付の計算(左から右を引き算して日数を返す)(戻り値:int)
df = df.withColumn( new_col_name, F.datediff( F.col(date_col1), F.col(date_col2) ) )

★データの検証

### レコード数のカウント
df.count()
### 1行目を表示
df.head()
### 上から指定した分だけ表示
num = 5
df.show(num)
### カラム一覧表示
df.columns
### データフレームどうしの比較(カラムは一致すること)
df = df1.subsract(df2)
※戻り値はleft anti joinの結果
### 指定行のデータを抜き出す
# index列を作成して固定化(sparkデータフレームは行が流動的)
df.createOrReplaceTempView('df')
temp_df = spark.sql('SELECT *, row_number() over (order by 'some_column') as index FROM df').persist()
# 例として5行目から10行目を抜き出す
select_row_df = temp_df.filter(temp_df.index.between(5, 10))
3
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?