Watson Studio CloudでJupyter Notebookをジョブ化できることを以下の記事で紹介しました。
https://qiita.com/kawada2017/items/fd802ec9c0b5c5a55e47
この機能はスケジュールもついていて気軽につかえて便利なのですが、簡易的なものであるため、失敗したときなどに通知する機能がなかったり、祝日カレンダーなどもありません。起動トリガーも時間だけなので、他システムのジョブの成功の後に実行するような連携もやりやすくはありません。
本格的に自動運用をシステム化する場合にはIBM Workload SchedulerやJP1などのジョブ・スケジューラーから呼び出したくなります。
そこで、Jupyter Notebookジョブは外部のプログラムからWatson Data APIでも呼び出す機能を使います。
@ttsuzuku さんのCloud Pak for DataでJupyter NotebookジョブをAPI実行する以下の記事を参考に、Watson Studio CloudでAPIでの実行方法をご紹介します。
Cloud Pak for Dataの分析プロジェクトのJobを環境変数付きでAPI実行する - Qiita
https://qiita.com/ttsuzuku/items/3a2267f90c9c184cf07a
概要
外部プログラムからはIBM CloudのAPI KEYでアクセスして、トークンを取得します。その後ジョブを起動します。するとジョブからNotebookが呼び出され、そのNotebookからデータ資産への書き出しを行います。
ジョブの作成
最初にデータ資産への書き出しを行う簡単なサンプルジョブを作成します。
このサンプルではprojectオブジェクトをつかってデータ資産を登録したいので、Projectのトークンを作ります。
設定タブのなかでアクセス・トークンの「新規作成+」をクリックしてください。
任意の名前を設定し、プロジェクトのアクセス・ロールとしてエディターを選び、作成をクリックしてください。
資産タブに移り、「プロジェクトに追加+」をクリックし、Notebookをクリックして、新規Notebookを作成します。
「URLから」を選び、名前に「Jobtest」と入力します。NotebookURLには以下を入力してください。
https://github.com/hkwd/200616WSCJobExecution/blob/master/Jobtest_pub.ipynb
入力ができたら作成ボタンをクリックしてください。
Notebookが読み込まれたら、プロジェクト・トークンの挿入をクリックしてください。
以下のようにプロジェクト・トークンを作成するコードが自動的に挿入されます。
全てのセルを実行すると、以下のようにjobtestout.csvというファイルにタイムスタンプと環境変数が書き出されます。
FileメニューからSave Versionでバージョンを保存してください。ジョブとして実行するためには必ずバージョンが保存されている必要があります。
プロジェクトの資産のタブに戻り、jobtestout.csvをクリックして開いてください。
プロジェクトのジョブタブに移り、「新規ジョブ+」をクリックして、ジョブを作成します。
ジョブ名にJobTestJobと任意で説明を入力し、「資産の選択」をクリックしてください。
少し待つと実行が完了します。
このNotebookジョブをAPIから実行してみたいと思います。
API keyの取得
APIから実行するためには、以下からIBM CloudのAPI Keyを作成します。「IBM Cloud APIキーの作成」ボタンを押してください。
https://cloud.ibm.com/iam/apikeys
ちなみに一覧の中には以下のようなWatson Studio Jobsをスケジュール実行するためのAPI Keyがありました。Watson Studio内のスケジューラーでも同じように使われているようです。なお、このAPI Keyを使いまわすことはできません。API Keyは生成時にしか表示されないためです。
IBMid-50H9DGS2KP - Jobs api delegate key - do not delete
Created by Watson Studio Jobs to run scheduled jobs
名前と説明には任意のものを設定し、作成ボタンを押してください。ここでは以下のように設定してみました。
名前: 200618API Key for Watson Studio Job API
説明: Watson Studio Job APIを外部プログラムから呼び出すためのAPI Key
このAPI Keyは悪用されると第三者にご自身のIBM Cloud環境の操作をされてしまうので、厳重に管理してください。
APIによるJobの起動
以下のプログラムで実行ができます。
https://github.com/hkwd/200616WSCJobExecution/blob/master/JobExec2_pub.py
プログラム先頭の以下のパラメーターを環境に合わせて修正してください。
params = {}
params['apikey'] = 'xxxxxxxx'
params['project_id'] = "xxxxxxxx"
先にダウンロードしたAPI Keyからparams['apikey']をセットします。
次にparams['project_id']もセットします。
プロジェクト名をクリックするとブラウザのURLにproject_idが表示されます。
プログラムも簡単に見てみます。
以下でWatson Studioに外部のプログラムからAPIアクセスするためのトークンを取得しています。
#IAMのトークン取得
iam_response = requests.post( params['token_url'], headers=params['token_headers'], data=params['token_data'] )
bearer_token = "Bearer " + iam_response.json()["access_token"]
headers = {
'Authorization': bearer_token,
'Content-Type': 'application/json'
}
参考
How to Access Watson Studio Assets by API | IBM
https://www.ibm.com/cloud/blog/how-to-access-watson-studio-assets-by-api
以下でプロジェクト内のジョブのリストを取得して、JobTestJobという名前を持つJob_idを取得しています。
# Job list取得
project_id =params['project_id']
url=params['url']
response = requests.get(url+"/v2/jobs?project_id="+project_id, headers=headers).json()
MyJobName="JobTestJob"
job_id=next(v['metadata']['asset_id'] for v in response['results'] if v['metadata']['name'] ==MyJobName)
ちなみにJob_idはブラウザでジョブを選べばURL内に書かれていますので、ここからとってくることもできます。ただ、可読性がさがるのでリストから取得するほうがよいと思います。
環境変数を設定して、ジョブを実行し、そのジョブのRunIDを取得しています。
#ジョブ実行のパラメーター
jobrunpost = {
"job_run": {
"configuration" : {
"env_variables" : ["MYENV1=100"]
}
}
}
# Run job
response = requests.post(url+"/v2/jobs/"+job_id+"/runs?project_id="+project_id, headers=headers, json=jobrunpost).json()
# Job run id
job_run_id = response['metadata']['asset_id']
ジョブは非同期で実行されるため、ジョブのRunIDをつかってその終了を確認します。ここでは3秒おきに'Starting', 'Running'のStatusではなくなったかどうかを調べ終了を判定しています。また、12回チェックをしても'Starting', 'Running'のStatsuの場合はTime Upとしました。実際の処理にあわせてこの回数やスリープ時間を調節したり、永久ループで判定するようにしてください。
#Jobの終了を確認
import time
retry_times=12
retry_sleep=3
for i in range(retry_times):
response = requests.get(url+"/v2/jobs/"+job_id+"/runs/"+job_run_id+"?project_id="+project_id, headers=headers).json()
jobstatus = response['entity']['job_run']['state']
print(jobstatus)
if jobstatus not in ['Starting', 'Running']:
break
time.sleep(retry_sleep)
if i==retry_times-1:
print("Time up")
以下のようなイメージで動作します。
ジョブを開いてみると実行の履歴が記録されています。
資産タブにうつり、jobtestout.csvをクリックしてください。
APIで実行したタイムスタンプと与えた環境変数でファイルが更新されています。
#参考
Watson Data API - IBM Cloud API Docs
https://cloud.ibm.com/apidocs/watson-data-api-cpd#get-list-of-jobs-under-a-project
Watson Data API のマニュアルです。
Jobs API
https://api.dataplatform.cloud.ibm.com/v2/jobs/docs/swagger/#/
JobAPIの仕様です。出力例もあります。