Overview
This article explains how to handle a specific error that occurs when Azure Data Factory writes from Oracle Database to the Azure Databricks Delta Lake connector. Specifically, when the Oracle Database NLS_CHARACTERSET is JA16SJISTILDE and the data contains line breaks (\n), the following error occurs. As a workaround, add additionalFormatOptions to the sink settings of the Azure Databricks Delta Lake connector.
ErrorCode=AzureDatabricksCommandError,Hit an error when running the command in Azure Databricks. Error details: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 18) (10.139.64.10 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F".
For a detailed explanation of additionalFormatOptions, see the article below; this article covers only the procedure.
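As background, the staged CSV file the connector hands to Databricks contains quoted fields with embedded line breaks; without a multiline option, such a record gets split mid-field, which is where malformed fragments like F" in the error log come from. The effect can be illustrated with Python's standard csv module (a minimal sketch, not the actual Databricks parser):

```python
import csv
import io

# A staged CSV row whose quoted field contains an embedded line feed,
# similar to the 'L\nF' value exported from Oracle.
data = '"LF","L\nF"\r\n'

# Naive line-by-line splitting breaks the record in two, producing
# a malformed fragment F" like the one in the error log.
naive = data.splitlines()
print(naive)  # ['"LF","L', 'F"']

# A multiline-aware CSV parser keeps the quoted field intact.
rows = list(csv.reader(io.StringIO(data)))
print(rows)   # [['LF', 'L\nF']]
```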
Error code and workaround
Prerequisites
Confirm that the Oracle Database NLS_CHARACTERSET is JA16SJISTILDE.
SELECT *
FROM nls_database_parameters
WHERE parameter = 'NLS_CHARACTERSET';
Create a table in Oracle Database and insert test data.
/*
DROP TABLE multiline_table;
*/
CREATE TABLE multiline_table (
    text       VARCHAR2(255),
    tgt_string VARCHAR2(10)
);
TRUNCATE TABLE multiline_table;
--
INSERT INTO multiline_table (text, tgt_string)
SELECT 'CR', 'C' || CHR(13) || 'R' FROM dual
UNION ALL
SELECT 'LF', 'L' || CHR(10) || 'F' FROM dual
UNION ALL
SELECT 'CRLF', 'CR' || CHR(13) || CHR(10) || 'LF' FROM dual
;
COMMIT;
SELECT * FROM multiline_table;
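For reference, the CHR codes used in the INSERT map to the usual control characters; a quick Python check of the values the statement produces:

```python
# Oracle's CHR(n) returns the character with code point n;
# the test rows embed these control characters in tgt_string.
assert chr(13) == "\r"  # carriage return, as in the 'CR' row
assert chr(10) == "\n"  # line feed, as in the 'LF' row

# The three values inserted into multiline_table:
rows = {
    "CR":   "C" + chr(13) + "R",
    "LF":   "L" + chr(10) + "F",
    "CRLF": "CR" + chr(13) + chr(10) + "LF",
}
print(rows)
```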
Create the destination table in Databricks.
%sql
CREATE SCHEMA IF NOT EXISTS multi_line_test;
CREATE OR REPLACE TABLE multi_line_test.multiline_from_oracle (
    text       string,
    tgt_string string
);
Reproducing the error
Create a pipeline that writes from Oracle Database to Databricks and run it, then confirm that it fails with the following error.
ErrorCode=AzureDatabricksCommandError,Hit an error when running the command in Azure Databricks. Error details: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 18) (10.139.64.10 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F".
Applying the workaround
In Azure Data Factory, click "Show the JSON code representation of this source" at the top right of the pipeline to display the JSON definition. Then locate the sink section and add additionalFormatOptions to importSettings.
"additionalFormatOptions": {
"multiline": true
}
Comparing the settings before and after the change:

Before:

"sink": {
    "type": "AzureDatabricksDeltaLakeSink",
    "importSettings": {
        "type": "AzureDatabricksDeltaLakeImportCommand"
    }
},

After:

"sink": {
    "type": "AzureDatabricksDeltaLakeSink",
    "importSettings": {
        "type": "AzureDatabricksDeltaLakeImportCommand",
        "additionalFormatOptions": {
            "multiline": true
        }
    }
},
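If you maintain pipeline definitions as code, the same change can be applied programmatically; a minimal sketch that patches the sink dictionary (the structure mirrors the JSON above):

```python
import json

# Sink definition as exported from Azure Data Factory (before the fix).
sink = {
    "type": "AzureDatabricksDeltaLakeSink",
    "importSettings": {
        "type": "AzureDatabricksDeltaLakeImportCommand"
    }
}

# Add additionalFormatOptions so the import command parses
# multiline CSV records instead of splitting them at line breaks.
sink["importSettings"]["additionalFormatOptions"] = {"multiline": True}

print(json.dumps(sink, indent=2))
```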
Run the pipeline and confirm that it completes successfully.
Finally, confirm in Databricks that the data has been written to the table.
df = spark.table("multi_line_test.multiline_from_oracle")
print(df.collect())
df.display()
[
Row(text="CR", tgt_string="C\rR"),
Row(text="LF", tgt_string="L\nF"),
Row(text="CRLF", tgt_string="CR\r\nLF"),
]