LoginSignup
14
22

LangChain × SlackAPI × Lambda でWebページを要約してくれるSlackChatGPTサーバーレスアプリ作った

Posted at

作ったもの

SlackでWebページを渡すと要約してくれて、その後はURLを元に回答を生成してくれるアプリを作りました🤗
(難解なWebページとか読むのめんどくさい時用です)
Image from Gyazo

インフラ構成

Image from Gyazo

それぞれの役割

Image from Gyazo

こんな感じの構成になっています。
LangChain周りのパッケージが重たすぎて、LambdaLayerのデプロイパッケージの制限(解凍後サイズ 250MB)を超えてしまうので、ECRにDockerイメージを保管しそこからLambdaを展開しています。

資料

イベント登壇時の資料です

コード

超特急で作ったので細かい所や汚いところは目を瞑ってください🥲

Dockerfile
FROM public.ecr.aws/lambda/python:3.8

# Upgrade pip
RUN python3 -m pip install --upgrade pip

COPY requirements.txt ./
RUN pip install -r requirements.txt

COPY handler.py ./

CMD ["handler.lambda_handler"]
requirements.tx
aiofiles==23.1.0
aiohttp==3.8.4
aiosignal==1.3.1
altair==5.0.0
anyio==3.6.2
async-timeout==4.0.2
attrs==23.1.0
beautifulsoup4==4.12.2
boto3==1.26.96
botocore==1.29.136
certifi==2023.5.7
cffi==1.15.1
charset-normalizer==3.1.0
click==8.1.3
contourpy==1.0.7
cryptography==40.0.2
cycler==0.11.0
dataclasses-json==0.5.7
faiss-cpu==1.7.4
fastapi==0.95.2
ffmpy==0.3.0
filelock==3.12.0
fonttools==4.39.4
frozenlist==1.3.3
fsspec==2023.5.0
geojson==2.5.0
gradio==3.23.0
greenlet==2.0.2
h11==0.14.0
httpcore==0.17.1
httpx==0.24.1
huggingface-hub==0.14.1
idna==3.4
Jinja2==3.1.2
jmespath==1.0.1
jsonschema==4.17.3
kiwisolver==1.4.4
langchain==0.0.126
linkify-it-py==2.0.2
markdown-it-py==2.2.0
MarkupSafe==2.1.2
marshmallow==3.19.0
marshmallow-enum==1.5.1
matplotlib==3.7.1
mdit-py-plugins==0.3.3
mdurl==0.1.2
multidict==6.0.4
mypy-extensions==1.0.0
numexpr==2.8.4
numpy==1.24.3
openai==0.27.2
openapi-schema-pydantic==1.2.4
orjson==3.8.12
packaging==23.1
pandas==2.0.1
pdfminer.six==20221105
pdfplumber==0.9.0
Pillow==9.5.0
pycparser==2.21
pydantic==1.10.7
pydub==0.25.1
pyowm==3.3.0
pyparsing==3.0.9
pyrsistent==0.19.3
PySocks==1.7.1
python-dateutil==2.8.2
python-multipart==0.0.6
pytz==2023.3
PyYAML==6.0
regex==2023.5.5
requests==2.30.0
s3transfer==0.6.1
semantic-version==2.10.0
six==1.16.0
sniffio==1.3.0
soupsieve==2.4.1
SQLAlchemy==1.4.48
starlette==0.27.0
tenacity==8.2.2
tiktoken==0.3.3
tokenizers==0.13.3
toolz==0.12.0
tqdm==4.65.0
transformers==4.29.2
typing-inspect==0.8.0
typing_extensions==4.5.0
tzdata==2023.3
uc-micro-py==1.0.2
urllib3==1.26.15
uvicorn==0.22.0
Wand==0.6.11
websockets==11.0.3
yarl==1.9.2
handler.py
import os
import json
import tempfile
import boto3
import openai
import langchain
import requests
import pdfplumber
import re

from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain.chains import ConversationalRetrievalChain
from langchain.memory.chat_message_histories import DynamoDBChatMessageHistory
from langchain.chains import LLMChain
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.conversational_retrieval.prompts import CONDENSE_QUESTION_PROMPT
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from io import BytesIO
from datetime import datetime

