以下のようにDatabricks SQLでは実行されたクエリーの履歴にアクセスできます。
ただ、これらはGUIでアクセスできる情報であり、これらのデータを取得して分析を行いたいという要件もあることでしょう。そこで、Databricks SDK for Pythonの出番です。
こちらでも説明しているように、PythonからDatabricksのAPIを呼び出すことができます。
今回はこちらのSDKを用いて、クエリーの履歴および実行時間に関するデータを取得してみます。ちなみに、こちらのサンプル集が非常に役立ちました。
注意
2024/2/19時点では最新バージョンでこちらのプログラムはうまく動かず、QueryHistory
のリストにアクセスする際に応答が返ってこなくなってしまいます。バージョンを0.18.0にすることで問題を回避できます。
ライブラリのインストール
ライブラリをインストールします。
%pip install databricks-sdk==0.18.0
dbutils.library.restartPython()
SQLウェアハウスの一覧
SQLウェアハウスの一覧を作成します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
for warehouse in w.warehouses.list():
print(f'warehouse {warehouse.name} (SQLウェアハウスID:{warehouse.id}) has {warehouse.num_clusters} clusters')
以下のような一覧(この例では1台しかありませんが)が返ってきます。
warehouse Shared Endpoint (SQLウェアハウスID:2e1a7a349827f40c) has 1 clusters
SQLウェアハウスの情報取得
上で取得したSQLウェアハウスIDを指定します。
warehouse = w.warehouses.get("2e1a7a349827f40c")
以下のようにSQLウェアハウスの情報を取得できます。
GetWarehouseResponse(auto_stop_mins=10, channel=Channel(dbsql_version=None, name=<ChannelName.CHANNEL_NAME_CURRENT: 'CHANNEL_NAME_CURRENT'>), cluster_size='Medium', creator_name='quentin.ambard@databricks.com', enable_photon=True, enable_serverless_compute=True, health=EndpointHealth(details='Warehouse health details are visible only to admins and users with the CAN_MANAGE permission on the warehouse', failure_reason=None, message=None, status=<Status.HEALTHY: 'HEALTHY'>, summary=None), id='2e1a7a349827f40c', instance_profile_arn=None, jdbc_url='jdbc:spark://xxxx.cloud.databricks.com:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/2e1a7a349827f40c;', max_num_clusters=3, min_num_clusters=1, name='Shared Endpoint', num_active_sessions=0, num_clusters=1, odbc_params=OdbcParams(hostname='xxxx.cloud.databricks.com', path='/sql/1.0/warehouses/2e1a7a349827f40c', port=443, protocol='https'), spot_instance_policy=<SpotInstancePolicy.COST_OPTIMIZED: 'COST_OPTIMIZED'>, state=<State.RUNNING: 'RUNNING'>, tags=None, warehouse_type=<GetWarehouseResponseWarehouseType.PRO: 'PRO'>)
現時点でのSQLウェアハウスのクラスター数は以下のようにして取得します。
# 現時点でのSQLウェアハウスのクラスター数
warehouse.num_clusters
1
その他、取得できる情報に関してはこちらをご覧ください。
クエリー履歴の一覧
以下のAPIを活用します。
- クエリー履歴API: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/query_history.html#databricks.sdk.service.sql.QueryHistoryAPI
- 履歴におけるクエリーのフィルタリング: https://databricks-sdk-py.readthedocs.io/en/latest/dbdataclasses/sql.html#databricks.sdk.service.sql.QueryFilter
- クエリーのメトリクス: https://databricks-sdk-py.readthedocs.io/en/latest/dbdataclasses/sql.html#databricks.sdk.service.sql.QueryMetrics
以下の例ではQueryFilter
で履歴をフィルタリングしています。必要に応じて修正ください。
from databricks.sdk.service import sql
# データフレームのスキーマ
queryColumns = [
"statement_id",
"query_text",
"total_process_time_ms",
"compilation_time_ms",
"queue_waiting_time_ms",
"execution_time_ms",
]
queryHistory = []
for query_history_item in w.query_history.list(
filter_by=sql.QueryFilter(warehouse_ids=["2e1a7a349827f40c"], user_ids=["7459477216523290"]), # SQLウェアハウスIDとユーザーIDでのフィルタリング
include_metrics=True, # メトリクスを含める
):
queue_waiting_time_ms = 0
# キュー待ち時間の計算
if (
query_history_item.metrics.overloading_queue_start_timestamp is None
and query_history_item.metrics.provisioning_queue_start_timestamp is None
):
queue_waiting_time_ms = 0
elif (
query_history_item.metrics.overloading_queue_start_timestamp is not None
and query_history_item.metrics.provisioning_queue_start_timestamp is None
):
queue_waiting_time_ms = (
query_history_item.metrics.query_compilation_start_timestamp
- query_history_item.metrics.overloading_queue_start_timestamp
)
elif (
query_history_item.metrics.overloading_queue_start_timestamp is None
and query_history_item.metrics.provisioning_queue_start_timestamp is not None
):
queue_waiting_time_ms = (
query_history_item.metrics.query_compilation_start_timestamp
- query_history_item.metrics.provisioning_queue_start_timestamp
)
total_process_time_ms = (
query_history_item.execution_end_time_ms
- query_history_item.query_start_time_ms
)
print(
# query_history_item.endpoint_id,
# query_history_item.query_start_time_ms,
# query_history_item.execution_end_time_ms,
"ステートメントID",
query_history_item.query_id,
"クエリー",
query_history_item.query_text,
"処理時間合計(ms)",
total_process_time_ms,
"クエリー最適化時間(ms)",
query_history_item.metrics.compilation_time_ms,
"キュー待機時間(ms)",
queue_waiting_time_ms,
"クエリー実行時間(ms)",
query_history_item.metrics.execution_time_ms,
)
queryHistory.append(
(
query_history_item.query_id,
query_history_item.query_text,
total_process_time_ms,
query_history_item.metrics.compilation_time_ms,
queue_waiting_time_ms,
query_history_item.metrics.execution_time_ms,
)
)
queryHistoryDF = spark.createDataFrame(data=queryHistory, schema = queryColumns)
display(queryHistoryDF)
以下のようにデータフレームとしてデータを処理できるようになりました。
取得したいメトリクスに応じて上記コードを修正ください。メトリクスの詳細はこちらにあります。