4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangChainでChatGPTの返答をストリーミングレスポンスでSlackに返却する

Posted at
  1. Slack Botにメンションしてメッセージを送る
  2. まず、「Typing...」とスレッドに返信が来る
  3. ストレーミングレスポンスで徐々に返信が更新されていく
import os
import re
import time
from typing import Any

from dotenv import load_dotenv
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult

CHAT_UPDATE_INTERVAL_SEC = 0.4

load_dotenv()

# ボットトークンを使ってアプリを初期化
app = App(
    signing_secret=os.environ["SLACK_SIGNING_SECRET"],
    token=os.environ["SLACK_BOT_TOKEN"],
    process_before_response=True,
)

# ストリーミングレスポンスでSlackにメッセージを送る処理の記述
class SlackStreamingCallbackHandler(BaseCallbackHandler):
    last_send_time = time.time()
    message = ""

    # メッセージを送るチャンネルやスレッド、メッセージの更新頻度を設定
    def __init__(self, channel, ts):
        self.channel = channel
        self.ts = ts
        self.interval = CHAT_UPDATE_INTERVAL_SEC
        self.update_count = 0

    # ストリーミングレスポンスを受信しSlackのメッセージを更新
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.message += token
        
        now = time.time()
        if now - self.last_send_time > self.interval:
            self.last_send_time = now
            self.update_count += 1

            # Slackのchat_updateは1分間に50回のコール制限があるため
            # intervalを調整し、chat_updateのコール数を制御する
            if self.update_count / 10 > self.interval:
                self.interval = self.interval * 2

            app.client.chat_update(
                channel=self.channel, ts=self.ts, text=f"{self.message}..."
            )

    # ストリーミングレスポンスの最後の部分を更新
    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        app.client.chat_update(
            channel=self.channel,
            ts=self.ts,
            text=self.message
        )

# @app.event("app_mention")
def handle_mention(event, say):
    channel = event["channel"]
    thread_ts = event["ts"]
    message = re.sub("<@.*>", "", event["text"])

    # まずメッセージを返す
    result = say("\n\nTyping...", thread_ts=thread_ts)
    ts = result["ts"]

    # ChatGPTにプロンプトを投げ、返答をストリーミングレスポンスでメッセージを更新していく
    callback = SlackStreamingCallbackHandler(channel=channel, ts=ts)
    llm = ChatOpenAI(
        model_name=os.environ["OPENAI_API_MODEL"],
        temperature=os.environ["OPENAI_API_TEMPERATURE"],
        streaming=True,
        callbacks=[callback],
    )
    llm.predict(message)

def just_ack(ack):
    ack()

# lasy_listenerでhandle_mentionを非同期で実行
app.event("app_mention")(ack=just_ack, lazy=[handle_mention])

# ソケットモードハンドラーを使ってアプリを起動
if __name__ == "__main__":
    SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]).start()

動作結果
test.gif

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?