1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PySparkにて階層されているStructTypeを単一のカラムにフラット化する方法

Last updated at Posted at 2021-09-12

概要

PySparkにて階層されているStructTypeを単一のカラムにフラットする方法を共有します。

階層されているStructTypeとは、下記のデータフレームのstruct列のことです。
image.png

今回紹介するのは、structのカラムをstruct.strintg.in.structという単一のカラムにする方法です。
image.png

検証環境

databricks runtime: 8.3.x-cpu-ml-scala2.12
Python version: 3.8.8
pyspark version: 3.1.2.dev0

手順

1. データを準備

json_data ="""
{
    "array":[
        { "strintg.in.array": "値①"},
        { "strintg.in.array": "値②"},
        { "strintg.in.array": "値③"}
    ],
    "number":100,
    "string":"日本語",
    "struct":{
        "strintg.in.struct": "struct_1"
    },
}
"""

df = spark.read.json(sc.parallelize([json_data]))

df.display()
df.createOrReplaceTempView('tmp_nested_columns')

df.printSchema()

2. 関数を定義

def get_flatten_transformation(df:DataFrame, combine_str = '_'):
    """StructTypeの階層をフラット化
    """
    cols=[col(e['name']).alias(e['renamed_column_name']) for e in _get_defination(df.schema.jsonValue(), combine_str=combine_str)] 
    return df.select(cols)

def _get_defination(schema:dict, prefix='', combine_str='_'):
    """Sparkデータフレームのスキーマを辞書の変数としてリターン値
    """        
    fields = schema['fields']
    list_definations=[]
    prefix = prefix+'.' if prefix !='' else ''
    for field in fields:
        if type(field['type']) is dict:
            if field['type']['type'] == 'struct':
                column_name_prefix = f'{prefix}`{field["name"]}`'
                list_definations.extend(DataEngineering.get_defination(field['type'], column_name_prefix))
            else:
                field["name"] = f'{prefix}`{field["name"]}`'
                field["renamed_column_name"] = f'{field["name"]}'.replace('.', combine_str).replace('`', '')
                list_definations.append(field)
        else:
            field["name"] = f'{prefix}`{field["name"]}`'
            field["renamed_column_name"] = f'{field["name"]}'.replace('.', combine_str).replace('`', '')
            list_definations.append(field)
    return list_definations

3. 階層されているStructTypeをフラット化

df_2 = get_flatten_transformation(df)

df_2.printSchema()
df_2.display()

image.png

4. ArrayTypeのデータフレームをフラット化

# ArrayTypeをexplode関数によりStructTypeに変換
df_2_explode = (df
                    .select('*',explode('array').alias('_array'))
                    .drop('array')
                    .withColumnRenamed('_array','array')
               )

df_2_explode.display()
df_2_explode.printSchema()

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?