事象
関数の引数で受け取ったカラム名で、rdd.sortByを使って、repartitionする際にソートしつつrepartitionしたいが、strの引数で処理しようとするとエラーが出る
環境
python 3.7.4
pyspark 3.1.2
例
関数例
def hoge(df: pyspark.sql.DataFrame, partition_num: int, sort_key: str) -> None:
repartitioned_df = (
df.rdd.sortBy(lambda r: r[sort_key], numPartitions=partition_num)
.toDF()
)
repartitioned_df.write.csv('any_path')
飛んでくるエラー
df = spark.range(5)
>> df.show()
| id|
|---|
| 0|
| 1|
| 2|
| 3|
| 4|
>> hoge(df, 5, 'id')
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
なんかシリアライズ出来ないっぽい?
解決法
受け取ったカラム名のindex(DataFrameを左からの順番)(いい言葉あったら教えて下さい)を、keyfuncで渡すと行ける
例
関数例
def hoge(df: pyspark.sql.DataFrame, partition_num: int, sort_key: str) -> None:
sort_key_index = df.column.index(sort_key)
repartitioned_df = (
df.rdd.sortBy(lambda r: r[sort_key_index], numPartitions=partition_num)
.toDF()
)
repartitioned_df.write.csv('any_path')
ちなみに
def hoge(df: pyspark.sql.DataFrame, partition_num: int) -> None:
repartitioned_df = df.rdd.sortBy(lambda r: r['id'], numPartitions=partition_num).toDF()
repartitioned_df.write.csv('any_path')
カラム名をハードコーディングするといけた。
引数がダメっぽい。