随時追記
表示
項目 | コード |
---|---|
全件表示 | .show() |
10件表示 | .show(10) |
RDDで全件取得 | .collect() |
RDDで10件取得 | .take(10) |
RDDで10件取得 | .head(10) |
RDDで先頭1件取得 | .first() |
RDDで末尾1件取得 | .last() |
抽出 .filter
項目 | コード |
---|---|
一致 | .filter(col('col_name') == 'A')) |
AND(※括弧必須) | .filter((col('col_nameA') == 'A') & (col('col_nameB') == 'B')) |
OR(※括弧必須) | .filter((col('col_nameA') == 'A') | (col('col_nameB') == 'B')) |
文字列を含む | .filter(col('col_name').contains('A')) |
NULL | .filter(col('col_name').isNull()) |
Not NULL | .filter(col('col_name').isNotNull()) |
SQL形式パターンマッチ | .filter(col('col_name').like('sql pattern match')) |
正規表現パターンマッチ | .filter(col('col_name').rlike('regex pattern')) |
列名の変更
# selectとaliasを利用する方法(他にも出力する列がある場合は列挙しておく)
df.select(col('col_name_before').alias('col_name_after'))
# withColumnRenamedを利用する方法
df.withColumnRenamed('col_name_before', 'col_name_after')
列の追加
case whenの様に列を追加する
.withColumn('new_column_name', when(col('colname') === "hoge", 1).otherwise(0))
結合 join
colを利用しない場合
left_dfとright_dfに同じ名前の列があっても使える
left_df.join(right_df, left_df.col_name == right_df.col_name, 'how')
colを利用する場合
left_dfとright_dfに同じ名前があると使えない
left_df.join(right_df, col('left_col_name') == col('right_col_name'), 'how')
結合方法はhowで指定する。
howを省略した場合はinner
になる
how | 挙動 |
---|---|
inner | 内部結合。デフォルトで指定されている。 |
cross | |
outer | |
full | |
left | |
left_outer | |
right | |
right_outer | |
left_semi | 右のDataFrameと共通の行だけ出力。 出力される列は左のDataFrameの列だけ |
left_anti | 右のDataFrameに無い行だけ出力される。 出力される列は左のDataFrameの列だけ。 |
重複削除
df.distinct()
ソート .orderBy
項目 | コード |
---|---|
ソート | .orderBy(col('col_name').asc()) |
逆順ソート | .orderBy(col('col_name').desc()) |
複数条件ソート | .orderBy(col('col_nameA').asc(), col('col_nameB').desc()) |
文字列操作
.select()の中で行う
項目 | コード |
---|---|
小文字化 | .select(lower(col('col_name')) |
置換 | .select(regexp_replace(col('col_name'), 'before', 'after')) |
分割 | .select(split(col('col_name'), 'delimiter')) |
集計
項目 | コード |
---|---|
件数 | .count() |
統計値 | .describe(col('col_name')) |
特定カラムの平均 | .groupBy().avg('col_name') |
複数カラムの平均 | .groupBy().avg('col_name1', 'col_name2') |
特定カラムの総和 | .groupBy().sum('col_name') |
複数カラムの総和 | .groupBy().sum('col_name1', 'col_name2') |
特定カラムの最大値 | .groupBy().max('col_name') |
複数カラムの最大値 | .groupBy().max('col_name1', 'col_name2') |
特定カラムの最小値 | .groupBy().min('col_name') |
複数カラムの最小値 | .groupBy().min('col_name1', 'col_name2') |
グルーピングと集計 (groupBy)
groupByでまとめてからaggで集計する
# 書き方1
df.groupBy(col('col_name')).agg({'col_name1': 'expr', 'col_name2': 'expr'})
#書き方2 .aliasがかけやすい
df.groupBy(col('col_name')).agg(sum(col('col_name')))
利用可能な集計方法
項目 | expr |
---|---|
件数 | count |
平均 | mean |
平均 | avg |
総和 | sum |
最大値 | max |
最小値 | min |
展開
df.select( posexplode(col('array_col_name')) )
PandasのDataFrameに変換する
df.toPandas()
SQL問い合わせ結果のキャッシュ
キャッシュテーブルの作成。幾つか作り方はあるみたい。
peopleがHiveのテーブル。people2というキャッシュテーブルを作る。
output = sqlContext.sql("select * from people")
output.registerTempTable('people2')
sqlContext.cacheTable('people2')
キャッシュテーブルの呼び出し
sqlContext.sql("select * from people2").count()
参考URL