概要
Databricks (Spark) にて CSV 形式のファイルを参照する際のカラム数の推測仕様の調査結果を共有します。Databricks (Spark)では、ドキュメントにて次のように記載されている通り、1行目のデータに基づきカラムの長さ(カラム数)を推論することが記載されています。想定外に推論される CSV 形式のファイル例を示した上で、そのデータの検知方法を提示します。
- The first row of the file (either a header row or a data row) sets the expected row length.
引用元:CSV file | Databricks on AWS
- ファイルの最初の行 (ヘッダー行またはデータ行) は、予想される行の長さを設定します。
引用元:CSV ファイル | Databricks on AWS
事象の確認
想定外に推論される CSV 形式のファイル例
想定外に推論される CSV 形式のファイルとは、1 行目では 2 つのカラムがあるが 3 行目に 3 カラムとなっているようなファイルです。期待する結果としては 3 カラムになることですが、実際の結果は 2 カラムとなり2020-02-01
のデータが損失してしまいます。
str_col,int_col
"a",1
"b",2,"2020-02-01"
"c"
動作確認コードと結果
1. リソースの準備
次のコードにて CSV 形式のファイルを配置します。
src_path = "/FileStore/copy_into"
path = f"{src_path}/test_data_01.txt"
data = """
str_col,int_col
"a",1
"b",2,"2020-02-01"
"c"
""".strip()
dbutils.fs.put(path, data, True)
print(dbutils.fs.head(path))
2. データフレームの作成
df = spark.read.format("csv").option("header", "true").load(path)
df.display()
検知方法
データフレーム操作を実施する際の検知方法
_corrupt_record
列を追加したスキーマによりデータフレームを作成
カラム数が一致していないレコードでは、_corrupt_record
列に値が格納されます。
schema_df = spark.read.format("csv").option("header", "true").load(path)
schema = ",".join([f'{col_type[0]} {col_type[1]}' for col_type in schema_df.dtypes])
schema += ',_corrupt_record string'
df = (
spark.read.format("csv")
.schema(schema)
.option("header", "true")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.load(path)
)
df.display()
_corrupt_record
列の仕様については次のドキュメントに記載されています。
引用元:CSV ファイル | Databricks on AWS
COPY INTO 利用時の検知方法
暫定対応方法
COPY INTO にてスキーマを指定する方法を確認できていないため、_corrupt_record
列に値を格納する方法を採用できません。検知する方法としては、次のような暫定対応方法であります。
-
badRecordsPath
にて json 形式のファイルとして出力する方法 - 単一カラムとして読み取り
from_csv
関数にてパースする方法 - COPY INTO で取り込んだファイルを特定して replaceWhere による選択的上書きする方法
本記事で作成したリソースの削除
%sql
DROP SCHEMA IF EXISTS copy_into CASCADE;
dbutils.fs.rm('/FileStore/copy_into', True)