前回の記事では、AWS上のEC2 GPUインスタンス上にStable Diffusion Web UI環境を構築する手順を紹介しました。
こちらは、利用料金の関係から使う時だけインスタンスを立ち上げて、使い終わったら停止させておくような運用方針です。この環境は、手元にGPU環境がない場合やチームで機能検証したい場合などに利用できると考えています。月40時間の利用で約3,740円ほどのランニングコストという試算でした。
今回は、このインスタンスをもう少し使いやすくするために、Slackと連携して以下のようなことをできるようにしたいと思います。
- Slackのコマンドでサーバーを立ち上げ
- 立ち上がったらアクセス先のURLをslackに通知(Elastic IPをケチっているので毎回アドレスが変わる)
- しばらく利用していなければ自動的に停止する
構成
仕組みとしては以下です。
Slackと連携してインスタンスを起動するところは、一般的な方法に乗っ取りLambdaと関数URLを利用します。ただ、インスタンスの起動をwaitするとSlack側の呼び出しがタイムアウトしてしまうため、Slackからリクエストを受け付ける関数を前段に起き、即時レンスポンスを返すと共に、インスタンス起動処理自体は別のLambda関数に任せます。
また、自動停止部分はCloudWatch Logsを利用します。インスタンス内のサービスからCloudWatch Logにログを送信するようにし、その最終時刻を見てインスタンスを停止させるかどうかを判断します。そのログを定期的にチェックする部分はLambdaとEventBridgeを利用します。
以下、前回の記事の続きで、同様の環境が出来上がっている前提で手順を説明していきます。同様にオレゴンリージョンでの作業です。
Stable Diffusion Web UIサーバーからのログ収集
最初のステップとして、前回、構築したサーバーの設定を変更して、Stable Diffusion Web UIの動作ログをCloudWatch Logsに送信するようにします。このログ収集は、自動停止機能のための前準備になります。
ロググループの作成
まず、CloudWatchのサービスページから、新規にロググループを作成しておきます。名前は/ec2/stable-diffusion-web-ui/appとしています。
IAMロールの作成とアタッチ
新規にIAMロールを作成します。名前はstable-diffusion-web-ui-instance-roleとしCloudWatchAgentServerPolicyをアタッチします。
IAMロールが作成できたら、インスタンスにIAMロールをアタッチします。
ログの出力設定
インスタンスを起動し、コンソールログインします。以降は/home/ubuntu/stable-diffusion-webuiディレクトリ以下での作業になります。
cd stable-diffusion-webui/
最初に、以下のStable Diffusion Web UIの起動用のスクリプトをsd-webui-app.shとして作っておきます。一旦、標準出力をログとしてlog以下にファイルで出力し、後程CloudWatchAgentと連携します。また、ログファイルサイズもちょっと気になるので起動の度に削除しちゃいます。(Gradioアプリとして他にいい方法があるかもしれませんが今回は簡単にこれで。)
#!/usr/bin/env bash
mkdir -p log
rm log/*
./webui.sh --listen --xformers --enable-insecure-extension-access --theme dark --gradio-queue 2>&1 | tee -a log/stable-diffusion-web-ui.log
作成した起動スクリプトはパーミッション変更しときます。
chmod 755 sd-webui-app.sh
前回の記事で作成した、起動スクリプトを修正します。
sudo vim /etc/systemd/system/stable-diffusion-web-ui.service
以下のように、ExecStartに先ほど作成したスクリプトを指定します。
[Unit]
Description=Stable Diffusion Web UI
After=network.target
[Service]
User=ubuntu
WorkingDirectory=/home/ubuntu/stable-diffusion-webui
ExecStart=/home/ubuntu/stable-diffusion-webui/sd-webui-app.sh
Restart=always
[Install]
WantedBy=multi-user.target
修正ができたら、サービスをrestartします。
sudo systemctl daemon-reload
sudo systemctl restart stable-diffusion-web-ui.service
以下のように、指定した場所にログが出力されれば成功です。
$ tail -f log/stable-diffusion-web-ui.log
Commit hash: 22bcc7be428c94e9408f589966c2040187245d81
Installing requirements for Web UI
Launching Web UI with arguments: --listen --xformers --enable-insecure-extension-access --theme dark --gradio-queue
[AddNet] Updating model hashes...
0it [00:00, ?it/s]
[AddNet] Updating model hashes...
0it [00:00, ?it/s]
CloudWatchAgentの設定
利用しているAMIにはすでにCloudWatchAgentがインストールされています。
以下コマンドで、CloudWatchAgentの設定ファイルを作成します。
sudo vim /opt/aws/amazon-cloudwatch-agent/etc/amazon-cloudwatch-agent.json
log_group_nameの項目には冒頭で作成したロググループの名前をいれます。また、file_pathは先ほど出力設定を行ったログファイルのパスを指定します。
{
"agent": {
"metrics_collection_interval": 60,
"logfile": "/opt/aws/amazon-cloudwatch-agent/logs/amazon-cloudwatch-agent.log"
},
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/home/ubuntu/stable-diffusion-webui/log/stable-diffusion-web-ui.log",
"log_group_name": "/ec2/stable-diffusion-web-ui/app",
"log_stream_name": "{instance_id}-stable_diffusion_web_ui"
}
]
}
}
}
}
設定が完了したら、CloudWatchAgentを有効化します。
sudo systemctl daemon-reload
sudo systemctl enable amazon-cloudwatch-agent.service
sudo systemctl start amazon-cloudwatch-agent.service
AWSコンソールのCloudWatch Logにて当該のロググループ、ログイベントが確認できればOKです。
Slackコマンドでのインスタンス起動
次にLmabda側の作業に移ります。最初はSlackコマンドでインスタンスを起動する部分です。
ここでは、Slack Appを作成し、インスタンスの起動を行うStart関数と、Slackからのリクエストを受け付けるGateway関数を実装します。さらに、それらを連携できるような設定を行います。
Slack Appの作成
最初にSlackからコマンドを受け付けられるようにSlack Appを作成しておきます。また後で連携のための設定を行いますが、最初に必要なフレーム作成と必要な情報を取得しておきます。
https://api.slack.com/apps/ からNew Appで新しいアプリを作成します。
一旦ここまでです。 Signing Secretをこの後のLambda関数の実装で使うので、控えておいてください。
Lambda関数実行用IAMロール作成
Lambda関数の実装の前に、専用のIAMロールを作成しておきます。こちらはポリシーから作成します。
IAMのページのポリシー作成から以下のステートメントを持つポリシーを作成します。名前は、lambda-ec2-instance-basic-control-policyとし、EC2インスタンスのDescribeやStart/Stop操作、CloudWatch Logsへのアクセス、他のLambda関数の実行を許可しています。XXXXXXXXXXXXの部分はアカウントIDに置き換えてください。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:DescribeInstances",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"ec2:StartInstances",
"ec2:StopInstances"
],
"Resource": [
"arn:aws:ec2:*:XXXXXXXXXXXX:instance/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:GetLogEvents",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:XXXXXXXXXXXX:*"
},
{
"Effect": "Allow",
"Action": "lambda:InvokeFunction",
"Resource": "arn:aws:lambda:*:XXXXXXXXXXXX:function:*"
}
]
}
ポリシーの作成ができたら、stable-diffusion-web-ui-lambda-roleという名前でロールを作成し、lambda-ec2-instance-basic-control-policyをアタッチします。
Start Lambda関数の実装
EC2インスタンスをスタートさせるLambda関数を実装します。こちらは、この後実装するGatewayとなる別のLambda関数からコールされ、実際にEC2インスタンスの起動を制御します。さらに、渡されたSlackのresponse_urlに対して起動した旨とStable Diffusion Web UIのURLを通知します。
以下のように名前をstable_diffusion_web_ui_startとし、Lambda関数を新規作成します。
この時、実行ロールに先ほど作成したstable-diffusion-web-ui-lambda-roleをアタッチします。
Lambda関数のコードは以下になります。細かい実装内容の解説は今回は割愛します。
import os
import json
import boto3
import urllib3
REGION = os.environ['REGION']
INSTANCE_ID = os.environ['INSTANCE_ID']
def lambda_handler(event, context):
response_url = event["response_url"]
ec2 = boto3.client('ec2', region_name=REGION)
ec2_status = get_instance_status(ec2)
print('The instance is ' + ec2_status + ' now.')
if ec2_status == "shutting-down" or ec2_status == "terminated":
response_to_slack(response_url, "The instance has been terminated.")
return
# 停止中であれば念のため停止が完了するまで待つ
if ec2_status == "stopping":
stop_waiter = ec2.get_waiter('instance_stopped')
stop_waiter.wait(InstanceIds=[INSTANCE_ID])
ec2_status = get_instance_status(ec2)
print('The instance is ' + ec2_status + ' now.')
if ec2_status == "stopped":
ec2.start_instances(InstanceIds=[INSTANCE_ID])
# 起動完了を待機
running_waiter = ec2.get_waiter('instance_running')
running_waiter.wait(InstanceIds=[INSTANCE_ID])
instance = ec2.describe_instances(InstanceIds=[INSTANCE_ID])['Reservations'][0]['Instances'][0]
response_to_slack(response_url, "Enjoy! http://" + instance['PublicIpAddress'] + ":7860")
def get_instance_status(ec2):
response = ec2.describe_instances(InstanceIds=[INSTANCE_ID])
return response['Reservations'][0]['Instances'][0]['State']['Name']
# slackへの通知
def response_to_slack(response_url, text):
response_payload = {
"text": text
}
headers = {"Content-Type": "application/json"}
http = urllib3.PoolManager()
http.request(
"POST",
response_url,
body=json.dumps(response_payload).encode("utf-8"),
headers=headers
)
ソースコード内ではリージョンとインスタンスIDは環境変数として切り出してあります。設定ページからそれらの値を設定します。
また、インスタンス立ち上げまで多少時間がかかるため、 Lambda関数のタイムアウトも伸ばしておきます。ここでは3分にしています。
SlackGateway Lambda関数の実装
次にSlackから実際に叩かれるLambda関数を実装します。ここでは、念のためリクエストがSlackから送信されたことをSigning Secretを使ってチェックし、送信元が正しければ先ほど実装したStart関数を呼び出します。また、Slack側へ即時レスポンスすることで、呼び出したユーザーにちょっと待ってもらうことを通知します。
新規に、stable_diffusion_web_ui_slack_gatewayといいう名前で新規作成します。
こちらも、実行ロールを指定します。
また、今回は関数URLを有効し、外部からLambda関数を呼び出せるようにします。
こちらの実装コードは以下になります。
import json
from urllib.parse import parse_qs
import base64
import boto3
import os
import hashlib
import hmac
import time
SLACK_SIGNING_SECRET = os.environ["SLACK_SIGNING_SECRET"]
INVOKE_FUNCTION_NAME = os.environ["INVOKE_FUNCTION_NAME"]
def lambda_handler(event, context):
try:
body = base64.b64decode(event["body"]).decode("utf-8")
except Exception:
return {
"statusCode": 400,
"body": json.dumps({"message": "Bad Request"})
}
if verify_request(event["headers"], body):
return handle_request(body)
else:
return {
"statusCode": 401,
"body": json.dumps({"message": "Unauthorized"})
}
# 署名の検証
def verify_request(headers, body):
timestamp = headers["x-slack-request-timestamp"]
signature = headers["x-slack-signature"]
if abs(time.time() - int(timestamp)) > 60 * 5:
return False
message = f"v0:{timestamp}:{body}"
# Calculate the HMAC-SHA256 hash
request_hash = 'v0=' + hmac.new(
key=bytes(SLACK_SIGNING_SECRET, "UTF-8"),
msg=bytes(message, "utf-8"),
digestmod=hashlib.sha256
).hexdigest()
return hmac.compare_digest(request_hash, signature)
def handle_request(body):
body = parse_qs(body)
response_url = body['response_url'][0]
lambda_client = boto3.client("lambda")
lambda_client.invoke(
FunctionName=INVOKE_FUNCTION_NAME,
InvocationType="Event",
Payload=json.dumps({"response_url": response_url})
)
response = {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"text": "Starting the Stable Diffusion Web UI server. Please wait a moment!"})
}
return response
処理としては、verify_request関数内で署名を検証し、Slackからのリクエストということが確認できれば、先ほど作成したStart関数を呼び出します。リクエストbody内のresponse_urlに対してリクエストを行うと、呼び出したスレッドに対して返答ができるため、そちらのurlをStart関数に渡します。なお、response_urlは期限、上限つきの一時的に発行されたURLです。
コードの実装ができたら、こちらも環境変数を設定します。INVOKE_FUNCTION_NAMEにはstable_diffusion_web_ui_start関数の名前を、SLACK_SIGNING_SECRETではSlack Appを作成した時にメモしたシークレットを入力します。
Slack Appの設定
次にSlack Appからのコマンド作成と呼び出しの設定を行います。
Slack Appの設定画面のFeaturesの項目のSlash Commandsという項目から新規コマンドを作成します。Command自体はなんでもいいですが、ここではsd-webuiとしています。Request URLには先ほど実装した、Gatewayの関数URLを指定します(Lambdaのページトップに表示されているのですぐにわかるかと)。
この状態でBasic InfomationからAppを目的のWorkspaceに対してInstallして、動作を試してみましょう。
Install後、Slackの任意チャンネルで作成したコマンドを入力後、以下のような表示がされればOKです。また、AWSコンソールからも対象のInstanceが起動中になっていることを確かめてください。
ログ監視によるインスタンスの自動停止
最後にログを定期的に監視することで、インスタンスを自動的に停止させる機構を実装します。こちらはCloudWatch LogsをEventBridgeとLambdaで定期的に監視し、一定期間アプリのアクティビティがなければインスタンスの停止を行います。
Auto Stop Lambda関数の実装
新規にLambda関数をstable_diffusion_web_ui_auto_stopという名前で作成します。ポリシーのアタッチは上記二つの関数と同様です。関数URLは不要です。
コードは以下になります。
import boto3
import datetime
import json
import os
REGION = os.environ['REGION']
INSTANCE_ID = os.environ['INSTANCE_ID']
LOG_GROUP_NAME = os.environ['LOG_GROUP_NAME']
SLEEP_SECONDS = float(os.environ['SLEEP_SECONDS'])
def lambda_handler(event, context):
ec2 = boto3.client('ec2', region_name=REGION)
response = ec2.describe_instances(InstanceIds=[INSTANCE_ID])
ec2_status = response['Reservations'][0]['Instances'][0]['State']['Name']
if ec2_status == "shutting-down" or ec2_status == "terminated":
print("The instance has been terminated.")
return
elif ec2_status == "stopping" or ec2_status == "stopped":
print("The instance has been stopped.")
return
elif ec2_status == "pending":
print("The instance is being launched")
return
elapsed = get_elapsed_time_from_last_access()
print("elapsed: " + str(elapsed))
if elapsed >= SLEEP_SECONDS:
print("stop instance")
ec2.stop_instances(InstanceIds=[INSTANCE_ID])
# ログを捜査し最終アクティビティからの経過時間を取得する
def get_elapsed_time_from_last_access():
logs = boto3.client('logs')
response = logs.describe_log_streams(
logGroupName=LOG_GROUP_NAME,
orderBy='LastEventTime',
descending=True,
limit=1
)
# ロググループのLastEventTimeは多少時差があるためログイベントまで見る
last_log_stream_name = response['logStreams'][0]['logStreamName']
response = logs.get_log_events(
logGroupName=LOG_GROUP_NAME,
logStreamName=last_log_stream_name,
limit=1,
startFromHead=False
)
# ミリ秒単位に揃える
last_event_time = response['events'][0]['timestamp'] / 1000
current_time = datetime.datetime.utcnow().timestamp()
return current_time - last_event_time
こちらも環境変数を設定します。インスタンスIDとロググループNameをそれぞれ指定します。また、SLEEP_SECONDSはインスタンスを自動停止させるまでの秒数です。今回は900秒で15分にしています。必要に応じて調整してください。
また、この関数も少しタイムアウトしやすいのでデフォルトの値から多少秒数を伸ばしておいた方がよさそうです。
EventBridgeの設定
最後に、Stop関数の自動定期実行を設定します。この関数のトップページにある「トリガーを追加」をクリックしEventBridgeを選択します。
次に、新規ルールを作成を選択し、ルール名にstable-diffusion-web-ui-auto-stop-interval-checkを、ルールタイプはスケジュール式にしrate(15 minutes)
を入力します。このチェック間隔も適宜調整してください。
停止ロジックの注意事項
現在の構成では、「最終アクティビティが15分以上かどうかを15分おきにチェック」というロジックであるため、噛み合わせが悪いと最長で29分程度稼働することに注意してください。
また、なんらかのエラーでStop関数が作動しない状態になった場合、お高めなインスタンスが稼働しっぱなしになるため、停止ロジックの動作確認や、インスタンスやコストの状態監視などには気を払ったほうがよいでしょう。