1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricks ( Spark ) にて固定長テキストファイル(fixed-length text files)をソースとした場合における Spark データフレームの作成方法

Last updated at Posted at 2022-10-28

概要

Databricks ( Spark ) にて 固定長テキストファイル(fixed-length text files)をソースとした場合のデータフレーム作成方法を共有します。

固定長テキストファイル(fixed-length text files)とは、下記のように1レコードの長さがスペースなどで固定(下記例では、全部で80字で固定)されているファイルです。

1                   user_aaa            user_aaa@test.com   53.5                180                 
2                   user_bbb            user_bbb@test.com   53.5                180                 
3                   user_ccc            user_ccc@test.com   53.5                180                 

image.png

単一列で取り込み後、substring 関数と trim 関数により新規列と追加することで、想定のデータフレームとなります。

from pyspark.sql.functions import substring, trim
 
 
def split_fixed_width_cols(
    df,
    col_num,
    col_str_width,
    base_col_name_prefix="_c",
    should_drop_first_col=True,
):
    first_col_name = df.columns[0]
    counter = 1
    for tgt_col_num in range(col_num):
        new_col_name = f"{base_col_name_prefix}{str(counter)}"
        pos = tgt_col_num * col_str_width
        df = df.withColumn(
            new_col_name, trim(substring(df[first_col_name], pos, col_str_width))
        )
        counter += 1
    if should_drop_first_col:
        df = df.drop(first_col_name)
    return df

image.png

image.png

固定長テキストファイルのパターンとしては、文字数ではなく、バイト数が固定のファイルがあります。下記図がそのサンプルであり、1 行目の最初の値が というマルチバイトであるため、その後のスペースが 2 行目や 3 行目より 1つ少なくなっております。その対応方法としては、バイナリーにエンコーディング後、バイト単位でバイナリーから値をを取得して、デコードすることで対応できます。

image.png

ただし、PySpark にて、shift_jis のバイト単位での固定ファイルを区切る場合には、対応しているエンコーディングが限られていることに注意してください。検証した際にはコードがに動作しましたが、想定外の動作をする可能性があります。また、Pandas API on Spark も、2022 年 11 月 7 日時点にて、サポート外のようです。性能に懸念がありますが、 Pandas で実装する方針がよさそうです。

image.png

引用元:pyspark.sql.functions.encode — PySpark 3.1.3 documentation (apache.org)

image.png

引用元:pyspark.pandas.Series.str.encode — PySpark 3.3.1 documentation (apache.org)

Windows の機種依存文字がソースに含まれる場合には、shift_jis ではなく、cp932 を指定したほうがいい場合があります。

image.png

実行例手順

1. すべてのカラムが共通の長さである場合の実行例

import inspect
from pyspark.sql import DataFrame, SparkSession
from pyspark.dbutils import DBUtils
 
 
def put_files_to_storage(
    path,
    content,
    is_overwrite=True,
):
    # 最初の行と最後の行を削除
    content = inspect.cleandoc(content)
 
    spark = SparkSession.getActiveSession()
    dbutils = DBUtils(spark)
    return dbutils.fs.put(path, content, is_overwrite)

# 検証用ファイルの配置
path = "dbfs:/test/test.txt"
 
csv_data_rows = """
1                   user_aaa            user_aaa@test.com   53.5                180                 
2                   user_bbb            user_bbb@test.com   53.5                180                 
3                   user_ccc            user_ccc@test.com   53.5                180                 
"""
 
put_files_to_storage(path, csv_data_rows)
 
# ファイルを確認
file_content = dbutils.fs.head(path)
print(file_content)

image.png

# データフレームを作成
df = spark.read.format("text").load(path)
df.display()

image.png

from pyspark.sql.functions import substring, trim
 
 
def split_fixed_width_cols(
    df,
    col_num,
    col_str_width,
    base_col_name_prefix="_c",
    should_drop_first_col=True,
):
    first_col_name = df.columns[0]
    counter = 1
    for tgt_col_num in range(col_num):
        new_col_name = f"{base_col_name_prefix}{str(counter)}"
        pos = tgt_col_num * col_str_width
        df = df.withColumn(
            new_col_name, trim(substring(df[first_col_name], pos, col_str_width))
        )
        counter += 1
    if should_drop_first_col:
        df = df.drop(first_col_name)
    return df
