1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricks (Spark) にて CSV 形式のファイルを参照する際のカラム数の推測仕様

Posted at

概要

Databricks (Spark) にて CSV 形式のファイルを参照する際のカラム数の推測仕様の調査結果を共有します。Databricks (Spark)では、ドキュメントにて次のように記載されている通り、1行目のデータに基づきカラムの長さ(カラム数)を推論することが記載されています。想定外に推論される CSV 形式のファイル例を示した上で、そのデータの検知方法を提示します。

  • The first row of the file (either a header row or a data row) sets the expected row length.

引用元:CSV file | Databricks on AWS

  • ファイルの最初の行 (ヘッダー行またはデータ行) は、予想される行の長さを設定します。

引用元:CSV ファイル | Databricks on AWS

事象の確認

想定外に推論される CSV 形式のファイル例

想定外に推論される CSV 形式のファイルとは、1 行目では 2 つのカラムがあるが 3 行目に 3 カラムとなっているようなファイルです。期待する結果としては 3 カラムになることですが、実際の結果は 2 カラムとなり2020-02-01のデータが損失してしまいます。

str_col,int_col
"a",1
"b",2,"2020-02-01"
"c"

期待結果
abc[^fig]

実際の結果
image.png

動作確認コードと結果

1. リソースの準備

次のコードにて CSV 形式のファイルを配置します。

src_path = "/FileStore/copy_into"
path = f"{src_path}/test_data_01.txt"
data = """
str_col,int_col
"a",1
"b",2,"2020-02-01"
"c"
""".strip()
 
dbutils.fs.put(path, data, True)
 
print(dbutils.fs.head(path))

image.png

2. データフレームの作成

df = spark.read.format("csv").option("header", "true").load(path)
 
df.display()

image.png

検知方法

データフレーム操作を実施する際の検知方法

_corrupt_record列を追加したスキーマによりデータフレームを作成

カラム数が一致していないレコードでは、_corrupt_record列に値が格納されます。

schema_df = spark.read.format("csv").option("header", "true").load(path)

schema = ",".join([f'{col_type[0]} {col_type[1]}' for col_type in schema_df.dtypes])
schema += ',_corrupt_record string'

df = (
    spark.read.format("csv")
    .schema(schema)
    .option("header", "true")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .load(path)
)

df.display()

image.png

_corrupt_record列の仕様については次のドキュメントに記載されています。

image.png

引用元:CSV ファイル | Databricks on AWS

COPY INTO 利用時の検知方法

暫定対応方法

COPY INTO にてスキーマを指定する方法を確認できていないため、_corrupt_record列に値を格納する方法を採用できません。検知する方法としては、次のような暫定対応方法であります。

  1. badRecordsPathにて json 形式のファイルとして出力する方法
  2. 単一カラムとして読み取りfrom_csv関数にてパースする方法
  3. COPY INTO で取り込んだファイルを特定して replaceWhere による選択的上書きする方法

本記事で作成したリソースの削除

%sql
DROP SCHEMA IF EXISTS copy_into CASCADE;
dbutils.fs.rm('/FileStore/copy_into', True)

image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?