LoginSignup
0
0

Databricks SDK for Pythonを用いてSQLウェアハウスの情報を取得する

Last updated at Posted at 2024-01-09

以下のようにDatabricks SQLでは実行されたクエリーの履歴にアクセスできます。

Screenshot 2024-01-09 at 15.51.25.png

さらに、処理時間に関する詳細情報を取得することもできます。
Screenshot 2024-01-09 at 15.51.37.png

ただ、これらはGUIでアクセスできる情報であり、これらのデータを取得して分析を行いたいという要件もあることでしょう。そこで、Databricks SDK for Pythonの出番です。

こちらでも説明しているように、PythonからDatabricksのAPIを呼び出すことができます。

今回はこちらのSDKを用いて、クエリーの履歴および実行時間に関するデータを取得してみます。ちなみに、こちらのサンプル集が非常に役立ちました。

注意
2024/2/19時点では最新バージョンでこちらのプログラムはうまく動かず、QueryHistoryのリストにアクセスする際に応答が返ってこなくなってしまいます。バージョンを0.18.0にすることで問題を回避できます。

ライブラリのインストール

ライブラリをインストールします。

%pip install databricks-sdk==0.18.0
dbutils.library.restartPython()

SQLウェアハウスの一覧

SQLウェアハウスの一覧を作成します。

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for warehouse in w.warehouses.list():
    print(f'warehouse {warehouse.name} (SQLウェアハウスID:{warehouse.id}) has {warehouse.num_clusters} clusters')

以下のような一覧(この例では1台しかありませんが)が返ってきます。

warehouse Shared Endpoint (SQLウェアハウスID:2e1a7a349827f40c) has 1 clusters

SQLウェアハウスの情報取得

上で取得したSQLウェアハウスIDを指定します。

warehouse = w.warehouses.get("2e1a7a349827f40c")

以下のようにSQLウェアハウスの情報を取得できます。

GetWarehouseResponse(auto_stop_mins=10, channel=Channel(dbsql_version=None, name=<ChannelName.CHANNEL_NAME_CURRENT: 'CHANNEL_NAME_CURRENT'>), cluster_size='Medium', creator_name='quentin.ambard@databricks.com', enable_photon=True, enable_serverless_compute=True, health=EndpointHealth(details='Warehouse health details are visible only to admins and users with the CAN_MANAGE permission on the warehouse', failure_reason=None, message=None, status=<Status.HEALTHY: 'HEALTHY'>, summary=None), id='2e1a7a349827f40c', instance_profile_arn=None, jdbc_url='jdbc:spark://xxxx.cloud.databricks.com:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/2e1a7a349827f40c;', max_num_clusters=3, min_num_clusters=1, name='Shared Endpoint', num_active_sessions=0, num_clusters=1, odbc_params=OdbcParams(hostname='xxxx.cloud.databricks.com', path='/sql/1.0/warehouses/2e1a7a349827f40c', port=443, protocol='https'), spot_instance_policy=<SpotInstancePolicy.COST_OPTIMIZED: 'COST_OPTIMIZED'>, state=<State.RUNNING: 'RUNNING'>, tags=None, warehouse_type=<GetWarehouseResponseWarehouseType.PRO: 'PRO'>)

現時点でのSQLウェアハウスのクラスター数は以下のようにして取得します。

# 現時点でのSQLウェアハウスのクラスター数

warehouse.num_clusters
1

その他、取得できる情報に関してはこちらをご覧ください。

クエリー履歴の一覧

以下のAPIを活用します。

以下の例ではQueryFilterで履歴をフィルタリングしています。必要に応じて修正ください。

from databricks.sdk.service import sql

# データフレームのスキーマ
queryColumns = [
    "statement_id",
    "query_text",
    "total_process_time_ms",
    "compilation_time_ms",
    "queue_waiting_time_ms",
    "execution_time_ms",
]
queryHistory = []

for query_history_item in w.query_history.list(
    filter_by=sql.QueryFilter(warehouse_ids=["2e1a7a349827f40c"], user_ids=["7459477216523290"]), # SQLウェアハウスIDとユーザーIDでのフィルタリング
  
    include_metrics=True,  # メトリクスを含める
):
    queue_waiting_time_ms = 0

    # キュー待ち時間の計算
    if (
        query_history_item.metrics.overloading_queue_start_timestamp is None
        and query_history_item.metrics.provisioning_queue_start_timestamp is None
    ):
        queue_waiting_time_ms = 0
    elif (
        query_history_item.metrics.overloading_queue_start_timestamp is not None
        and query_history_item.metrics.provisioning_queue_start_timestamp is None
    ):
        queue_waiting_time_ms = (
            query_history_item.metrics.query_compilation_start_timestamp
            - query_history_item.metrics.overloading_queue_start_timestamp
        )
    elif (
        query_history_item.metrics.overloading_queue_start_timestamp is None
        and query_history_item.metrics.provisioning_queue_start_timestamp is not None
    ):
        queue_waiting_time_ms = (
            query_history_item.metrics.query_compilation_start_timestamp
            - query_history_item.metrics.provisioning_queue_start_timestamp
        )

    total_process_time_ms = (
        query_history_item.execution_end_time_ms
        - query_history_item.query_start_time_ms
    )

    print(
        # query_history_item.endpoint_id,
        # query_history_item.query_start_time_ms,
        # query_history_item.execution_end_time_ms,
        "ステートメントID",
        query_history_item.query_id,
        "クエリー",
        query_history_item.query_text,
        "処理時間合計(ms)",
        total_process_time_ms,
        "クエリー最適化時間(ms)",
        query_history_item.metrics.compilation_time_ms,
        "キュー待機時間(ms)",
        queue_waiting_time_ms,
        "クエリー実行時間(ms)",
        query_history_item.metrics.execution_time_ms,
    )
    queryHistory.append(
        (
            query_history_item.query_id,
            query_history_item.query_text,
            total_process_time_ms,
            query_history_item.metrics.compilation_time_ms,
            queue_waiting_time_ms,
            query_history_item.metrics.execution_time_ms,
        )
    )

queryHistoryDF = spark.createDataFrame(data=queryHistory, schema = queryColumns)
display(queryHistoryDF)

以下のようにデータフレームとしてデータを処理できるようになりました。
Screenshot 2024-01-09 at 16.07.13.png

可視化も簡単です。
Screenshot 2024-01-09 at 16.09.36.png
Screenshot 2024-01-09 at 16.09.24.png

取得したいメトリクスに応じて上記コードを修正ください。メトリクスの詳細はこちらにあります。

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0