col_num = 5
col_str_width = 20
 
df_2 = split_fixed_width_cols(df, col_num, col_str_width)

image.png

df_2.display()

image.png

2. カラムごとに長さが異なる場合の実行例

import inspect
from pyspark.sql import DataFrame, SparkSession
from pyspark.dbutils import DBUtils
 
 
def put_files_to_storage(
    path,
    content,
    is_overwrite=True,
):
    # 最初の行と最後の行を削除
    content = inspect.cleandoc(content)
 
    spark = SparkSession.getActiveSession()
    dbutils = DBUtils(spark)
    return dbutils.fs.put(path, content, is_overwrite)

# 検証用ファイルの配置
path = "dbfs:/test/test.csv"
 
csv_data_rows = """
1    user_aaa            user_aaa@test.com   53.5      180  
2    user_bbb            user_bbb@test.com   53.5      180  
3    user_ccc            user_ccc@test.com   53.5      180  
"""

_ = put_files_to_storage(path, csv_data_rows)
 
```python
# ファイルを確認
file_content = dbutils.fs.head(path)
print(file_content)

image.png

# データフレームを作成
df = spark.read.format("text").load(path)
df.display()

image.png


from pyspark.sql.functions import substring, trim
 
 
def split_fixed_width_cols_with_col_width(
    df,
    col_str_widths,
    base_col_name_prefix="_c",
    should_drop_first_col=True,
):
    first_col_name = df.columns[0]
    pos = 0
    
    counter = 1
    for tgt_col_str_width in col_str_widths:
        new_col_name = f"{base_col_name_prefix}{str(counter)}"
        df = df.withColumn(
            new_col_name, trim(substring(df[first_col_name], pos, tgt_col_str_width))
        )
        pos += tgt_col_str_width
        counter += 1
    if should_drop_first_col:
        df = df.drop(first_col_name)
    return df

image.png

col_str_widths = [5, 20, 20, 10, 5]
 
df_3 = split_fixed_width_cols_with_col_width(df, col_str_widths)

image.png

df_3.display()

image.png

3. すべてのカラムが共通のバイト数である場合の実行例(文字コードがshift_jis

import inspect
from pyspark.sql import DataFrame, SparkSession
from pyspark.dbutils import DBUtils
 
 
def put_files_to_storage(
    path,
    content,
    is_overwrite=True,
):
    # 最初の行と最後の行を削除
    content = inspect.cleandoc(content)
 
    spark = SparkSession.getActiveSession()
    dbutils = DBUtils(spark)
    return dbutils.fs.put(path, content, is_overwrite)
  
# 検証用ファイルの配置
path = "dbfs:/test/test.txt"
 
csv_data_rows = """
あ        user_あ   a         
2         user_b    b         
3         user_c    c         
"""
 
_ = put_files_to_storage(path, csv_data_rows,)
 
# ファイルを確認
file_content = dbutils.fs.head(path)
print(file_content)

image.png

# データフレームを作成
df = spark.read.format("text").load(path)
df.display()

image.png

from pyspark.sql.functions import substring, trim, encode, decode
 
def split_fixed_byte_cols_with_encoding(
    df,
    col_num,
    col_byte,
    encoding = 'shift_jis',
    errors = 'ignore',
    base_col_name_prefix="_c",
    should_drop_first_col=True,
):
    first_col_name = df.columns[0]
    counter = 1
    pdf = df.toPandas()
    for tgt_col_num in range(col_num):
        start = tgt_col_num * col_byte
        stop = counter *  col_byte
        new_col_name = f"{base_col_name_prefix}{str(counter)}"
        pdf[new_col_name] = pdf[first_col_name].str.encode(encoding)
        pdf[new_col_name] = pdf[new_col_name].str.slice(start, stop)
        pdf[new_col_name] = pdf[new_col_name].str.decode(encoding, errors)
        counter += 1
 
    df = spark.createDataFrame(pdf)
 
    counter = 1
    for tgt_col_num in range(col_num):
        tgt_col_name = f"{base_col_name_prefix}{str(counter)}"
        df = df.withColumn(
            tgt_col_name, trim(df[tgt_col_name])
        )
        counter += 1
    
    if should_drop_first_col:
        df = df.drop(first_col_name)
    return df

image.png

col_num = 3
col_byte = 10
 
df_2 = split_fixed_byte_cols_with_encoding(df, col_num, col_byte)
df_2.display()

image.png

下記のように PySpark での動作を確認済みだが、前述のようにサポート外。

from pyspark.sql.functions import substring, trim, encode, decode
 
def split_fixed_byte_cols(
    df,
    col_num,
    col_byte,
    charset = 'shift_jis',
    errors = 'ignore',
    base_col_name_prefix="_c",
    should_drop_first_col=True,
):
    first_col_name = df.columns[0]
    counter = 1
    df = df.withColumn(first_col_name, encode(df[first_col_name], charset))
    for tgt_col_num in range(col_num):
        new_col_name = f"{base_col_name_prefix}{str(counter)}"
        start = 1 + (tgt_col_num * col_byte)
 
        df = df.withColumn(new_col_name, substring(df[first_col_name], start, col_byte))
        df = df.withColumn(new_col_name, decode(df[new_col_name], charset))
        counter += 1
 
    df = df.withColumn(first_col_name, decode(df[first_col_name], charset))
    if should_drop_first_col:
        df = df.drop(first_col_name)
    return df

col_num = 3
col_byte = 10
df_2 = split_fixed_byte_cols(df, col_num, col_byte)
df_2.display()

image.png

4. カラムごとにバイト数が異なる場合の実行例(文字コードがshift_jis

import inspect
from pyspark.sql import DataFrame, SparkSession
from pyspark.dbutils import DBUtils
 
 
def put_files_to_storage(
    path,
    content,
    is_overwrite=True,
):
    # 最初の行と最後の行を削除
    content = inspect.cleandoc(content)
 
    spark = SparkSession.getActiveSession()
    dbutils = DBUtils(spark)
    return dbutils.fs.put(path, content, is_overwrite)
  
# 検証用ファイルの配置
path = "dbfs:/test/test.txt"
 
csv_data_rows = """
あ   user_あ   a  
2    user_b    b  
3    user_c    c  
"""
 
_ = put_files_to_storage(path, csv_data_rows,)
 
# ファイルを確認
file_content = dbutils.fs.head(path)
print(file_content)

image.png

# データフレームを作成
df = spark.read.format("text").load(path)
df.display()

image.png

from pyspark.sql.functions import substring, trim, encode, decode

def split_fixed_bytes_cols_with_encoding(
    df,
    col_bytes,
    encoding = 'shift_jis',
    errors = 'ignore',
    base_col_name_prefix="_c",
    should_drop_first_col=True,
):
    first_col_name = df.columns[0]
    start = 0
    stop = 0 
    
    pdf = df.toPandas()
    counter = 1
    for tgt_col_byte in col_bytes:
        stop += tgt_col_byte
        new_col_name = f"{base_col_name_prefix}{str(counter)}"
        pdf[new_col_name] = pdf[first_col_name].str.encode(encoding)
        pdf[new_col_name] = pdf[new_col_name].str.slice(start, stop)
        pdf[new_col_name] = pdf[new_col_name].str.decode(encoding, errors)
        start += tgt_col_byte
        counter += 1

    df = spark.createDataFrame(pdf)
    counter = 1
    for tgt_col_byte in col_bytes:
        tgt_col_name = f"{base_col_name_prefix}{str(counter)}"
        df = df.withColumn(
            tgt_col_name, trim(df[tgt_col_name])
        )
        counter += 1
    

    if should_drop_first_col:
        df = df.drop(first_col_name)
    return df

image.png

col_bytes = [5, 10, 3]
 
df_2 = split_fixed_bytes_cols_with_encoding(df, col_bytes)
df_2.display()

image.png

事後処理

# 本手順で作成したリソースを削除
dbutils.fs.rm(path, True)

image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?