3
0

More than 1 year has passed since last update.

pysparkのrdd.sortByで、関数の引数で受け取ったカラム名でソートしたい場合の解決法

Last updated at Posted at 2022-11-04

事象

関数の引数で受け取ったカラム名で、rdd.sortByを使って、repartitionする際にソートしつつrepartitionしたいが、strの引数で処理しようとするとエラーが出る

環境

python 3.7.4
pyspark 3.1.2

関数例

def hoge(df: pyspark.sql.DataFrame,  partition_num: int, sort_key: str) -> None:
    repartitioned_df = (
        df.rdd.sortBy(lambda r: r[sort_key], numPartitions=partition_num)
        .toDF()
    )
    
    repartitioned_df.write.csv('any_path')

飛んでくるエラー

df = spark.range(5)
>> df.show()
| id|
|---|
|  0|
|  1|
|  2|
|  3|
|  4|

>> hoge(df, 5, 'id')
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

なんかシリアライズ出来ないっぽい?

解決法

受け取ったカラム名のindex(DataFrameを左からの順番)(いい言葉あったら教えて下さい)を、keyfuncで渡すと行ける

関数例

def hoge(df: pyspark.sql.DataFrame,  partition_num: int, sort_key: str) -> None:
    sort_key_index = df.column.index(sort_key)
    repartitioned_df = (
        df.rdd.sortBy(lambda r: r[sort_key_index], numPartitions=partition_num)
        .toDF()
    )
    
    repartitioned_df.write.csv('any_path')

ちなみに

def hoge(df: pyspark.sql.DataFrame,  partition_num: int) -> None:
    repartitioned_df = df.rdd.sortBy(lambda r: r['id'], numPartitions=partition_num).toDF()
    
    repartitioned_df.write.csv('any_path')

カラム名をハードコーディングするといけた。
引数がダメっぽい。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0