Overview
Unity Catalog tables could not be referenced from a Databricks High Concurrency cluster, but they could be referenced via Databricks SQL, so this post shares the code that was run and its results.
Referencing a Unity Catalog table from a High Concurrency cluster raises an error like the following:
Unity Catalog is not enabled on this cluster.
To work around this, I connected to Databricks SQL over JDBC and referenced the Unity Catalog table that way.
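For reference, the error can be reproduced with a single table reference. The following is a minimal sketch run on a High Concurrency cluster; the table name is the one created later in this article, and any three-level catalog.schema.table reference behaves the same way.
# Minimal sketch: on a High Concurrency cluster, referencing a Unity Catalog table fails.
# "qiita.schema_01.nation_delta" is the table created later in this article.
df = spark.table("qiita.schema_01.nation_delta")
# -> raises: Unity Catalog is not enabled on this cluster.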
Code
Preparing the Unity Catalog table
These steps are intended to be run with Databricks SQL attached.
%sql
CREATE CATALOG IF NOT EXISTS qiita;
CREATE SCHEMA IF NOT EXISTS qiita.schema_01;
%sql
CREATE OR REPLACE TABLE qiita.schema_01.nation_delta
(
N_NATIONKEY integer COMMENT 'This is a N_NATIONKEY'
,N_NAME string COMMENT 'This is a N_NAME'
,N_REGIONKEY integer COMMENT 'This is a N_REGIONKEY'
,N_COMMENT string COMMENT 'This is a N_COMMENT'
,update_date TIMESTAMP COMMENT 'This is a update_date'
,generated_col string GENERATED ALWAYS AS ('default_value') COMMENT 'This is a generated_col'
,identity_col BIGINT GENERATED ALWAYS AS IDENTITY COMMENT 'This is a identity_col'
)
PARTITIONED BY (
N_REGIONKEY,
update_date
)
COMMENT 'This is a nation_delta'
TBLPROPERTIES (
delta.dataSkippingNumIndexedCols = 1
,delta.logRetentionDuration = 'interval 30 days'
,delta.deletedFileRetentionDuration= 'interval 7 days'
,delta.enableChangeDataFeed = true
,delta.autoOptimize.optimizeWrite = true
,delta.autoOptimize.autoCompact = true
,delta.tuneFileSizesForRewrites = true
)
;
-- Insert into table
INSERT INTO qiita.schema_01.nation_delta
(
N_NATIONKEY
,N_NAME
,N_REGIONKEY
,N_COMMENT
,update_date
)
SELECT
N_NATIONKEY
,N_NAME
,N_REGIONKEY
,N_COMMENT
,CAST(current_timestamp AS date) AS update_date
FROM
read_files(
'dbfs:/databricks-datasets/tpch/data-001/nation/nation.tbl',
format => 'csv',
sep => '|',
schema => '
N_NATIONKEY integer
,N_NAME string
,N_REGIONKEY integer
,N_COMMENT string
')
;
Running the INSERT above loads 25 rows into the table (num_affected_rows: 25, num_inserted_rows: 25). The contents can then be checked with a SELECT:
%sql
SELECT
*
FROM
qiita.schema_01.nation_delta;
Obtain a Databricks token
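A personal access token can be issued from the User Settings page of the workspace. If you prefer to issue one programmatically, the following is a minimal sketch using the databricks-sdk (assuming token creation is permitted for your user; the comment and lifetime values are arbitrary examples):
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Minimal sketch (assumption: token creation is permitted for this user).
# comment and lifetime_seconds are arbitrary example values.
token_response = w.tokens.create(comment="qiita", lifetime_seconds=3600)
print(token_response.token_value)  # set this value as db_token in the next step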
Register the token as a Databricks secret
%pip install databricks-sdk --upgrade -q
dbutils.library.restartPython()
from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError
w = WorkspaceClient()
# Set the scope name
scope_name = 'qiita'
scope_key_and_values = {
    "db_token": "dapiaff1ebb04975f4efd08eaaaaaa",  # Set your Databricks token
}
# Create the scope
try:
    w.secrets.create_scope(scope=scope_name)
except DatabricksError:
    print(f"Scope `{scope_name}` already exists!")
# Register the secrets
for key, value in scope_key_and_values.items():
    w.secrets.put_secret(
        scope=scope_name,
        key=key,
        string_value=value,
    )
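To confirm that the secret was registered, you can list the keys in the scope (a minimal sketch; the secret value itself is redacted when displayed in notebook output):
# Minimal sketch: confirm that the secret key exists in the scope.
for secret in dbutils.secrets.list(scope_name):
    print(secret.key)  # expected to include "db_token"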
Create a DataFrame by connecting to Databricks SQL over JDBC
host = "adb-554481832333334.4.azuredatabricks.net"  # Set the server hostname
httpPath = "/sql/1.0/warehouses/95cac05d1333333"  # Set the HTTP path
df = (
spark.read.format("databricks")
.option("host", host)
.option("httpPath", httpPath)
.option("personalAccessToken", dbutils.secrets.get(scope_name,"db_token"))
.option("dbtable", "qiita.schema_01.nation_delta")
.load()
)
df.display()
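The returned DataFrame behaves like any other Spark DataFrame on the cluster, so you can, for example, register it as a temporary view and query it with Spark SQL (a minimal sketch; the view name is an arbitrary example):
# Minimal sketch: query the remote Unity Catalog table through the DataFrame.
# "nation_delta_remote" is an arbitrary temporary view name.
df.createOrReplaceTempView("nation_delta_remote")
spark.sql("""
    SELECT N_REGIONKEY, COUNT(*) AS nation_count
    FROM nation_delta_remote
    GROUP BY N_REGIONKEY
""").display()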