URL_SUMMARIZER_ID = os.environ["URL_SUMMARIZER_ID"]
BUCKET_NAME = os.environ["BUCKET_NAME"]
SESSION_TABLE_NAME = os.environ["SESSION_TABLE_NAME"]

# S3の準備
s3 = boto3.resource('s3')
s3_bucket = s3.Bucket(BUCKET_NAME)

# ChatGPTの準備
openai.api_key = os.environ["OPENAI_API_KEY"]
llm = ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo")
embeddings = OpenAIEmbeddings()

url = ""

summarize_prompt_template = """以下の文章を簡潔に要約してください。:

{text}

要約:"""

qa_prompt_template = """

{context}

これを踏まえて、次の質問に日本語で答えてください。

質問: {question}
回答:"""

# Webページからテキストを抽出
def get_webpage_texts(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    text_list = []

    main_div = soup.find('div', class_='main')
    if main_div:
        for string in main_div.stripped_strings:
            text_list.append(string)
    else:
        body = soup.find('body')
        if body:
            for string in body.stripped_strings:
                text_list.append(string)

    return text_list

# PDFページからテキストを抽出
def get_pdf_text(url):
    response = requests.get(url)
    pdf_file = BytesIO(response.content)
    text_list = []
    with pdfplumber.open(pdf_file) as pdf:
        for page in pdf.pages:
            text_list.append(page.extract_text())
    return text_list

# URLをS3フォルダ名に変更 domain-yyyymmddhhmmssの形にする
def modify_url_to_s3_path(url):
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d%H%M%S")
    parsed_url = url.split("/")
    domain = parsed_url[2].replace(".", "-")
    new_url = f"{domain}-{timestamp}"
    return new_url

# S3にディレクトリを格納
def upload_dir_s3(dirpath, s3bucket, s3_path):
    for root,dirs,files in os.walk(dirpath):
        for file in files:
            s3_key = os.path.join(root, file).replace(dirpath, s3_path, 1)
            s3bucket.upload_file(os.path.join(root, file), s3_key)

# S3からディレクトリを取得
def download_dir_s3(dirpath, s3bucket):
    for obj in s3bucket.objects.filter(Prefix = dirpath):
        if not os.path.exists(os.path.join('/tmp/', os.path.dirname(obj.key))):
            os.makedirs(os.path.join('/tmp/', os.path.dirname(obj.key)))
        s3bucket.download_file(obj.key, os.path.join('/tmp/', obj.key))
    return os.path.join('/tmp/', dirpath)

# DynamoDBへ保存処理
def write_to_dynamo(url, s3_path, summarize_result, channel_name, user_name):
    dynamodb = boto3.resource('dynamodb')
    table_name = os.environ["TABLE_NAME"]
    table = dynamodb.Table(table_name)
    now = datetime.now()

    # データを書き込む
    table.put_item(
        Item={
            'Url': url,
            'AttributeType': "S3Key",
            'AttributeValue': s3_path,
            'ChannelName': channel_name,
            'UserName': user_name,
            'SummarizeResult': summarize_result,
            'created_at': now.isoformat()
        }
    )

# DynamoDBからの取得処理
def get_from_dynamo(url):
    if url == "":
        return None

    dynamodb = boto3.resource('dynamodb')
    table_name = os.environ["TABLE_NAME"]
    table = dynamodb.Table(table_name)

    # データを取得する
    response = table.get_item(
        Key={
            'Url': url,
            'AttributeType': "S3Key"
        }
    )

    # 取得した項目が存在するかを確認
    if 'Item' in response:
        return response['Item']
    else:
        return None

# DynamoDBから会話履歴処理
def get_history(session_id):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(SESSION_TABLE_NAME)

    # データを取得する
    response = table.get_item(
        Key={
            'SessionId': session_id
        }
    )

    chat_history = []
    # 取得した項目が存在するかを確認
    if 'Item' in response and response['Item'].get("History") is not None:
        # "History"の下のリストをループ処理してchat_historyを生成
        human_content = ''
        for history_item in response['Item']["History"]:
            if history_item["type"] == "human":
                human_content = history_item["data"]["content"]
            elif history_item["type"] == "ai" and human_content:
                ai_content = history_item["data"]["content"]
                chat_history.append((human_content, ai_content))
                human_content = ''
        print(chat_history)
        return chat_history
    else:
        print(chat_history)
        return chat_history

# Slackへ返信する
def send_slack_message(channel, text, thread_ts):
    SLACK_BOT_TOKEN = os.environ["SLACK_BOT_TOKEN"]
    headers = {
        "Content-Type": "application/json; charset=utf-8",
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}"
    }

    data = {
        "token": SLACK_BOT_TOKEN,
        "channel": channel,
        "text": text,
        "thread_ts": thread_ts
    }

    response = requests.post("https://slack.com/api/chat.postMessage", json=data, headers=headers)
    return response

