11
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CData SoftwareAdvent Calendar 2024

Day 5

CData Virtuality に対して Spark DataFrame にて overwrite により書き込む際のエラーへの対応方法

Posted at

概要

CData Virtuality に対して Databricks (Spark) から jdbc により overwrite する場合に発生する下記のエラーへの対応方法を共有します。その対処方法として、 DELETE の処理を実行してから APPEND する方法を紹介します。

Py4JJavaError: An error occurred while calling o3043.save.
: org.teiid.jdbc.TeiidSQLException: com.datavirtuality.dv.core.replicator.ReplicatorException: In order to create or drop tables (or create a physical index) in the current data source, it is needed to specify the default schema using the data source parameter importer.defaultSchema.

image.png

Spark DataFrame で JDBC を使用してデータを overwrite モードでデフォルトの設定(truncate オプションがfalse )で書き込む場合にはテーブルを DROP してから書き込みを行うため、CData Virtuality に対する処理がエラーとなりました。また、truncate オプションを true に設定しても、ドキュメントの記載通り正常に動作しないことが確認されています。この問題を回避するため、DELETE 文でデータを削除してから APPEND モードでデータを書き込む代替手法を説明します。

MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect supports this while PostgresDialect and default JDBCDirect doesn't.

引用元:JDBC To Other Databases - Spark 3.5.1 Documentation

MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect、および OracleDialect はこれをサポートしていますが、PostgresDialect とデフォルトの JDBCDirect はサポートしていません。

上記翻訳

事前準備

1. Snowflake にてテスト用のテーブルを作成

create or replace TABLE CDATA_01.SCHEMA_1.CDATA_TABLE_01 (
	TIMESTAMP_COL TIMESTAMP_NTZ(9)
);

image.png

2. CData Virtuality にて Snowflake を Sources に登録

image.png

image.png

3. CData Virtuality の jdbc を Databricks のクラスターにインストール

image.png

エラーの再現方法

1. 書き込み用データを保持したデータフレームを作成

from pyspark.sql.functions import current_timestamp

# 書き込み用データを保持したデータフレームを作成
src_df = spark.createDataFrame([(1,)], ["id"]).withColumn("TIMESTAMP_COL", current_timestamp())
src_df = src_df.drop("id")

src_df.display()

2. overwrite の処理がエラーとなることを確認

# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
jdbc_protocol = "mms"
database_port = "45006"
database_name = "datavirtuality"
url = f"jdbc:datavirtuality:{database_name}@{jdbc_protocol}://{database_host}:{database_port}"

# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"

# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
(
    src_df.write.format("jdbc")
    .mode("overwrite")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .save()
)

Py4JJavaError: An error occurred while calling o3043.save.
: org.teiid.jdbc.TeiidSQLException: com.datavirtuality.dv.core.replicator.ReplicatorException: In order to create or drop tables (or create a physical index) in the current data source, it is needed to specify the default schema using the data source parameter importer.defaultSchema.

image.png

3. truncatetrueに設定してもエラーとなることを確認

(
    src_df.write.format("jdbc")
    .mode("overwrite")
    .option("truncate", "true")
    .option("cascadeTruncate", "true")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .save()
)

image.png

エラーへの対応方法

1. テーブルに対する DELETE 処理を実行

%scala
import java.sql.DriverManager
import java.sql.Connection

var connection: Connection = null

try {
  val executed_sql = "DELETE FROM snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"

  val database_host = "aws-ap-east-1.platform.datavirtuality.com"
  val jdbc_protocol = "mms"
  val database_port = "45006"
  val database_name = "datavirtuality"
  val jdbcUsername = "manabina_test_co_jp"
  val jdbcPassword = "2!zzzzzz"
  val jdbcUrl = s"jdbc:datavirtuality:${database_name}@${jdbc_protocol}://${database_host}:${database_port};"

  connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
  val statement = connection.createStatement()
  statement.execute(executed_sql)

} finally {
  if (connection != null) {
    connection.close()
  }
}

image.png

2. append により書き込み処理

# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-northeast-1.platform.datavirtuality.com"
jdbc_protocol = "mms"
database_port = "45006"
database_name = "datavirtuality"
url = f"jdbc:datavirtuality:{database_name}@{jdbc_protocol}://{database_host}:{database_port}"

# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"

# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
(
    src_df.write.format("jdbc")
    .mode("append")
    .option("truncate", "true")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .save()
)

image.png

3. CData Virtuality にてデータが書き込まれたことを確認

SELECT
  "TIMESTAMP_COL"
FROM
  "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01";;

image.png

11
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?