概要
Databricks (Spark) から jdbc にて CData Virtuality に接続して下記のデータ操作方法を説明します。3 つの処理パターンを 2 つの JDBC コネクター(CData Virtuality JDBC Driver と PostgreSQL JDBC Driver)を利用して実施する方法を紹介します。
- 処理パターン
- CData Virtuality からデータ取得
- CData Virtuality へのデータ書き込み
- SQL 実行
- 利用する JDBC コネクター
- CData Virtuality JDBC Driver
- PostgreSQL JDBC Driver
CData Virtuality は PostgreSQL プロトコルをサポートしています。2024年11月28日時点での検証では、データの読み込み処理は正常に動作しましたが、書き込み処理についてはエラーが発生することを確認しています。
It uses the PostgreSQL protocol and has been specially designed for heavily multi-threaded applications.
引用元:Python
PostgreSQL プロトコルを使用し、マルチスレッド処理を重視したアプリケーション向けに特別に設計されています。
上記の翻訳
JDBC Driver に関する情報
認証に利用するユーザーアカウントについて
CData Virtuality のアカウントがメールアドレスの場合には、@
や.
が_
に変更する必要があります。manabian@test.com
というメールアドレスの場合には、manabian_test_com
のようなユーザーアカウントで認証を実施します。CData Virtuality の Preferences にて確認できます。
CData Virtuality JDBC Driver について
接続方法については、 CData 社のブログにて詳細が記載されています。本記事では SaaS版での検証を行っているため後者のパターンを実施しています。
引用元:CData Virtuality - JDBCでの接続
引用元:CData Virtuality - JDBCでの接続
jdbc に渡すパラメータについては下記のドキュメントに記載されています。
引用元:JDBC Driver Installation and Connection
Spark Dataframe にて jdbc によるデータ操作を行う際のパラメータについて下記のドキュメントに記載されております。
引用元:JDBC To Other Databases - Spark 3.5.1 Documentation
事前準備
1. CData Virtuality JDBC Driver を取得
下記のサイトから JDBC ドライバーをダウンロードします。
引用元:CData Virtuality | ODBC & JDBC Drivers
2. Snowflake にてテスト用のテーブルを作成
create or replace TABLE CDATA_01.SCHEMA_1.CDATA_TABLE_01 (
TIMESTAMP_COL TIMESTAMP_NTZ(9)
);
3. CData Virtuality にて Snowflake を Sources に登録
4. CData Virtuality の jdbc を Databricks のクラスターにインストール
データ操作方法
1-1. CData Virtuality JDBC Driver により CData Virtuality からデータ取得
# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
jdbc_protocol = "mms"
database_port = "45006"
database_name = "datavirtuality"
url = f"jdbc:datavirtuality:{database_name}@{jdbc_protocol}://{database_host}:{database_port}"
# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"
# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
dv_df = (
spark.read.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.load()
)
dv_df.limit(10).display()
1-2. CData Virtuality JDBC Driver により CData Virtuality へのデータ書き込み
append モードで書き込むことが可能です。
from pyspark.sql.functions import current_timestamp
src_df = spark.createDataFrame([(1,)], ["id"]).withColumn("TIMESTAMP_COL", current_timestamp())
src_df = src_df.drop("id")
src_df.display()
# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
jdbc_protocol = "mms"
database_port = "45006"
database_name = "datavirtuality"
url = f"jdbc:datavirtuality:{database_name}@{jdbc_protocol}://{database_host}:{database_port}"
# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"
# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
(
src_df.write.format("jdbc")
.mode("append")
.option("truncate", "true")
.option("driver", driver)
.option("url", url)
.option("dbtable", table)
.option("user", user)
.option("password", password)
.save()
)
overwrite モードで書き込むことは実施できないため、下記記事で紹介している方法を利用してデータを書き込むことができます。
引用元:CData Virtuality に対して Spark DataFrame にて overwrite により書き込む際のエラーへの対応方法 #Python - Qiita
1-3. CData Virtuality JDBC Driver により SQL 実行
%scala
import java.sql.DriverManager
import java.sql.Connection
var connection: Connection = null
try {
val executed_sql = "INSERT INTO snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01 SELECT NOW();;"
val database_host = "aws-ap-east-1.platform.datavirtuality.com"
val jdbc_protocol = "mms"
val database_port = "45006"
val database_name = "datavirtuality"
val jdbcUsername = "manabina_test_co_jp"
val jdbcPassword = "2!zzzzzz"
val jdbcUrl = s"jdbc:datavirtuality:${database_name}@${jdbc_protocol}://${database_host}:${database_port};"
connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val statement = connection.createStatement()
statement.execute(executed_sql)
} finally {
if (connection != null) {
connection.close()
}
}
2-1. PostgreSQL JDBC Driver により CData Virtuality からデータ取得
# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
database_port = "45007"
database_name = "datavirtuality"
# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"
# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
dv_df = (
spark.read.format("postgresql")
.option("dbtable", table)
.option("host", database_host)
.option("port", database_port)
.option("database", database_name)
.option("user", user)
.option("password", password)
.load()
)
dv_df.display()
2-2. PostgreSQL JDBC Driver により CData Virtuality へのデータ書き込み
2024年11月28日時点ではエラーとなってしまいました。
2-3. PostgreSQL JDBC Driver により SQL 実行
%scala
import java.sql.DriverManager
import java.sql.Connection
var connection: Connection = null
try {
val executed_sql = "INSERT INTO snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01 SELECT NOW();;"
val database_host = "aws-ap-northeast-1.platform.datavirtuality.com"
val database_port = "45007"
val database_name = "datavirtuality"
val jdbcUsername = "manabina_test_co_jp"
val jdbcPassword = "2!zzzzzz"
val jdbcUrl = s"jdbc:postgresql://${database_host}:${database_port}/${database_name}"
connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val statement = connection.createStatement()
statement.execute(executed_sql)
} finally {
if (connection != null) {
connection.close()
}
}
補足
1. PostgreSQL JDBC Driver により Spark Dataframe から書き込み処理を実行するとエラーが発生
from pyspark.sql.functions import current_timestamp
src_df = spark.createDataFrame([(1,)], ["id"]).withColumn("TIMESTAMP_COL", current_timestamp())
src_df = src_df.drop("id")
src_df.display()
# CData Virtualtiy への接続情報を指定
driver = "com.datavirtuality.dv.jdbc.Driver"
database_host = "aws-ap-east-1.platform.datavirtuality.com"
database_port = "45007"
database_name = "datavirtuality"
# ユーザー名とパスワードを指定
user = "manabina_test_co_jp"
password = "2!zzzzzz"
# データ取得元テーブル名の指定
table = "snowflake_01.CDATA_01.SCHEMA_1.CDATA_TABLE_01"
(
src_df.write.format("postgresql")
.mode("append")
.option("dbtable", table)
.option("host", database_host)
.option("port", database_port)
.option("database", database_name)
.option("user", user)
.option("password", password)
.save()
)
Py4JJavaError: An error occurred while calling o531.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 36.0 failed 4 times, most recent failure: Lost task 3.3 in stage 36.0 (TID 97) (10.139.64.10 executor 0): org.postgresql.util.PSQLException: ERROR: TEIID31100 Parsing error: Encountered "[]SET[] SESSION CHARACTERISTICS" at line 1, column 1.
(
src_df.write.format("postgresql")
.mode("overwrite")
.option("truncate", "true")
.option("dbtable", table)
.option("host", database_host)
.option("port", database_port)
.option("database", database_name)
.option("user", user)
.option("password", password)
.save()
)
Py4JJavaError: An error occurred while calling o546.save.
: org.postgresql.util.PSQLException: ERROR: TEIID31100 Parsing error: Encountered "[]TRUNCATE[] TABLE ONLY" at line 1, column 1.
Was expecting: | "begin" | "{"