# VectorIndexを元にQA処理を行う
def exec_qa(item, query, thread_ts, channel, thread_head_ts):
    print("DynamoDB item: " + json.dumps(item))
    s3_download_path = item["AttributeValue"]
    print("s3_download_path: " + s3_download_path)

    # S3から保存済みのVectorIndexを取得
    index_dir = download_dir_s3(s3_download_path, s3_bucket)
    dbDownload = FAISS.load_local(index_dir, embeddings)

    # QA処理
    qa_prompt = PromptTemplate(
        template=qa_prompt_template, input_variables=["context", "question"]
    )

    question_generator = LLMChain(llm=llm, prompt=CONDENSE_QUESTION_PROMPT)
    doc_chain = load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)

    # スレッドの過去発言を取得
    message_history = DynamoDBChatMessageHistory(table_name=SESSION_TABLE_NAME, session_id=thread_head_ts)
    chat_history = get_history(thread_head_ts)

    # 無視という単語が入っていたらベクトルデータを参考にせずに質問させる
    if "無視" in query:
        dummy_index = download_dir_s3("dummy-empty-vectorindex", s3_bucket)
        dummy_db = FAISS.load_local(dummy_index, embeddings)
        qa = ConversationalRetrievalChain.from_llm(llm, dummy_db.as_retriever())
        qa_result = qa({"question": query, "chat_history": []})
    else:
        qa = ConversationalRetrievalChain(retriever=dbDownload.as_retriever(), question_generator=question_generator, combine_docs_chain=doc_chain)
        qa_result = qa({"question": query, "chat_history": chat_history})

    print("qa_result: " + qa_result["answer"])

    # DynamoDBにChatGPTの回答を保存
    message_history.add_ai_message(qa_result["answer"])
    send_slack_message(channel, qa_result["answer"], thread_ts)

# Slackのスレッドから元となる質問元となるURLを取得する
def get_thread_url(thread_ts, channel):
    headers = {
        'Authorization': 'Bearer ' + os.environ['SLACK_BOT_TOKEN'],
        'Content-type': 'application/json; charset=utf-8'
    }

    # メッセージからURLを取得するために正規表現を用意
    user_id_pattern = re.compile(re.escape(URL_SUMMARIZER_ID))
    url_pattern = r'(https?://[^|>]+)'

    next_cursor = None
    while True:
        params = {
            'channel': channel,
            'ts': thread_ts,
            'cursor': next_cursor  # ページネーションのためのcursorパラメータ
        }
        response = requests.get(
            'https://slack.com/api/conversations.replies',
            params=params,
            headers=headers
        )
        response.raise_for_status()  # 応答のステータスコードが200以外の場合にエラーを発生させる
        data = response.json()
        if not data.get('ok'):
            raise RuntimeError(f"Slack API request failed: {data.get('error')}")

        # 該当スレッドのメッセージ
        messages = data['messages']

        # メッセージを一つずつチェックしていく
        for message in messages:
            if re.search(user_id_pattern, message['text']) and re.search(url_pattern, message['text']):
                url = re.search(url_pattern, message['text']).group(0)
                print(f"Found URL: {url}")
                return url

        # 次のページが存在する場合、cursorを更新して再度リクエストを行う
        next_cursor = data.get('response_metadata', {}).get('next_cursor')
        if not next_cursor:
            break

    # URLが見つからなかった場合は空文字列を返却
    return ""

