13
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CData SoftwareAdvent Calendar 2024

Day 4

CData Virtuality に対するデータ操作を Databricks で実施する方法

Last updated at Posted at 2024-11-28

概要

Databricks (Spark) から jdbc にて CData Virtuality に接続して下記のデータ操作方法を説明します。3 つの処理パターンを 2 つの JDBC コネクター(CData Virtuality JDBC Driver と PostgreSQL JDBC Driver)を利用して実施する方法を紹介します。

  1. 処理パターン
    1. CData Virtuality からデータ取得
    2. CData Virtuality へのデータ書き込み
    3. SQL 実行
  2. 利用する JDBC コネクター
    1. CData Virtuality JDBC Driver
    2. PostgreSQL JDBC Driver

CData Virtuality は PostgreSQL プロトコルをサポートしています。2024年11月28日時点での検証では、データの読み込み処理は正常に動作しましたが、書き込み処理についてはエラーが発生することを確認しています。

image.png

It uses the PostgreSQL protocol and has been specially designed for heavily multi-threaded applications.

引用元:Python

PostgreSQL プロトコルを使用し、マルチスレッド処理を重視したアプリケーション向けに特別に設計されています。

上記の翻訳

JDBC Driver に関する情報

認証に利用するユーザーアカウントについて

CData Virtuality のアカウントがメールアドレスの場合には、@._に変更する必要があります。manabian@test.comというメールアドレスの場合には、manabian_test_comのようなユーザーアカウントで認証を実施します。CData Virtuality の Preferences にて確認できます。

image.png

CData Virtuality JDBC Driver について

接続方法については、 CData 社のブログにて詳細が記載されています。本記事では SaaS版での検証を行っているため後者のパターンを実施しています。

image.png

引用元:CData Virtuality - JDBCでの接続

image.png

引用元:CData Virtuality - JDBCでの接続

jdbc に渡すパラメータについては下記のドキュメントに記載されています。

image.png

引用元:JDBC Driver Installation and Connection

Spark Dataframe にて jdbc によるデータ操作を行う際のパラメータについて下記のドキュメントに記載されております。

image.png

引用元:JDBC To Other Databases - Spark 3.5.1 Documentation

事前準備

1. CData Virtuality JDBC Driver を取得

下記のサイトから JDBC ドライバーをダウンロードします。

image.png

引用元:CData Virtuality | ODBC & JDBC Drivers

2. Snowflake にてテスト用のテーブルを作成

create or replace TABLE CDATA_01.SCHEMA_1.CDATA_TABLE_01 (
	TIMESTAMP_COL TIMESTAMP_NTZ(9)
);

image.png

3. CData Virtuality にて Snowflake を Sources に登録

image.png

image.png

4. CData Virtuality の jdbc を Databricks のクラスターにインストール

image.png

データ操作方法

1-1. CData Virtuality JDBC Driver により CData Virtuality からデータ取得

# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
jdbc_protocol = "mms"
database_port = "45006"
database_name = "datavirtuality"
url = f"jdbc:datavirtuality:{database_name}@{jdbc_protocol}://{database_host}:{database_port}"

# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"

# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
dv_df = (
    spark.read.format("jdbc")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .load()
)
dv_df.limit(10).display()

1-2. CData Virtuality JDBC Driver により CData Virtuality へのデータ書き込み

append モードで書き込むことが可能です。

from pyspark.sql.functions import current_timestamp

src_df = spark.createDataFrame([(1,)], ["id"]).withColumn("TIMESTAMP_COL", current_timestamp())
src_df = src_df.drop("id")

src_df.display()
# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
jdbc_protocol = "mms"
database_port = "45006"
database_name = "datavirtuality"
url = f"jdbc:datavirtuality:{database_name}@{jdbc_protocol}://{database_host}:{database_port}"

# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"

# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
(
    src_df.write.format("jdbc")
    .mode("append")
    .option("truncate", "true")
    .option("driver", driver)
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .save()
)

image.png

