はじめに
この記事はQualiArts Advent Calendar 2024の7日目の記事になります。
前回、私はChatGPTを利用してバックオフィスへの問い合わせを削減した話を書きました。
その最後に課題として、結果を出すまでの時間がSlackPlatformのタイムアウト時間を超えてしまうという問題を挙げました。
今回は、それに対する回答の一つとして、CloudRunJobsを利用した構成をつくってみましたので紹介します。
Slackと連携させて、生成AIなどの重い処理をさせたいと考えている人の一助となればと思います。
今回作るシステムの要件
今回のシステムの要件は以下のとおりです。
- slackでユーザのアクションに応じて、質問の回答や画像生成を行う
- botに対するメンションをトリガーとする
ここで問題になるのが先述のタイムアウトの問題です。
基本的に、slack側からアクセスがあり、サーバ側が3秒以内にレスポンスを返さないと失敗(タイムアウト)となります1。ちょっと3秒は早すぎませんかね。
なお、イベントAPIの場合、タイムアウトになるとリトライが発生します(タイムアウト直後、1分後、5分後の計3回のリトライをしてくれます)。
実はメンションに反応するだけであれば、slack側でタイムアウトとなっていても、あまりユーザには気づかれないのですが、ボタンなどを利用する場合はタイムアウトするとその旨はユーザに表示されてしまいます。
しかし、実際上、chatgptで画像を生成したり、画像を生成しないにしても何度も結果を生成する必要が出てくると、まぁまず3秒では無理でしょう。
そこで、slackには一旦即座にレスポンスを返し、裏側では処理を続けるという構成にする必要があります。
システム構成
処理の流れは以下のようになります
- ユーザがslackにメッセージを投稿する
- slackがCloudRunサービスに対してメッセージがあったことを通知する
- CloudRunサービスはCloudRunジョブを起動するだけして、slackにレスポンスを返す
- CloudRunジョブはサービスから渡されたメッセージをもとにプロンプトを構築。その後、ChatGPTにそのプロンプトを渡し、結果を得る
- CloudRunジョブはChatGPTから得られた情報を整形し、slackに投稿する
実装
実装はPythonで行いました。
以下は使用したライブラリとバージョンです。
fastapi: 0.115.5
google-cloud-run: 0.10.13
slack-sdk: 3.27.1
SERVICE
以下がSERVICEの実装になります。
import datetime
import hashlib
import hmac
import logging
import os
from typing import Optional
from fastapi import FastAPI, Request
from google.cloud import run_v2
from pydantic import BaseModel
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
slack_client = WebClient(token=os.environ['SLACK_BOT_TOKEN'])
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
client = run_v2.JobsClient()
# 処理したメッセージのtsの集合
# 3秒でインスタンスが起動しなかったときにはリトライが発生するため、多重実行を避けるために必要
executed_set: set[str] = set()
class Query(BaseModel):
message: str
thread_ts: str
channel: str
class SlackEventRequest(BaseModel):
token: str
type: str
challenge: Optional[str] = None
event: Optional[dict] = None
team_id: Optional[str] = None
api_app_id: Optional[str] = None
event_context: Optional[str] = None
event_id: Optional[str] = None
event_time: Optional[int] = None
authorizations: Optional[list[dict]] = None
is_ext_shared_channel: Optional[bool] = None
context_team_id: Optional[str] = None
context_enterprise_id: Optional[str] = None
@app.get("/healthcheck")
async def healthcheck():
return "OK"
def ask(req: SlackEventRequest):
channel: str = req.event["channel"]
thread_ts: str = req.event["ts"]
# リアクションがないとbotが動いているか不安になるので、一言投稿しておく
post_to_slack(channel, thread_ts, "えーっと...")
query = Query(
message=req.event["text"],
thread_ts=thread_ts,
channel=channel
)
# ジョブには環境変数としてユーザのメッセージなどの情報を渡す
job_container_envs = [
run_v2.types.EnvVar(name='QUERY_JSON', value=query.model_dump_json()),
]
job_container_overrides= run_v2.types.RunJobRequest.Overrides.ContainerOverride(
env=job_container_envs,
)
job_overrides = run_v2.types.RunJobRequest.Overrides(
container_overrides=[job_container_overrides],
)
job_name: str = os.environ['CLOUD_RUN_JOB_NAME']
request = run_v2.RunJobRequest(
name=job_name,
overrides=job_overrides
)
client.run_job(request=request)
@app.post("/event")
async def slack_event_api(req: SlackEventRequest, request: Request):
try :
if req.type == "url_verification":
return {
"challenge": req.challenge
}
body = (await request.body()).decode("utf-8")
if not verify(dict(request.headers), body, os.environ["SLACK_SIGNING_SECRET"]):
logger.warning("Invalid request")
return "error"
ts = req.event["ts"]
if ts in executed_set:
# 既に処理したメッセージの場合は無視
return "success"
if req.event["type"] == "app_mention":
# メッセージが送られてきた場合
ask(req)
executed_set.add(ts)
except Exception as e:
logger.error(e)
return "error" #エラーだがリトライされても困るので200で返しておく
return "success"
def verify(headers: dict, body: str, slack_key: str) -> bool:
"""
slackからのmessageかを判別する
see: https://api.slack.com/authentication/verifying-requests-from-slack
"""
request_ts = int(headers["x-slack-request-timestamp"])
now_ts = int(datetime.datetime.now().timestamp())
if abs(request_ts - now_ts) > (60 * 5):
logger.warning(f"Invalid timestamp {request_ts} {now_ts}")
return False
signature = headers["x-slack-signature"]
message = f"v0:{headers['x-slack-request-timestamp']}:{body}"
message_hmac = hmac.new(
bytes(slack_key, "UTF-8"), bytes(message, "UTF-8"), hashlib.sha256
)
expected = f"v0={message_hmac.hexdigest()}"
result = hmac.compare_digest(expected, signature)
if not result:
logger.warning(f"Invalid signature {expected} {signature}")
return result
def post_to_slack(channel: str, thread_ts: str, message: str):
try:
response = slack_client.chat_postMessage(channel=channel, text=message, thread_ts=thread_ts)
logger.info(response)
except SlackApiError as e:
print(f"Got an error: {e.response['error']}")
SERVICEではslackのイベント通知を受け取り、JOBを起動するように実装します。
イベントはメンションのみを受け付ける実装にしています。
また、通知が来た際は署名を検証し、slackからのアクセスであることを確認します。
通知をもとに、JOBには以下の3つの情報を渡します。
- 投稿された内容
- 投稿されたチャンネル
- 投稿のタイムスタンプ
JOBに情報を渡す方法はいくつかあるようですが、今回は環境変数として設定することで情報を渡すことにしました。
job_container_envs = [
run_v2.types.EnvVar(name='QUERY_JSON', value=query.model_dump_json()),
]
job_container_overrides= run_v2.types.RunJobRequest.Overrides.ContainerOverride(
env=job_container_envs,
)
job_overrides = run_v2.types.RunJobRequest.Overrides(
container_overrides=[job_container_overrides],
)
job_name: str = os.environ['CLOUD_RUN_JOB_NAME']
request = run_v2.RunJobRequest(
name=job_name,
overrides=job_overrides
)
client.run_job(request=request)
ここが環境変数設定から起動の実装になります。
QUERY_JSON
という名前で環境変数に入れています。
今回はContainerOverrideのenvを設定していますので環境変数になっていますが、argsを設定することで実行時の引数として渡すこともできるようです。
RunJobRequestのnameはジョブ名を設定します。
設定される文字列は以下のようなフォーマットになります。
projects/{プロジェクト名}/locations/{リージョン}/jobs/{ジョブ名}
JOB
以下がJOBの実装になります。
import os
import logging
from openai import OpenAI
from pydantic import BaseModel
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Query(BaseModel):
message: str
thread_ts: str = None
channel: str = None
def ask(query: Query) -> str:
openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
chat_completion = openai_client.chat.completions.create(
messages=[
{
'role': 'user',
'content': query.message
}
],
model="gpt-4o",
)
return chat_completion.choices[0].message.content
def post_to_slack(channel: str, thread_ts: str, message: str) -> None:
try:
slack_client = WebClient(token=os.environ['SLACK_BOT_TOKEN'])
response = slack_client.chat_postMessage(channel=channel, text=message, thread_ts=thread_ts)
logger.info(response)
except SlackApiError as e:
logger.error(f"Got an error: {e.response['error']}")
def is_image_request(query: Query) -> bool:
openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
chat_completion = openai_client.chat.completions.create(
messages=[
{
'role': 'system',
'content': "次のユーザの発言が、あなたに画像生成を依頼するものか、そうでないかを判定してください。画像生成を依頼するものであれば「はい」、そうでないものであれば「いいえ」とだけ返答してください。"
},
{
'role': 'user',
'content': query.message
}
],
model="gpt-4o",
)
return chat_completion.choices[0].message.content == "はい"
def generate_image(query: Query) -> str:
openai_client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
result = openai_client.images.generate(
prompt=query.message,
model="dall-e-3",
size="1024x1024",
n=1,
quality="standard",
response_format="url",
style="natural"
)
return result.data[0].url
def file_upload_to_slack(channel: str, thread_ts: str, url: str):
file_path = f"/tmp/image.{channel}.{thread_ts}.jpg"
with open(file_path, 'wb') as f:
f.write(requests.get(url).content)
try:
slack_client = WebClient(token=os.environ['SLACK_BOT_TOKEN'])
response = slack_client.files_upload_v2(channel=channel, thread_ts=thread_ts, file=file_path)
logger.info(response)
except SlackApiError as e:
logger.error(f"Got an error: {e.response['error']}")
def main():
query_json = os.getenv('QUERY_JSON', None)
if query_json is None:
raise Exception('env: QUERY_JSON has to be given.')
try:
query = Query.model_validate_json(query_json)
except Exception as e:
logger.error(f'QUERY_JSON is not JSON format. : {query_json}')
raise e
if is_image_request(query):
img_url = generate_image(query)
file_upload_to_slack(query.channel, query.thread_ts, img_url)
else:
message = ask(query)
post_to_slack(query.channel, query.thread_ts, message)
if __name__ == '__main__':
main()
JOBではSERVICEから受け取った投稿の情報をもとに返答を生成し、slackに投稿します。
- メッセージの内容を見て、画像の生成を望んでいそうか判定する
- 画像の生成を望んでいる場合
- DALL-Eを利用して画像を生成(画像のURLを取得)
- 画像のURLにアクセスし、画像を取得。取得した画像をローカルに保存
- ローカルに保存した画像をslackに投稿
- 画像生成を望んでいない場合
- ChatGPTを利用して返答を生成
- 生成した返答をslackに投稿
なお、QUERY_JSON
以外の環境変数も使っていますが、こちらはSERVICE側から設定したものではなく、JOB側で設定しているものになります。
今回の処理では対して時間もかからないはずなので、ジョブのタイムアウト設定は10分に設定してありますが、最大で168時間まで設定できるようです。
slackのタイムアウトも気にしなくていいので、好きなだけ重い処理をできますね。
結果
実際にslackに投稿するとこのようになります。
確認したところ、slackへのレスポンスまではおよそ2秒。
画像の生成が完了し投稿されるまではおよそ30秒。
当初の目的は達成できたかなと思います。
ただ、それでも度々APIのインスタンスが起動するまでに時間がかかり、レスポンスを返すまでに3秒を超えることがあるようで、リトライが走ってしまいます。
なので、念の為、同じメッセージに対するリクエストがあった場合は無視する実装を入れておいたほうが良いかと思います。
まとめ
今回はCloudRunとCloudRunJobsを利用して、slackの求めるレスポンスタイムを満たす構成をつくり、試してみました。
結果として、一応レスポンスまで2秒程度に収め、重い処理をしても特に問題ない状態にできました。
この構成はあくまで一例で、もっと他に良い案もあるとは思いますが、もしあなたにとって役立つ情報であったなら幸いです。
-
Slack EventsAPI https://api.slack.com/apis/events-api#failure ↩