# Slackチャンネル名を取得
def get_channel_name(channel_id):
    headers = {
        'Authorization': 'Bearer ' + os.environ['SLACK_BOT_TOKEN'],
        'Content-type': 'application/json; charset=utf-8'
    }
    params = {
        'channel': channel_id
    }
    response = requests.get(
        'https://slack.com/api/conversations.info',
        params=params,
        headers=headers
    )
    response.raise_for_status()  # 応答のステータスコードが200以外の場合にエラーを発生させる
    data = response.json()
    if data.get('ok'):
        return data['channel'].get('name', 'Unknown channel')
    else:
        return 'Unknown channel'

# Slackユーザー名を取得
def get_user_name(user_id):
    headers = {
        'Authorization': 'Bearer ' + os.environ['SLACK_BOT_TOKEN'],
        'Content-type': 'application/json; charset=utf-8'
    }
    params = {
        'user': user_id
    }
    response = requests.get(
        'https://slack.com/api/users.info',
        params=params,
        headers=headers
    )
    response.raise_for_status()  # 応答のステータスコードが200以外の場合にエラーを発生させる
    data = response.json()
    if data.get('ok'):
        user = data['user']
        return user.get('real_name', user.get('name', 'Unknown user'))
    else:
        return 'Unknown user'

def lambda_handler(event, context):
    print(f"Event: {json.dumps(event)}")
    body = json.loads(event['body'])

    # Slackの疎通確認用
    if body['type'] == 'url_verification':
        print("Slack challenge.")
        return {
            'statusCode': 200,
            'body': body['challenge']
        }

    headers = event['headers']
    # 処理に時間がかかりSlackに3秒以内にリクエストを返せないとリトライされる。Slackからのリトライは無視。
    if 'x-slack-retry-reason' in headers or 'x-slack-retry-num' in headers:
        print("Slack retry.")
        return {
            'statusCode': 200,
            'body': "Slack retry. ok."
        }

    TEAM_ID = os.environ["TEAM_ID"]
    API_APP_ID = os.environ["API_APP_ID"]

    team_id = body['team_id']
    api_app_id = body['api_app_id']

    # 変なリクエストは弾く
    if team_id != TEAM_ID or api_app_id != API_APP_ID:
        print("Not authorized.")
        print("team_id: " + team_id)
        print("api_app_id: " + api_app_id)
        return {
            'statusCode': 400,
            'body': json.dumps({
                'error': {
                    'message': 'Invalid team ID or API app ID.',
                    'code': 'INVALID_REQUEST'
                }
            })
        }

    # メンション時
    if body['event']['type'] == 'app_mention':
        try:
            # 呼び出しもとSlack情報
            channel = body['event']['channel']
            thread_ts = body['event']['ts']
            text = body['event']['text']
            channel_id = body["event"]["channel"]
            user_id = body["event"]["user"]
            print("text: " + text)

            # Slack本文から正規表現を使ってURLを切り出し
            url_pattern = r'(https?://[^|>]+)'
            urls = re.findall(url_pattern, text)
            if urls:
                url = urls[0]
                url_contain = True
                print("url: " + url)
            else:
                url_contain = False

            if url_contain:

                # 対話を記憶させるためのテーブル準備 IDはSlackのスレッド頭のthread_tsにする
                message_history = DynamoDBChatMessageHistory(table_name=SESSION_TABLE_NAME, session_id=thread_ts)
                # 質問からメンションを削除してDynamoDBにユーザーの発言を保存
                user_message = re.sub(re.escape(URL_SUMMARIZER_ID), '', text)
                message_history.add_user_message(user_message)

                item = get_from_dynamo(url)
                if item is not None:
                    print("Already summarized.")
                    message = item["SummarizeResult"]
                    message_history.add_ai_message(message)
                    send_slack_message(channel, message, thread_ts)
                else:
                    # 投稿にurlが含まれておりので要約を行う
                    print("Summarize start.")

                    processing_message = "データを学習して要約作成中です。"
                    send_slack_message(channel, processing_message, thread_ts)

                    # テキストを取得してtext_listに格納
                    text_list = []
                    if urlparse(url).path.endswith('.pdf'):
                        text_list.extend(get_pdf_text(url))
                    else:
                        text_list.extend(get_webpage_texts(url))
                    print("text_list: " + text_list[0])

                    # テキストを学習用に分割する
                    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
                    texts = text_splitter.split_text(text_list)
                    docs = [Document(page_content=t) for t in texts[:3]]

                    # 要約処理
                    summarize_template = PromptTemplate(
                                template=summarize_prompt_template, input_variables=["text"])
                    chain = load_summarize_chain(llm, chain_type="map_reduce", map_prompt=summarize_template, combine_prompt=summarize_template)
                    summarize_result = chain.run(docs)
                    print("summarize_result: " + summarize_result)

                    # ベクトルデータ化してindexを作成する
                    dbCreate = FAISS.from_documents(docs, embeddings)
                    dbCreate.save_local("/tmp/tmp_index")
                    print("Vectorize done!")

                    # S3へアップロードする
                    s3_upload_path = modify_url_to_s3_path(url)
                    upload_dir_s3("/tmp/tmp_index", s3_bucket, s3_upload_path)
                    print("S3 uploaded: " + s3_upload_path)

                    # チャンネル名とユーザー名を取得
                    channel_name = get_channel_name(channel_id)
                    user_name = get_user_name(user_id)

                    # DynamoDBへアップロードする
                    write_to_dynamo(url, s3_upload_path, summarize_result, channel_name, user_name)
                    print("DynamoDB write done!")

                    # DynamoDBにChatGPTの回答を保存
                    message_history.add_ai_message(summarize_result)
                    send_slack_message(channel, summarize_result, thread_ts)
            else:
                # 投稿自体にurlが含まれていないので質問と判断
                print("QA start.")
                # 質問からメンションを削除する
                query = re.sub(re.escape(URL_SUMMARIZER_ID), '', text)
                print("query: " + query)

                # 後から使うかもなので空VectorIndexでConversationalRetrievalChainを用意しておく
                dummy_index = download_dir_s3("dummy-empty-vectorindex", s3_bucket)
                dummy_db = FAISS.load_local(dummy_index, embeddings)
                qa = ConversationalRetrievalChain.from_llm(llm, dummy_db.as_retriever())

                # スレッドから対象となるURLを取得する
                if 'thread_ts' in body['event']:
                    thread_head_ts = body['event']['thread_ts']
                    url = get_thread_url(thread_head_ts, channel)
                    print("QA url: " + url)

                    # 対話を記憶させるためのテーブル準備 IDはSlackのスレッド頭のthread_tsにする
                    message_history = DynamoDBChatMessageHistory(table_name=SESSION_TABLE_NAME, session_id=thread_head_ts)
                    # DynamoDBにユーザーの発言を保存
                    message_history.add_user_message(query)

                else:
                    # スレッド内ではないメンション時にURLが入っていないことを想定
                    # DynamoDBにユーザーの発言を保存
                    message_history = DynamoDBChatMessageHistory(table_name=SESSION_TABLE_NAME, session_id=thread_ts)
                    message_history.add_user_message(query)

                    qa_result = qa({"question": query, "chat_history": []})

                    print("qa_result: " + qa_result["answer"])

                    # DynamoDBにChatGPTの回答を保存
                    message_history.add_ai_message(qa_result["answer"])
                    send_slack_message(channel, qa_result["answer"], thread_ts)
                    return

                # DynamoDBから該当urlのベクトルデータが保存されているS3の情報を取得する
                item = get_from_dynamo(url)
                if item is not None:
                    # ChatGPTへQA処理
                    exec_qa(item, query, thread_ts, channel, thread_head_ts)

                else:
                    # スレッド内でURL要約が行われていないのにメンションされたことを想定
                    print("DynamoDB item not found.")
                    chat_history = get_history(thread_head_ts)
                    qa_result = qa({"question": query, "chat_history": chat_history})
                    print("qa_result: " + qa_result["answer"])
                    
                    send_slack_message(channel, qa_result["answer"], thread_ts)
        
        except Exception as e:
            # めんどくさいので一旦全てのエラーを拾う
            print(e)
            message = "予期せぬエラーが発生しました。エンジ森にお知らせください。"
            send_slack_message(channel, message, thread_ts)

お世話になった参考資料

まだ記事が少ない中大変参考になりました!大感謝🤗

14
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
22