Databricksではクラスタのオートスケール機能やオートスリープ機能など、インフラコストを効率化する機能が標準で提供されています。
一方で、標準のクラスタ管理機能に加えて細かい制御を実施してたい場合もあります。例えば、Notebook上でストリーミング処理(ユーザーが停止まで動き続ける処理)のテストを実施をしていて、不意に処理を止め忘れているケースでは、クラスタも継続的に起動し続けます。起動している全てのクラスタを監視して、起動時間(uptime)が不自然に長いクラスタを発見できれば問題を解消できます。
Databricksでは、クラスタ管理のAPIを提供されているため、上記のような細かい制御も可能になっています。
クラスタを監視して、設定した閾値よりも長く起動しているクラスタをリストするスクリプトは以下のように構成できます。
import requests
import json, datetime
from urllib.parse import urljoin
WORKSPACE = 'https://xxxxxxxxxxxxx.cloud.databricks.com'
TOKEN = 'dapixxxxxxxxxxxxxxxxxxxxxxxxx'
UPTTIME_THRESHOLD = 36000 #36000 seconds = 600 min = 10 hours
url = urljoin(WORKSPACE, '/api/2.0/clusters/list')
headers={'Authorization': f'Bearer {TOKEN}' }
ret = requests.get(url, headers=headers)
if ret.status_code == 200:
long_runs = []
for cluster in ret.json()['clusters']:
if cluster['state'] == 'RUNNING':
# scan the uptime
start_time = datetime.datetime.fromtimestamp( cluster['start_time']/1000)
now = datetime.datetime.now()
uptime = now - start_time
if uptime > datetime.timedelta(seconds=UPTTIME_THRESHOLD):
long_runs.append( (cluster['cluster_name'], cluster['cluster_id'], str(uptime)) )
### output ###
print(f'UPTTIME_THRESHOLD: {UPTTIME_THRESHOLD} sec')
print('')
print('cluster_name, cluser_id, uptime(sec)')
print('------------------------------------')
for cluster in long_runs:
print( ', '.join(cluster) )
else:
print(f'Status: {ret.status_code}')
print(ret.text)
(注意: 簡単のため、セキュリティ・エラー処理は省いています。)
実行結果の出力例
UPTTIME_THRESHOLD: 36000 sec
cluster_name, cluser_id, uptime(sec)
------------------------------------
mlflow-cluster123, 0111-120999-wi0qkt9q, 21 days, 11:40:57.331791
testing, 0119-089999-u0oq54ru, 13 days, 15:16:51.334819
上記をベースに定義機変更を加えて(例えばAWS SNSなどの通知システムとの連携など)、定期的に実行することで、不用意に起動しているクラスタのシンプルな監視することができるようになります。
参考