LoginSignup
72
63

More than 3 years have passed since last update.

Pyspark dataframe操作

Last updated at Posted at 2019-03-29

随時追記

表示

項目 コード
全件表示 .show()
10件表示 .show(10)
RDDで全件取得 .collect()
RDDで10件取得 .take(10)
RDDで10件取得 .head(10)
RDDで先頭1件取得 .first()
RDDで末尾1件取得 .last()

抽出 .filter

項目 コード
一致 .filter(col('col_name') == 'A'))
AND(※括弧必須) .filter((col('col_nameA') == 'A') & (col('col_nameB') == 'B'))
OR(※括弧必須) .filter((col('col_nameA') == 'A') | (col('col_nameB') == 'B'))
文字列を含む .filter(col('col_name').contains('A'))
NULL .filter(col('col_name').isNull())
Not NULL .filter(col('col_name').isNotNull())
SQL形式パターンマッチ .filter(col('col_name').like('sql pattern match'))
正規表現パターンマッチ .filter(col('col_name').rlike('regex pattern'))

列名の変更

# selectとaliasを利用する方法(他にも出力する列がある場合は列挙しておく)
df.select(col('col_name_before').alias('col_name_after'))

# withColumnRenamedを利用する方法
df.withColumnRenamed('col_name_before', 'col_name_after')

列の追加

case whenの様に列を追加する

.withColumn('new_column_name', when(col('colname') === "hoge", 1).otherwise(0))

結合 join

colを利用しない場合
left_dfとright_dfに同じ名前の列があっても使える

left_df.join(right_df, left_df.col_name == right_df.col_name, 'how')

colを利用する場合
left_dfとright_dfに同じ名前があると使えない

left_df.join(right_df, col('left_col_name') == col('right_col_name'), 'how')

結合方法はhowで指定する。
howを省略した場合はinnerになる

how 挙動
inner 内部結合。デフォルトで指定されている。
cross
outer
full
left
left_outer
right
right_outer
left_semi 右のDataFrameと共通の行だけ出力。
出力される列は左のDataFrameの列だけ
left_anti 右のDataFrameに無い行だけ出力される。
出力される列は左のDataFrameの列だけ。

重複削除

df.distinct()

ソート .orderBy

項目 コード
ソート .orderBy(col('col_name').asc())
逆順ソート .orderBy(col('col_name').desc())
複数条件ソート .orderBy(col('col_nameA').asc(), col('col_nameB').desc())

文字列操作

.select()の中で行う

項目 コード
小文字化 .select(lower(col('col_name'))
置換 .select(regexp_replace(col('col_name'), 'before', 'after'))
分割 .select(split(col('col_name'), 'delimiter'))

集計

項目 コード
件数 .count()
統計値 .describe(col('col_name'))
特定カラムの平均 .groupBy().avg('col_name')
複数カラムの平均 .groupBy().avg('col_name1', 'col_name2')
特定カラムの総和 .groupBy().sum('col_name')
複数カラムの総和 .groupBy().sum('col_name1', 'col_name2')
特定カラムの最大値 .groupBy().max('col_name')
複数カラムの最大値 .groupBy().max('col_name1', 'col_name2')
特定カラムの最小値 .groupBy().min('col_name')
複数カラムの最小値 .groupBy().min('col_name1', 'col_name2')

グルーピングと集計 (groupBy)

groupByでまとめてからaggで集計する

# 書き方1
df.groupBy(col('col_name')).agg({'col_name1': 'expr', 'col_name2': 'expr'})

#書き方2 .aliasがかけやすい
df.groupBy(col('col_name')).agg(sum(col('col_name')))

利用可能な集計方法

項目 expr
件数 count
平均 mean
平均 avg
総和 sum
最大値 max
最小値 min

展開

df.select( posexplode(col('array_col_name')) )

PandasのDataFrameに変換する

df.toPandas()

SQL問い合わせ結果のキャッシュ

キャッシュテーブルの作成。幾つか作り方はあるみたい。
peopleがHiveのテーブル。people2というキャッシュテーブルを作る。

output = sqlContext.sql("select * from people")
output.registerTempTable('people2')
sqlContext.cacheTable('people2')

キャッシュテーブルの呼び出し

sqlContext.sql("select * from people2").count()

参考URL
- https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740

72
63
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
72
63