
Handling errors when writing newline-containing Oracle Database data with the Azure Databricks Delta Lake connector in Azure Data Factory

Overview

This article describes how to handle a specific error that occurs when Azure Data Factory writes data from Oracle Database through the Azure Databricks Delta Lake connector. Specifically, when the Oracle Database NLS_CHARACTERSET is JA16SJISTILDE and the data contains newlines (\n), the following error is raised. The workaround is to add additionalFormatOptions to the sink settings of the Azure Databricks Delta Lake connector.

ErrorCode=AzureDatabricksCommandError,Hit an error when running the command in Azure Databricks. Error details: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 18) (10.139.64.10 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F".

For a detailed explanation of additionalFormatOptions, see the following article; this post covers only the steps.

How to implement a pipeline in Azure Data Factory that writes delimited text files containing newlines via the Databricks Delta Lake connector #Databricks - Qiita
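The failure mode itself can be reproduced outside Spark. A quoted CSV field containing a newline spans two physical lines; a line-oriented reader (the connector's default behavior) sees a dangling fragment, which matches the `Malformed CSV record: F"` in the log above. A minimal plain-Python sketch, illustrative only and not the connector's actual code:

```python
import csv
import io

# A CSV payload like the one staged for the Delta Lake import:
# the tgt_string field contains an embedded LF inside quotes.
payload = 'text,tgt_string\nLF,"L\nF"\n'

# Naive line-based splitting (what a reader without multiline
# support effectively does) breaks the quoted record in two.
naive_rows = payload.strip().split("\n")
# The trailing 'F"' fragment is the malformed record reported
# in the Spark error above.

# A parser that understands quoted multi-line records recovers it.
rows = list(csv.reader(io.StringIO(payload)))
print(naive_rows)
print(rows)
```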

Error and workaround

Preparation

Confirm that the Oracle Database NLS_CHARACTERSET is JA16SJISTILDE.

SELECT
    *
FROM
    nls_database_parameters
WHERE
    parameter = 'NLS_CHARACTERSET';


Create a table in Oracle Database and insert the test data.

/*
DROP TABLE multiline_table;
*/
CREATE TABLE multiline_table (
     text VARCHAR2(255),
     tgt_string VARCHAR2(10)
 )
;
TRUNCATE TABLE multiline_table; 
--
INSERT INTO multiline_table (text, tgt_string)
SELECT 'CR',  'C' || CHR(13) || 'R' FROM dual
UNION ALL
SELECT 'LF', 'L' || CHR(10) || 'F'  FROM dual
UNION ALL
SELECT 'CRLF', 'CR' || CHR(13) || CHR(10) || 'LF' FROM dual
; 
COMMIT;
SELECT * FROM multiline_table 
;


Create the destination table in Databricks.

%sql
CREATE SCHEMA IF NOT EXISTS multi_line_test;
CREATE OR REPLACE TABLE multi_line_test.multiline_from_oracle (
  text string,
  tgt_string string
);


Reproducing the error

Create and run the pipeline that writes from Oracle Database to Databricks, then confirm that the following error occurs.


ErrorCode=AzureDatabricksCommandError,Hit an error when running the command in Azure Databricks. Error details: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 18) (10.139.64.10 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/multi_line_test.db/multiline_from_oracle.
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@{storage_name}.blob.core.windows.net/2bceab90-1216-4ea9-a6d2-4b846fea93a6/AzureDatabricksDeltaLakeImportCommand/SSMA.MULTILINE_TABLE.txt.
Caused by: org.apache.spark.SparkException: [MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION] Malformed records are detected in record parsing: [F",null].
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F"
Caused by: org.apache.spark.SparkRuntimeException: [MALFORMED_CSV_RECORD] Malformed CSV record: F".

Applying the workaround

In Azure Data Factory, click "Show the JSON code representation of this source" at the top right of the pipeline to display the JSON definition.


Next, locate the sink section and add additionalFormatOptions under importSettings.

"additionalFormatOptions": {
    "multiline": true
}


Comparing the settings before and after the change:

"sink": {
    "type": "AzureDatabricksDeltaLakeSink",
    "importSettings": {
        "type": "AzureDatabricksDeltaLakeImportCommand"
    }
},


"sink": {
    "type": "AzureDatabricksDeltaLakeSink",
    "importSettings": {
        "type": "AzureDatabricksDeltaLakeImportCommand",
        "additionalFormatOptions": {
            "multiline": true
        }
    }
},
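If the pipeline definition is kept as JSON (for example under Git integration), the same edit can be scripted instead of made in the UI. A minimal sketch using only the sink fragment shown above:

```python
import json

# Sink fragment as exported from the ADF JSON view (before the fix).
sink_json = """
{
    "sink": {
        "type": "AzureDatabricksDeltaLakeSink",
        "importSettings": {
            "type": "AzureDatabricksDeltaLakeImportCommand"
        }
    }
}
"""

activity = json.loads(sink_json)

# Add the multiline option so the staged delimited file is parsed
# with quoted, multi-line records enabled.
activity["sink"]["importSettings"]["additionalFormatOptions"] = {"multiline": True}

print(json.dumps(activity, indent=4))
```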


Run the pipeline and confirm that it completes successfully.


Finally, confirm in Databricks that the data was written to the table.

df = spark.table("multi_line_test.multiline_from_oracle")

print(df.collect())
df.display()
[
    Row(text="CR", tgt_string="C\rR"),
    Row(text="LF", tgt_string="L\nF"),
    Row(text="CRLF", tgt_string="CR\r\nLF"),
]
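This manual check can also be automated by asserting the collected rows against the control characters inserted on the Oracle side. The rows are stubbed here as plain dicts for illustration; on a real cluster they would come from df.collect():

```python
# Expected control characters per source row (from the Oracle INSERTs above).
expected = {
    "CR": "C\rR",
    "LF": "L\nF",
    "CRLF": "CR\r\nLF",
}

# Stub of the collect() result; on a cluster, use:
#   rows = spark.table("multi_line_test.multiline_from_oracle").collect()
rows = [
    {"text": "CR", "tgt_string": "C\rR"},
    {"text": "LF", "tgt_string": "L\nF"},
    {"text": "CRLF", "tgt_string": "CR\r\nLF"},
]

for row in rows:
    assert row["tgt_string"] == expected[row["text"]], row
print("all control characters preserved")
```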

