
Referencing Unity Catalog tables from a Databricks High Concurrency cluster via Databricks SQL

Posted at 2023-10-02

Overview

Unity Catalog tables cannot be referenced directly from a Databricks High Concurrency cluster, but I was able to reference them by going through Databricks SQL, so this article shares the code I ran and the results.

Referencing a Unity Catalog table from a High Concurrency cluster fails with the following error:

Unity Catalog is not enabled on this cluster.


To work around this, I connected to Databricks SQL over JDBC and referenced the Unity Catalog table through that connection.

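High Concurrency is a legacy access mode that is not Unity Catalog-capable; only Single User and Shared (USER_ISOLATION) clusters can query Unity Catalog directly. The access mode of each cluster can be checked with the Databricks SDK. A minimal sketch:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
# SINGLE_USER / USER_ISOLATION support Unity Catalog; High Concurrency
# clusters report a legacy mode (or NONE), which does not
for c in w.clusters.list():
    print(c.cluster_name, c.data_security_mode)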

Code

Preparing the Unity Catalog table

This step is assumed to be run with the notebook attached to Databricks SQL (a SQL warehouse).

%sql
CREATE CATALOG IF NOT EXISTS qiita;
CREATE SCHEMA IF NOT EXISTS qiita.schema_01;
%sql
CREATE OR REPLACE TABLE qiita.schema_01.nation_delta
(
  N_NATIONKEY integer COMMENT 'This is a N_NATIONKEY'
  ,N_NAME string COMMENT 'This is a N_NAME'
  ,N_REGIONKEY integer COMMENT 'This is a N_REGIONKEY'
  ,N_COMMENT string COMMENT 'This is a N_COMMENT'
  ,update_date TIMESTAMP COMMENT 'This is a update_date'
  ,generated_col string GENERATED ALWAYS AS ('default_value') COMMENT 'This is a generated_col'
  ,identity_col BIGINT GENERATED ALWAYS AS IDENTITY COMMENT 'This is a identity_col'
)
PARTITIONED BY (
  N_REGIONKEY,
  update_date
)
COMMENT 'This is a nation_delta'
TBLPROPERTIES (
  delta.dataSkippingNumIndexedCols = 1
  ,delta.logRetentionDuration = 'interval 30 days'
  ,delta.deletedFileRetentionDuration= 'interval 7 days'
  ,delta.enableChangeDataFeed = true
  ,delta.autoOptimize.optimizeWrite = true
  ,delta.autoOptimize.autoCompact = true
  ,delta.tuneFileSizesForRewrites = true
)
;
 
-- Insert into table
INSERT INTO qiita.schema_01.nation_delta
(
  N_NATIONKEY
  ,N_NAME
  ,N_REGIONKEY
  ,N_COMMENT
  ,update_date
)
SELECT
  N_NATIONKEY
  ,N_NAME
  ,N_REGIONKEY 
  ,N_COMMENT
  ,CAST(current_timestamp AS date) AS update_date
  FROM
    read_files(
      'dbfs:/databricks-datasets/tpch/data-001/nation/nation.tbl',
      format => 'csv',
      sep => '|',
      schema => '
        N_NATIONKEY  integer
        ,N_NAME       string
        ,N_REGIONKEY  integer 
        ,N_COMMENT    string
      ')
;
The INSERT reports num_affected_rows = 25 and num_inserted_rows = 25.
%sql
SELECT 
  *
  FROM
    qiita.schema_01.nation_delta;

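The query returns the 25 rows of the TPC-H nation table. The same check can be scripted in the notebook; a minimal sketch that also inspects the partitioning and Delta table properties set above:

# Sanity check: TPC-H nation has 25 rows
cnt = spark.sql("SELECT COUNT(*) AS cnt FROM qiita.schema_01.nation_delta").first()["cnt"]
print(cnt)  # expected: 25

# Confirm partition columns and Delta table properties
spark.sql("DESCRIBE DETAIL qiita.schema_01.nation_delta").display()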

Obtain a Databricks token

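In the UI, a personal access token is generated from User Settings > Developer > Access tokens. It can also be created with the Databricks SDK; a minimal sketch (the comment text and the 90-day lifetime are arbitrary choices):

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
# The token value is returned only once, so capture it immediately
resp = w.tokens.create(comment="qiita-dbsql-jdbc", lifetime_seconds=90 * 24 * 60 * 60)
db_token = resp.token_value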

Register the token as a Databricks secret

%pip install databricks-sdk --upgrade -q
dbutils.library.restartPython()
from databricks.sdk import WorkspaceClient
from databricks.sdk.core import DatabricksError

w = WorkspaceClient()
# Set the secret scope name
scope_name = 'qiita'
scope_key_and_values = {
    "db_token": "dapiaff1ebb04975f4efd08eaaaaaa", # Databricks トークンを設定
}
# Create the scope
try:
    w.secrets.create_scope(scope=scope_name)
except DatabricksError:
    print(f"Scope `{scope_name}` already exists!")
 
# Register the secrets
for key, value in scope_key_and_values.items():
    w.secrets.put_secret(
        scope=scope_name,
        key=key,
        string_value=value,
    )
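
Secret values cannot be read back in plain text, but the registration can be verified by listing the keys in the scope. A minimal sketch:

# List the keys registered in the scope (values themselves stay redacted)
for s in w.secrets.list_secrets(scope=scope_name):
    print(s.key, s.last_updated_timestamp)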


Connect to Databricks SQL via JDBC and create a DataFrame

host = "adb-554481832333334.4.azuredatabricks.net" # Sever hostname を設定
httpPath = "/sql/1.0/warehouses/95cac05d1333333" # HTTP path を設定
df = (
    spark.read.format("databricks")
    .option("host", host)
    .option("httpPath", httpPath)
    .option("personalAccessToken", dbutils.secrets.get(scope_name,"db_token"))
    .option("dbtable", "qiita.schema_01.nation_delta")
    .load()
)
df.display()

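When a Spark DataFrame is not needed, the same SQL warehouse can be queried with the databricks-sql-connector package instead; it also routes through Databricks SQL, so the cluster's Unity Catalog limitation does not apply. A minimal sketch reusing host, httpPath, and the secret from above (install databricks-sql-connector with %pip first if it is not already available):

from databricks import sql

with sql.connect(
    server_hostname=host,
    http_path=httpPath,
    access_token=dbutils.secrets.get(scope_name, "db_token"),
) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM qiita.schema_01.nation_delta")
        print(cursor.fetchone())  # expected: a single row containing 25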
