まえがき
SageMakerノートブックインスタンスを作成し、その中に作ったJupyterNotebookを別スクリプトで動作させたい要件がありました。そのためのプログラムを作成しました。
ノートブックインスタンス及びJupyterNotebookは既に作成済みである前提として記載します。
参考サイト
プログラムの流れ
- ノートブックインスタンスを起動する
- インスタンス内にあるJupyterNotebookを実行する
- ノートブックインスタンスを終了する
全体プログラム
import boto3
from boto3.session import Session
from time import sleep
from botocore.vendored import requests
from botocore.exceptions import ClientError
import websocket
import requests
access_key = 'xxxxxxxxxxxxxxxx'
secret_key = 'xxxxxxxxxxxxxxxx'
region = 'xxxxxx(例:ap-northeast-1)'
target_instance = '○○○○○○○○○○○'
jupyter_notebook_name = '△△△△△.ipynb'
session = Session(aws_access_key_id = access_key, aws_secret_access_key = secret_key, region_name = region)
sm_client = session.client('sagemaker')
#NotebookInstanceの開始
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
sm_client.start_notebook_instance(NotebookInstanceName = notebook['NotebookInstanceName'])
while True:
sleep(20)
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
if notebook['NotebookInstanceStatus'] == 'InService':
print('notebook start!')
break
#jupyternotebookの実行
url = sm_client.create_presigned_notebook_instance_url(NotebookInstanceName=target_instance)['AuthorizedUrl']
url_tokens = url.split('/')
http_proto = url_tokens[0]
http_hn = url_tokens[2].split('?')[0].split('#')[0]
s = requests.Session()
r = s.get(url)
cookies = "; ".join(key + "=" + value for key, value in s.cookies.items())
ws = websocket.create_connection(
"wss://{}/terminals/websocket/1".format(http_hn),
cookie=cookies,
host=http_hn,
origin=http_proto + "//" + http_hn,
header = ["User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"]
)
ws.send("""[ "stdin", "jupyter nbconvert --execute --to notebook --inplace /home/ec2-user/SageMaker/{} --ExecutePreprocessor.kernel_name=python3 --ExecutePreprocessor.timeout=1500\\r" ]""".format(jupyter_notebook_name))
##jupyterが終わったかどうか確認の処理
s3 = session.resource('s3')
bucket = s3.Bucket('xxxxxxxxxxx')
key_x = "xxxx/xxxx.csv"
while True:
obj_x = list(bucket.objects.filter(Prefix=key_x))
if (len(obj_x) > 0 and obj_x[0].key == key_x) == True:
print('notebook finished')
break
ws.close()
#NotebookInstanceの終了
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
sm_client.stop_notebook_instance(NotebookInstanceName = notebook['NotebookInstanceName'])
while True:
sleep(20)
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
if notebook['NotebookInstanceStatus'] == 'Stopped':
print('notebook stop')
break
解説
①ノートブックインスタンスを起動する
SageMaker関連はboto3を使うことで操作可能です。
インスタンス名を指定し、list_notebook_instances
関数を実行することで、指定したインスタンスの状態を見ることができます。
その後、start_notebook_instance
関数を実行することでインスタンスが開始となります。
インスタンスはすぐに立ち上がらないため、while文を使ってNotebookInstanceStatusを取得し続けています。InServiceとなっていたら次の処理に進むという流れです。
access_key = 'xxxxxxxxxxxxxxxx'
secret_key = 'xxxxxxxxxxxxxxxx'
region = 'xxxxxx(例:ap-northeast-1)'
target_instance = '○○○○○○○○○○○'
session = Session(aws_access_key_id = access_key, aws_secret_access_key = secret_key, region_name = region)
sm_client = session.client('sagemaker')
#NotebookInstanceの開始
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
sm_client.start_notebook_instance(NotebookInstanceName = notebook['NotebookInstanceName'])
ここでSageMakerを確認すると、立ち上げたいノートブックインスタンスのStatusが「Pending」(準備中)となります。
インスタンスがになってからJupyterNotebookにアクセスするため、StatusがPending→InServiceになるまで待つ必要があります。
そのためwhile文でStatusチェックをしています。
while True:
sleep(20)
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
if notebook['NotebookInstanceStatus'] == 'InService':
print('notebook start!')
break
このプログラムが終わってはじめてノートブックインスタンスを使うことができます。確認するとたしかにStatusが「InService」(使用可能)となりました。
②インスタンス内にあるJupyterNotebookを実行する
JupyterNotebookを操作するためにはwebsocketライブラリを使用するようです。pip install websocket-client
でライブラリをインストールする必要があります。
流れとしては、
-
create_presigned_notebook_instance_url
関数で、先ほど立ち上げたノートブックインスタンスからJupyterサーバーに接続するために使用できるURLを取得 - 取得したURLを加工
-
request
関数でURLにアクセスしcookiesを取得 -
websocket.create_connection
関数でJupyterサーバーにアクセス -
ws.send
関数で指定したノートブックを開始 - ノートブックが終了したら
ws.close()
でノートブックを終了
です。
最後の「ノートブックが終了したかどうか」の判定ですが、私のJupyerNotebookでは「指定のS3バケットにcsvファイルを出力する」ことがゴールのため、S3ファイルが存在するかどうかをwhile文を使って判定しています。
ここで2点つまづいた点がありましたので共有です。
ImportError: cannot import name 'create_connection' from 'websocket' (unknown location)
websocketをインストールしたのですが、ちゃんとインポートできていないよというエラーが発生しました。
パッケージの競合が起こっているそうで、以下記事を参考にしてパッケージの再インストールを試みたところうまくいきました。
Handshake status 500 Internal Server Error
websocketサーバーに接続する際にエラーが出ました。
headerにユーザーエージェントを指定することでうまくいきました。
jupyter_notebook_name = '△△△△△.ipynb'
url = sm_client.create_presigned_notebook_instance_url(NotebookInstanceName=target_instance)['AuthorizedUrl']
url_tokens = url.split('/')
http_proto = url_tokens[0]
http_hn = url_tokens[2].split('?')[0].split('#')[0]
s = requests.Session()
r = s.get(url)
cookies = "; ".join(key + "=" + value for key, value in s.cookies.items())
ws = websocket.create_connection(
"wss://{}/terminals/websocket/1".format(http_hn),
cookie=cookies,
host=http_hn,
origin=http_proto + "//" + http_hn,
header = ["User-Agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.127 Safari/537.36"]
)
ws.send("""[ "stdin", "jupyter nbconvert --execute --to notebook --inplace /home/ec2-user/SageMaker/{} --ExecutePreprocessor.kernel_name=python3 --ExecutePreprocessor.timeout=1500\\r" ]""".format(jupyter_notebook_name))
##jupyterが終わったかどうか確認の処理
s3 = session.resource('s3')
bucket = s3.Bucket('xxxxxxxxxxx')
key_x = "xxxx/xxxx.csv"
while True:
obj_x = list(bucket.objects.filter(Prefix=key_x))
if (len(obj_x) > 0 and obj_x[0].key == key_x) == True:
print('notebook finished')
break
ws.close()
③ノートブックインスタンスを終了する
開始するときとほぼ同様のスクリプトです。
stop_notebook_instance
関数を実行することでインスタンスが開始となります。
先ほどと同様、インスタンスはすぐに終了しないため、while文を使ってNotebookInstanceStatusを取得し続けています。Stoppedとなっていたらプログラムを終了するという流れです。
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
sm_client.stop_notebook_instance(NotebookInstanceName = notebook['NotebookInstanceName'])
while True:
sleep(20)
notebook = sm_client.list_notebook_instances(NameContains = target_instance)['NotebookInstances'][0]
if notebook['NotebookInstanceStatus'] == 'Stopped':
print('notebook stop')
break
以上です。
これでAWSのSageMakerにわざわざブラウザからアクセスしなくても、ローカルのPythonから操作したり、Lambdaに載せてCloudWatchEventsで定期実行するなどに活用できます。自由度が広がりそうですね。