import os
import re
import time
from typing import Any
from dotenv import load_dotenv
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
CHAT_UPDATE_INTERVAL_SEC = 0.4
load_dotenv()
# ボットトークンを使ってアプリを初期化
app = App(
signing_secret=os.environ["SLACK_SIGNING_SECRET"],
token=os.environ["SLACK_BOT_TOKEN"],
process_before_response=True,
)
# ストリーミングレスポンスでSlackにメッセージを送る処理の記述
class SlackStreamingCallbackHandler(BaseCallbackHandler):
last_send_time = time.time()
message = ""
# メッセージを送るチャンネルやスレッド、メッセージの更新頻度を設定
def __init__(self, channel, ts):
self.channel = channel
self.ts = ts
self.interval = CHAT_UPDATE_INTERVAL_SEC
self.update_count = 0
# ストリーミングレスポンスを受信しSlackのメッセージを更新
def on_llm_new_token(self, token: str, **kwargs) -> None:
self.message += token
now = time.time()
if now - self.last_send_time > self.interval:
self.last_send_time = now
self.update_count += 1
# Slackのchat_updateは1分間に50回のコール制限があるため
# intervalを調整し、chat_updateのコール数を制御する
if self.update_count / 10 > self.interval:
self.interval = self.interval * 2
app.client.chat_update(
channel=self.channel, ts=self.ts, text=f"{self.message}..."
)
# ストリーミングレスポンスの最後の部分を更新
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
app.client.chat_update(
channel=self.channel,
ts=self.ts,
text=self.message
)
# @app.event("app_mention")
def handle_mention(event, say):
channel = event["channel"]
thread_ts = event["ts"]
message = re.sub("<@.*>", "", event["text"])
# まずメッセージを返す
result = say("\n\nTyping...", thread_ts=thread_ts)
ts = result["ts"]
# ChatGPTにプロンプトを投げ、返答をストリーミングレスポンスでメッセージを更新していく
callback = SlackStreamingCallbackHandler(channel=channel, ts=ts)
llm = ChatOpenAI(
model_name=os.environ["OPENAI_API_MODEL"],
temperature=os.environ["OPENAI_API_TEMPERATURE"],
streaming=True,
callbacks=[callback],
)
llm.predict(message)
def just_ack(ack):
ack()
# lasy_listenerでhandle_mentionを非同期で実行
app.event("app_mention")(ack=just_ack, lazy=[handle_mention])
# ソケットモードハンドラーを使ってアプリを起動
if __name__ == "__main__":
SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]).start()