概要
Databricks ( Spark ) にてデータフレームのカラム名を一括で変更する方法を共有します。
withColumnRenamed
関数によりカラム名を変更できますが、多数のカラムが場合などには次のような関数を利用することがおすすめです。
def rename_df_cols(
df,
cols_names,
):
counter = 0
for new_col in cols_names:
counter_str = str(counter)
old_col_name = df.columns[counter]
df = df.withColumnRenamed(old_col_name, new_col)
counter += 1
return df
CSV をソースとしたデータフレームを header
を False
とした作成した場合に_#
というカラム名になるため、上記の関数が役立ちます。
実行例
事前準備
import inspect
from pyspark.sql import DataFrame, SparkSession
from pyspark.dbutils import DBUtils
def put_files_to_storage(
path,
content,
is_overwrite = True,
):
# 最初の行と最後の行を削除
content = inspect.cleandoc(content)
spark = SparkSession.getActiveSession()
dbutils = DBUtils(spark)
return dbutils.fs.put(path, content, is_overwrite)
# 検証用ファイルの配置
path = 'dbfs:/test/test.csv'
csv_data_rows = """
1,"user_aaa","user_aaa@test.com",53.5,180
2,"user_bbb","user_bbb@test.com",53.5,180
3,"user_ccc","user_ccc@test.com",53.5,180
"""
_ = put_files_to_storage(path, csv_data_rows)
# ファイルを確認
file_content = dbutils.fs.head(path)
print(file_content)
Wrote 125 bytes.
1,"user_aaa","user_aaa@test.com",53.5,180
2,"user_bbb","user_bbb@test.com",53.5,180
3,"user_ccc","user_ccc@test.com",53.5,180
# データフレームを作成
df = (
spark
.read
.format('csv')
.option("inferSchema", "False")
.option('header', 'False')
.load(path)
)
df.display()
1. カラムのリスト型変数により置換する方法
def rename_df_cols(
df,
cols_names,
):
counter = 0
for new_col in cols_names:
counter_str = str(counter)
old_col_name = df.columns[counter]
df = df.withColumnRenamed(old_col_name, new_col)
counter += 1
return df
col_names = ['id','name','mail','weight','height',]
df_2 = rename_df_cols(df, col_names)
df_2.display()
2. カラムの辞書型変数により置換する方法
def rename_df_cols_with_col_map(
df,
renamed_cols_names,
):
for existing_col,new_col in renamed_cols_names.items():
df = df.withColumnRenamed(existing_col, new_col)
return df
# ソースファイルにヘッダーがないため、カラム名を変更
renamed_cols_names = {
'_c0':'id',
'_c1':'name',
'_c2':'mail',
'_c3':'weight',
'_c4':'height',
}
df_3 = rename_df_cols_with_col_map(df, renamed_cols_names)
df_3.display()
事後処理
dbutils.fs.rm(path, True)