案件でビッグデータを加工するジョブをそれなりに作成してきました。
その中で使ってきたpysparkの備忘録です。
こういう記事は他にもたくさんあるけれど、自分用にメモ書きしていきます。
■ファイルの読み込み
# ライブラリのインポート
#functionsモジュールをインポート
import pyspark.sql.functions as F
# windowモジュールをインポート
from pyspark.sql.window import Window
★ファイルの操作
■ファイルの読み込み
# 読み込みたいファイルが格納されているディレクトリパス(単一ファイル指定可)
dir_path = '(ディレクトリパス)'
# 読み込み形式(csv or parquet or json or jdbc)
file_type = 'csv'
# headerの有無(true or false)
necessity= 'true'
### 読み込み
df = spark.read.format(file_type).option('header', necessity).load(dir_path)
### テーブルから読み込み
df = spark.read.format('jdbc').option('url', '(jdbcのurl)').option('dbtable', '(SQL文)').option('user', '(DBのユーザ名)').option('password', '(DBのパスワード)')
■ファイルの書き出し
# 書き出し先ディレクトリパス
dir_path = '(ディレクトリパス)'
# 書き出し形式(csv or parquet or json)
file_type = 'csv'
# 書き出しモード(overwrite or append)
write_mode = 'overwrite'
# headerの有無(true or false)
necessity= 'true'
# パーティション数(出力ファイル数)を変更してから書き込むことも可能
df = df.repartition(file_num)
# 処理は早いがパーティション数を減らすときのみ使用可能
df = df.coalesce(file_num)
※1ファイルが100MBになるように設定すると早いらしい
# パーティションカラムを設定すると、出力時のディレクトリ構成を設定できる
df = df.partitionBy("col1", "col2")
※出力時のパス:dir_path/col1=値/col2=値/***.parquet
### 書き込み
df.write.format(file_type).option('header', necessity).mode(write_mode).save(dir_path)
※.option('emptyValue', '')とすることで空文字を明示的に空文字として出力できる
★データの操作
■新規カラムの作成
# 新規作成カラム名(既存と同じものにすると更新される)
new_col_name = 'new_col'
# 参照するカラム
old_col_name = 'old_col'
### カラムを追加
df = df.withColumn( new_col_name, col(old_col_name) )
### カラム名の変換
df = df.withColumnRenamed( old_col_name, new_col_name )
■フィルタリング
df = df.filter( (条件式) )
※whereでも同じ挙動になる
※戻り値がbooleanなら使える
※df = df.filter( ~(条件式) )とすることでnotになる
※not検索ではnullは表示されないので個別にisNull()の条件を絡めて表示する
■列削除
df = df.drop('hoge', 'fuga')
■指定列の抽出
# 列指定
col_list = ['hoge', 'fuga']
### 列抽出
select_df = df.select( col_list )
### リスト化
col_data_list = df.select(col_list[0]).rdd.flatMap(lambda x: x).collect()
■キャスト
# 型指定
cast_kind = 'string'
### キャスト
df = df.withColumn( new_col_name, F.col(old_col_name).cast(cast_kind) )
■正規表現(戻り値:true or false)
# 正規表現パターン
pattern = '^\d+$'
### 正規表現
df = df.withColumn( new_col_name, F.col(old_col_name).rlike(pattern) )
■リストの内容含む(戻り値:true or false)
# 含んでいてほしいデータ群
ok_list = ['hoge', 'fuga']
### 対象データが含まれていればtrueを返す
df = df.withColumn( new_col_name, F.col(old_col_name).isin(ok_list) )
■null値の扱い
### nullをチェック(戻り値:true or false)
# nullのレコードを検索
df = df.withColumn( new_col_name, F.col(old_col_name).isNull() )
# nullではないレコードを検索
df = df.withColumn( new_col_name, F.col(old_col_name).isNotNull() )
### null値の補完
# nullの置換が必要なカラムを指定
target_col = 'col_name'
df = df.fillna('hoge', subset=target_col)
※カラム指定なしなら、全カラムが対象になる
# 辞書形式で複数条件の補完可能
fillna_dict = { "target_col1": 'hoge1', "target_col2": 'hoge2'}
df = df.fillna(fillna_dict)
### nullを含むレコードを削除
# nullのチェックが必要なカラムを指定
target_col = 'col_name'
df = df.dropna(subset=target_col)
※カラム指定なしなら、全カラムが対象になる
■テーブル名の付与(ジョインする前によく使う)
# テーブル名の定義
table_name = 'hoge'
### エイリアスの付与
df = df.alias(table_name)
# カラム名単体にも使える
target_col = 'hoge'
new_col_name = 'fuga'
df = df.select(target_col).alias(new_col_name)
■ジョイン
### 横結合
# ジョインキー
join_key = 'col_name'
※異なるカラムどうしでジョインする場合はSQLのように書く
# 結合方式(inner, cross, outer, left, right, left_anti)
how = 'left'
※left_antiジョインは、null値を同値として扱ってくれないので注意
df = df1.join(df2, join_key, how)
### 縦結合(ただのunionは事故るので列名参照する)
df = df1.unionByName(df2)
■集計
### カウント
df = df.groupBy(col_name).count()
### 最大値
df = df.groupBy(col_name).agg({col_name : 'max'})
### 最小値
df = df.groupBy(col_name).agg({col_name : 'min'})
### 平均値
df = df.groupBy(col_name).agg({col_name : 'ave'})
■ソート
# 昇順
df = df.sort(col_name, ascending=True)
# 降順
df = df.sort(col_name, ascending=False)
# 昇順
df = df.orderBy(col('col_name').asc())
# 降順
df = df.orderBy(col('col_name').desc())
■データ加工のお供"pyspark.sql.functions"
公式:https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html
### 列を複製
df = df.withColumn( new_col_name, F.col(old_col_name) )
### 同じ値で埋めた列を作成
df = df.withColumn( new_col_name, F.lit(1) )
### 条件によって値を変化させた列を作成
df = df.withColumn(new_col_name,
F.when( (条件式) ,
(trueの場合の値)
).otherwise( (falseの場合の値) )
)
※whenはチェーン可→F.when().when().otherwise()
### 文字列の連結
df = df.withColumn(new_col_name,
F.concat( F.col(old_col_name), F.lit('hoge') )
)
### 文字列の置換
df = df.withColumn(new_col_name,
F.translate( old_col_name, (置換対象文字列), (置換後文字列) )
)
### 文字列の置換(正規表現)
pattern = '^\d+$'
df = df.withColumn(new_col_name,
F.regexp_replace( old_col_name, pattern, (置換後文字列) )
)
### 絶対値に変換
df = df.withColumn( new_col_name, F.abs( F.col(old_col_name) ) )
### 文字列形式の変換(例として、数字を0埋め3桁変換)
pattern = '%03d'
df = df.withColumn( new_col_name, F.format_string(pattern, F.lit(5)) )
### 文字列の抽出(2~4文字目を取得)
df = df.withColumn( new_col_name, F.substring(F.col(old_col_name), 2, 4) )
### 文字列の大文字化
df = df.withColumn( new_col_name, F.upper(F.col(old_col_name)) )
### 文字列の小文字化
df = df.withColumn( new_col_name, F.lower(F.col(old_col_name)) )
### 文字列の長さを取得
df = df.withColumn( new_col_name, F.length(F.col(old_col_name)) )
### 文字列を分割する(例として"/"区切り)
pattern = '/'
df = df.withColumn( new_col_name, F.split(F.col(old_col_name), pattern) )
### 文字列を逆さにする
df = df.withColumn( new_col_name, F.reverse(F.col(old_col_name)) )
##### window関数(集計関数)を使う #####
# 順位付け対象カラム名
col_name = 'hoge'
# 集積関数の定義
myWindow = Window.orderBy(col_name)
# パーティションあり
partition_col = 'fuga'
myWindow = Window.partitionBy(partition_col).orderBy(col_name)
### 順位付け
df = df.withColumn( new_col_name, F.rank().over(myWindow) )
# denseRank:同率ランクがある場合の順位飛ばしあり
# rank:同率ランクがある場合の順位飛ばしなし
# rowNumber:順位の重複なし
##### 日付系 #####
# 日付形式
pattern = 'yyyyMMdd'
### 指定の日付形式の文字列からdate型へ変更
df = df.withColumn( new_col_name, F.to_date( F.col(old_col_name), pattern ) )
### 指定の日付形式の文字列からtimestamp型へ変更
df = df.withColumn( new_col_name, F.to_timestamp( F.col(old_col_name), pattern ) )
### 指定した形式のdate型へ変更
df = df.withColumn( new_col_name, F.date_format( F.col(old_col_name), pattern ) )
### 日付の計算(左から右を引き算して日数を返す)(戻り値:int)
df = df.withColumn( new_col_name, F.datediff( F.col(date_col1), F.col(date_col2) ) )
★データの検証
### レコード数のカウント
df.count()
### 1行目を表示
df.head()
### 上から指定した分だけ表示
num = 5
df.show(num)
### カラム一覧表示
df.columns
### データフレームどうしの比較(カラムは一致すること)
df = df1.subsract(df2)
※戻り値はleft anti joinの結果
### 指定行のデータを抜き出す
# index列を作成して固定化(sparkデータフレームは行が流動的)
df.createOrReplaceTempView('df')
temp_df = spark.sql('SELECT *, row_number() over (order by 'some_column') as index FROM df').persist()
# 例として5行目から10行目を抜き出す
select_row_df = temp_df.filter(temp_df.index.between(5, 10))