Edited at

Pyspark dataframe操作

随時追記

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html


表示

項目
コード

全件表示
.show()

10件表示
.show(10)

RDDで全件取得
.collect()

RDDで10件取得
.take(10)

RDDで10件取得
.head(10)

RDDで先頭1件取得
.first()

RDDで末尾1件取得
.last()


抽出 .filter

項目
コード

一致
.filter(col('col_name') == 'A'))

AND(※括弧必須)
.filter((col('col_nameA') == 'A') & (col('col_nameB') == 'B'))

OR(※括弧必須)
.filter((col('col_nameA') == 'A') | (col('col_nameB') == 'B'))

文字列を含む
.filter(col('col_name').contains('A'))

NULL
.filter(col('col_name').isNull())

Not NULL
.filter(col('col_name').isNotNull())

SQL形式パターンマッチ
.filter(col('col_name').like('sql pattern match'))

正規表現パターンマッチ
.filter(col('col_name').rlike('regex pattern'))


列名の変更

# selectとaliasを利用する方法(他にも出力する列がある場合は列挙しておく)

df.select(col('col_name_before').alias('col_name_after'))

# withColumnRenamedを利用する方法
df.withColumnRenamed('col_name_before', 'col_name_after')


列の追加


case whenの様に列を追加する

.withColumn('new_column_name', when(col('colname') === "hoge", 1).otherwise(0))


結合 join

colを利用しない場合

left_dfとright_dfに同じ名前の列があっても使える

left_df.join(right_df, left_df.col_name == right_df.col_name, 'how')

colを利用する場合

left_dfとright_dfに同じ名前があると使えない

left_df.join(right_df, col('left_col_name' == col('right_col_name'), 'how')

結合方法はhowで指定する。

howを省略した場合はinnerになる

how
挙動

inner
内部結合。デフォルトで指定されている。

cross

outer

full

left

left_outer

right

right_outer

left_semi
右のDataFrameと共通の行だけ出力。
出力される列は左のDataFrameの列だけ

left_anti
右のDataFrameに無い行だけ出力される。
出力される列は左のDataFrameの列だけ。


重複削除

df.distinct()


ソート .orderBy

項目
コード

ソート
.orderBy(col('col_name').asc())

逆順ソート
.orderBy(col('col_name').desc())

複数条件ソート
.orderBy(col('col_nameA').asc(), col('col_nameB').desc())


文字列操作

.select()の中で行う

項目
コード

小文字化
.select(lower(col('col_name'))

置換
.select(regexp_replace(col('col_name'), 'before', 'after'))

分割
.select(split(col('col_name'), 'delimiter'))


集計

項目
コード

件数
.count()

統計値
.describe(col('col_name'))

特定カラムの平均
.groupBy().avg('col_name')

複数カラムの平均
.groupBy().avg('col_name1', 'col_name2')

特定カラムの総和
.groupBy().sum('col_name')

複数カラムの総和
.groupBy().sum('col_name1', 'col_name2')

特定カラムの最大値
.groupBy().max('col_name')

複数カラムの最大値
.groupBy().max('col_name1', 'col_name2')

特定カラムの最小値
.groupBy().min('col_name')

複数カラムの最小値
.groupBy().min('col_name1', 'col_name2')


グルーピングと集計 (groupBy)

groupByでまとめてからaggで集計する

# 書き方1

df.groupBy(col('col_name')).agg({'col_name1': 'expr', 'col_name2': 'expr'})

#書き方2 .aliasがかけやすい
df.groupBy(col('col_name')).agg(sum(col('col_name')))

利用可能な集計方法

項目
expr

件数
count

平均
mean

平均
avg

総和
sum

最大値
max

最小値
min


展開

df.select( posexplode(col('array_col_name')) )


PandasのDataFrameに変換する

df.toPandas()


SQL問い合わせ結果のキャッシュ

キャッシュテーブルの作成。幾つか作り方はあるみたい。

peopleがHiveのテーブル。people2というキャッシュテーブルを作る。

output = sqlContext.sql("select * from people")

output.registerTempTable('people2')
sqlContext.cacheTable('people2')

キャッシュテーブルの呼び出し

sqlContext.sql("select * from people2").count()

参考URL

- https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740