overwrite モードで書き込むことは実施できないため、下記記事で紹介している方法を利用してデータを書き込むことができます。

image.png

引用元:CData Virtuality に対して Spark DataFrame にて overwrite により書き込む際のエラーへの対応方法 #Python - Qiita

1-3. CData Virtuality JDBC Driver により SQL 実行

%scala
import java.sql.DriverManager
import java.sql.Connection

var connection: Connection = null

try {
  val executed_sql = "INSERT INTO snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01 SELECT NOW();;"

  val database_host = "aws-ap-east-1.platform.datavirtuality.com"
  val jdbc_protocol = "mms"
  val database_port = "45006"
  val database_name = "datavirtuality"
  val jdbcUsername = "manabina_test_co_jp"
  val jdbcPassword = "2!zzzzzz"
  val jdbcUrl = s"jdbc:datavirtuality:${database_name}@${jdbc_protocol}://${database_host}:${database_port};"

  connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
  val statement = connection.createStatement()
  statement.execute(executed_sql)

} finally {
  if (connection != null) {
    connection.close()
  }
}

image.png

2-1. PostgreSQL JDBC Driver により CData Virtuality からデータ取得

# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
database_port = "45007"
database_name = "datavirtuality"

# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"

# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
dv_df = (
    spark.read.format("postgresql")
    .option("dbtable", table)
    .option("host", database_host)
    .option("port", database_port)
    .option("database", database_name)
    .option("user", user)
    .option("password", password)
    .load()
)

dv_df.display()

image.png

2-2. PostgreSQL JDBC Driver により CData Virtuality へのデータ書き込み

2024年11月28日時点ではエラーとなってしまいました。

2-3. PostgreSQL JDBC Driver により SQL 実行

%scala
import java.sql.DriverManager
import java.sql.Connection

var connection: Connection = null

try {
  val executed_sql = "INSERT INTO snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01 SELECT NOW();;"

  val database_host = "aws-ap-northeast-1.platform.datavirtuality.com"
  val database_port = "45007"
  val database_name = "datavirtuality"
  val jdbcUsername = "manabina_test_co_jp"
  val jdbcPassword = "2!zzzzzz"
  val jdbcUrl = s"jdbc:postgresql://${database_host}:${database_port}/${database_name}"
  connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
  val statement = connection.createStatement()
  statement.execute(executed_sql)

} finally {
  if (connection != null) {
    connection.close()
  }
}

image.png

補足

1. PostgreSQL JDBC Driver により Spark Dataframe から書き込み処理を実行するとエラーが発生

from pyspark.sql.functions import current_timestamp

src_df = spark.createDataFrame([(1,)], ["id"]).withColumn("TIMESTAMP_COL", current_timestamp())
src_df = src_df.drop("id")

src_df.display()

image.png

# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
database_port = "45007"
database_name = "datavirtuality"

# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"

# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"

image.png

(
    src_df.write.format("postgresql")
    .mode("append")
    .option("dbtable", table)
    .option("host", database_host)
    .option("port", database_port)
    .option("database", database_name)
    .option("user", user)
    .option("password", password)
    .save()
)

Py4JJavaError: An error occurred while calling o531.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 36.0 failed 4 times, most recent failure: Lost task 3.3 in stage 36.0 (TID 97) (10.139.64.10 executor 0): org.postgresql.util.PSQLException: ERROR: TEIID31100 Parsing error: Encountered "[]SET[] SESSION CHARACTERISTICS" at line 1, column 1.

image.png

(
    src_df.write.format("postgresql")
    .mode("overwrite")
    .option("truncate", "true")
    .option("dbtable", table)
    .option("host", database_host)
    .option("port", database_port)
    .option("database", database_name)
    .option("user", user)
    .option("password", password)
    .save()
)

Py4JJavaError: An error occurred while calling o546.save.
: org.postgresql.util.PSQLException: ERROR: TEIID31100 Parsing error: Encountered "[]TRUNCATE[] TABLE ONLY" at line 1, column 1.
Was expecting: | "begin" | "{"

image.png

13
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?