概要
Databricks(Spark)にてDataFrameReaderのmode機能による区切りテキスト(CSV・TSV等)のソースファイルの読み取りエラーの動作確認内容を共有します。
CSVにおけるDataFrameReaderのmode機能とは、下記のようにドキュメントの記載があり、エラーレコード時の動作を変更するパラメーターです。_corrupt_record
を用意することで、エラーレコードを格納してくれるようです。
引用元:CSV Files - Spark 3.3.0 Documentation (apache.org)
引用元:Configuration - Spark 3.3.0 Documentation (apache.org)
検証結果としては、下記の動作であり、ドキュメントに記載の通りでした。
検証のデータ(string
列がB
のレコードにおけるvalue
に数値以外の文字(,
)を保持)
string | value |
---|---|
A | 100 |
B | 2,000 |
C | 300 |
検証結果
番号 | mode | 動作 |
---|---|---|
1 | permissive |
string がB レコードにおけるvalue 列がNULL |
2 | dropmalformed |
string がB レコードが非表示 |
3 | failfast | エラー終了 |
個人的な見解としては、failfast
のエラーメッセージからエラーレコードがあることを確認できないため、利用をおすすめしません。
SparkException: Job aborted due to stage failure: Task 0 in stage 89.0 failed 4 times, most recent failure: Lost task 0.3 in stage 89.0 (TID 118) (10.139.64.5 executor 0): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/FileStore/qiita/verify_correctness_of_the_data_in_csv/part-00000-tid-5748047470537696475-f3987640-a136-44e1-b5e8-0676855d100c-104-1-c000.csv.
詳細は下記のGithub pagesのページをご確認ください。
コードを実行したい方は、下記のdbcファイルを取り込んでください。
検証手順
事前準備
file_path = '/FileStore/qiita/verify_correctness_of_the_data_in_csv'
dbutils.fs.rm(file_path, True)
df = spark.createDataFrame([("A", "100"),("B", "2,000"),("C", "300")],["string", "value"])
df.display()
## dbfs上にcsvファイルとして格納
df.coalesce(1).write.format("CSV").mode("overwrite").option("header", "true").save(file_path)
# csvのパス
file_path_to_csv = f'{file_path}/*.csv'
# スキーマ定義をセット
schema = """
`string` string
,`value` integer
"""
# `_corrupt_record`列を追加したスキーマを作成
shema_with_col_record = f"""
{schema}
,`_corrupt_record` string
""".strip()
# ベースとなるデータフレーム読み込みの実施
df = (spark.read
.format("csv") #ファイル形式を指定
.schema(shema_with_col_record) #スキーマを指定
.option("header", "true") #ヘッダーの有無を指定
.option("inferSchema", "False") #スキーマの自動読み取りの有無を設定
)
mode:PERMISSIVEの場合におけるデータフレームの読み込み検証
df_display = df.option("mode", "permissive")
df_display.load(file_path_to_csv).display()
mode:DROPMALFORMEDの場合におけるデータフレームの読み込み検証
df_display = df.option("mode", "dropmalformed")
df_display.load(file_path_to_csv).display()
mode:FAILFASTの場合におけるデータフレームの読み込み検証
df_display = df.option("mode", "failfast")
df_display.load(file_path_to_csv).display()
関連記事
ノートブックをGithub Pageによる共有方法
Databricks(Azure Databricks)でGithub経由でノートブックを共有する方法 - Qiita