はじめに
前回の記事では、Command R+モデルとRAGを組み合わせて質問に回答するコードを作成しました。今回は、それを応用してSlackチャットボットを作成しました。
Command R / Command R+ではレスポンスに参照ドキュメントの情報を含みます。今回のチャットボットでは、以下の仕様目指しました。
- 応答テキストをストリーミングで出力する
- 応答テキスト中に参照番号を付ける
- 参照番号に紐付く参照ドキュメント情報を出力する
デモ動画
ここでは、「クリックジャッキングの被害に遭わないために、サイト利用者が注意すべき点はどのようなものがありますか?」という質問に対して「以下の応答テキストが生成されました。
クリックジャッキングの被害に遭わないために、サイト利用者が注意すべき点として、以下のようなものがあります。
- ログイン機能を設けているウェブサイトでは、ログインした利用者からのリクエストが意図したリクエストであるかどうかを識別する仕組みを持っているかどうかを確認する。
- 重要な処理は、一連の操作をマウスのみで実行できないようにする。
- ブラウザのセキュリティ機能を活用する。 例えば、Internet Explorer 7 以外のブラウザでは、HTTP レスポンスヘッダに「X-Frame-Options: DENY」と出力することで、frame 要素や iframe 要素によるページ読み込みを制限できる。
- 重要な操作を行った際に、その旨を登録済みのメールアドレスに自動送信する。
- ウェブサイトがセッション ID を発行し、セッション管理を行っている場合は、セッション ID の発行や管理に不備がないか確認する。
応答テキストの生成後、15秒ほどして参照番号付きの応答テキストと参照番号に参照ドキュメント情報が出力されました。
いちど応答テキスト全体をストリーミングで出力後、テキストに参照番号を付与する処理を経て全体を置換しています。
実装を工夫すればもっと速く出力出来るかもしれません。
構成図
参考情報
開発環境構築
作業環境のOSバージョン
Windows 11上のWSLでUbuntu 23.04を動かしています。
$ cat /etc/os-release | grep PRETTY_NAME
PRETTY_NAME="Ubuntu 23.10"
開発環境構築
構築手順は、以前の記事と同様です。
bedrock-slack-backlog-rag-app/app.pyの構成
app.pyのみ修正しています。
bedrock-slack-app/app.p (長いので折りたたんでいます。クリックして展開)
import ast
import json
import logging
import os
import re
import time
import boto3
from botocore.exceptions import ClientError
from slack_bolt import App
from slack_bolt.adapter.aws_lambda import SlackRequestHandler
SlackRequestHandler.clear_all_log_handlers()
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
level=logging.INFO
)
logger = logging.getLogger(__name__)
class SecretsManager:
"""
Class to retrieve secrets from Secrets Manager
Attributes:
secret_name (str): The name of the secret
region_name (str): The name of the region
client (boto3.client): The client for Secrets Manager
"""
def __init__(self, secret_name, region_name):
self.secret_name = secret_name
self.region_name = region_name
self.client = boto3.client(
service_name='secretsmanager',
region_name=region_name
)
def get_secret(self, key):
"""
Retrieves the value of a secret based on the provided key.
Args:
key (str): The key of the secret to retrieve.
Returns:
str: The value of the secret.
Raises:
ClientError: If there is an error retrieving the secret.
"""
try:
get_secret_value_response = self.client.get_secret_value(
SecretId=self.secret_name
)
except ClientError as e:
raise e
secret_data = get_secret_value_response['SecretString']
secret = ast.literal_eval(secret_data)
return secret[key]
secrets_manager = SecretsManager(
secret_name=os.environ.get("SECRET_NAME"),
region_name=os.environ.get("REGION_NAME")
)
app = App(
signing_secret=secrets_manager.get_secret("SlackSigningSecret"),
token=secrets_manager.get_secret("SlackBotToken"),
process_before_response=True,
)
def insert_citations_in_order(documents, text, citations):
"""
A helper function to pretty print citations.
"""
offset = 0
document_id_to_number = {}
citation_number = 0
# Process citations, assigning numbers based on unique document_ids
for citation in citations:
citation_numbers = []
for document_id in sorted(citation["document_ids"]):
if document_id not in document_id_to_number:
citation_number += 1 # Increment for a new document_id
document_id_to_number[document_id] = citation_number
citation_numbers.append(document_id_to_number[document_id])
# Adjust start/end with offset
start, end = citation['start'] + offset, citation['end'] + offset
placeholder = ''.join([f'[{number}]' for number in citation_numbers])
# Bold the cited text and append the placeholder
modification = f'**{text[start:end]}**{placeholder}'
# Replace the cited text with its bolded version + placeholder
text = text[:start] + modification + text[end:]
# Update the offset for subsequent replacements
offset += len(modification) - (end - start)
# Prepare citations for listing at the bottom, ensuring unique document_ids are listed once
unique_citations = {number: doc_id for doc_id, number in document_id_to_number.items()}
citation_list = '\n'.join([f'[{doc_id}] <{documents[doc_id - 1]["FileURL"]}|{documents[doc_id - 1]["FileTitle"]}> {documents[doc_id - 1]["snippet"][:50]}...' for doc_id, number in sorted(unique_citations.items(), key=lambda item: item[1])])
text_with_citations = f'{text}\n\n{"-" * 10}\n{citation_list}'
# logging.info("text_with_citations: %s", text_with_citations)
return text_with_citations
def generate_message(bedrock, agent, model_id, knowledge_base_id, input_text, channel, ts):
# Step 1: ユーザー入力テキストを基に検索クエリーを生成する
request_body = json.dumps({
"temperature": 0.3, # default 0.3
"p": 0.75, # default 0.75
"k": 0, # default 0
"max_tokens": 500,
# "chat_history" is not used for "search_queries_only" with empty []
"message": input_text,
"search_queries_only": True,
})
response = bedrock.invoke_model(
body=request_body,
contentType="application/json",
accept="application/json",
modelId="cohere.command-r-v1:0",
)
response_body = json.loads(response.get("body").read())
search_queries = response_body["search_queries"]
logging.info("input_text: %s", input_text)
logging.info("Search queries: %s", search_queries, )
# Step 2: Retrieverに対してクエリーを実行して検索結果を得る
documents = []
for query in search_queries:
query_text = query["text"]
response = agent.retrieve(
knowledgeBaseId=knowledge_base_id,
retrievalQuery={
"text": query_text
},
retrievalConfiguration={
"vectorSearchConfiguration": {
"numberOfResults": 5
}
},
)
retrieval_results = response['retrievalResults']
logging.debug("response: %s", response)
logging.debug("retrieval_results: %s", retrieval_results)
for result in retrieval_results:
snippet = result['content']['text']
PageTitle = result['metadata']['PageTitle']
PageURL = result['metadata']['PageURL']
FileTitle = result['metadata']['FileTitle']
FileURL = result['metadata']['FileURL']
documents.append(
{
"title": query_text,
"snippet": snippet,
"PageTitle": PageTitle,
"PageURL": PageURL,
"FileTitle": FileTitle,
"FileURL": FileURL
}
)
logging.debug("documents: %s", documents)
# Step 3: ユーザー入力テキストと検索結果を基に応答テキストを生成する
system_instruction = """
Let's think step by step
Take a deep breath
あなたは知識豊富なチャットアシスタントです。あなたにはユーザーが知りたい情報に関連する複数のドキュメントの抜粋が提供されます。
これらの情報をもとに、ユーザーの質問に対する回答を提供してください。質問に答えるための情報がない場合は、「情報が不十分で回答できません」と答えてください。
回答のルールが<rules></rules>にあります。 ルールを理解してください。ルールは必ず守ってください。
<rules>
<rule>日本語で回答を提供すること。</rule>
<rule>質問者は、Webサイトの開発や運用に携わるエンジニアです。</rule>
<rule>質問者に対して詳細な説明をエンジニア向けの内容、用語で提供すること。</rule>
<rule>質問に対する回答に複数の可能性が含まれる場合、それぞれの可能性について検討し詳細な回答を提供すること。</rule>
<rule>回答には、解決策や対応策の例を含ること。例は質問者の理解を助けます。</rule>
<rules>
"""
request_body = json.dumps({
"preamble": system_instruction,
"message": input_text,
"documents": documents,
"temperature": 0.3, # default 0.3
"p": 0.75, # default 0.75
"k": 0, # default 0
"max_tokens": 1000,
})
logging.info("request_body: %s", request_body)
response = bedrock.invoke_model_with_response_stream(
body=request_body,
contentType="application/json",
accept="application/json",
modelId=model_id,
)
# ストリーミング出力
for event in response["body"]:
chunk = json.loads(event["chunk"]["bytes"])
if chunk['event_type'] == 'text-generation':
yield chunk['text']
logging.info("chunk: %s", chunk)
# 回答テキストに参照番号を付与し、チャットに出力した内容を上書き更新
app.client.chat_update(
channel=channel,
ts=ts,
text=insert_citations_in_order(documents, chunk["response"]["text"], chunk["response"]["citations"]),
)
def create_message_blocks(text):
"""
Creates the message blocks for updating the Slack message.
Args:
text (str): The updated text for the Slack message.
Returns:
list: The message blocks for updating the Slack message.
"""
message_context = "生成される情報は不正確な場合があります。"
message_blocks = [
{"type": "section", "text": {"type": "mrkdwn", "text": text}},
{"type": "divider"},
{"type": "context", "elements": [{"type": "mrkdwn", "text": message_context}]},
]
return message_blocks
def handle_app_mentions(event, say):
"""
Handle app mentions in Slack.
Args:
event (dict): The event data containing information about the mention.
say (function): The function used to send a message in Slack.
Returns:
None
"""
channel = event["channel"]
thread_ts = event["ts"]
input_text = re.sub("<@.*> ", "", event["text"])
userid = event["user"]
interval = 0.4
update_count = 0
last_send_time = time.time()
result = say("\n\nお待ちください...", thread_ts=thread_ts)
ts = result["ts"]
last_post_text = ""
text = "<@" + userid + "> "
bedrock = boto3.client(service_name='bedrock-runtime', region_name="us-east-1")
agent = boto3.client(service_name="bedrock-agent-runtime", region_name="us-east-1")
model_id = 'cohere.command-r-plus-v1:0'
knowledge_base_id = os.environ.get("KNOWLEDGE_BASE_ID")
for sentence in generate_message(bedrock, agent, model_id, knowledge_base_id, input_text, channel=channel, ts=ts):
text += sentence
now = time.time()
if (now - last_send_time) > interval:
last_send_time = now
update_count += 1
if update_count / 10 > interval:
interval = interval * 2
last_post_text = text
# message_blocks = create_message_blocks(text)
app.client.chat_update(
channel=channel,
ts=ts,
text=text,
# blocks=message_blocks
)
if last_post_text != text:
# message_blocks = create_message_blocks(text)
app.client.chat_update(
channel=channel,
ts=ts,
text=text,
# blocks=message_blocks
)
def respond_to_slack_within_3_seconds(ack):
"""
Responds to a Slack message within 3 seconds.
Parameters:
- ack: A function to acknowledge the Slack message.
Returns:
None
"""
ack()
app.event("app_mention")(
ack=respond_to_slack_within_3_seconds,
lazy=[handle_app_mentions]
)
def lambda_handler(event, context):
"""
Lambda function handler for processing Slack events.
Args:
event (dict): The event data passed to the Lambda function.
context (object): The runtime information of the Lambda function.
Returns:
dict: The response data to be returned by the Lambda function.
"""
print(event)
retry_counts = event.get("multiValueHeaders", {}).get("X-Slack-Retry-Num", [0])
if retry_counts[0] != 0:
logging.info("Skip slack retrying(%s).", retry_counts)
return {}
slack_handler = SlackRequestHandler(app=app)
return slack_handler.handle(event, context)
metadataを追加
Bedrockでは、ナレッジベース内の各ファイルに対してカスタムメタデータファイルを指定できます。メタデータファイルはソースデータファイルと同じ名前に .metadata.json のサフィックスを付けたメタデータファイルで、ソースデータファイルと一緒に配置します。
今回、たとえば000017316.pdfというソースデータファイルに対して以下のような000017316.pdf.metadata.jsonを作成し、ソースデータファイルと同一バケットに配置しました。
{
"metadataAttributes": {
"PageURL": "https://www.ipa.go.jp/security/vuln/websecurity/about.html",
"PageTitle": "安全なウェブサイトの作り方",
"FileURL": "https://www.ipa.go.jp/security/vuln/websecurity/ug65p900000196e2-att/000017316.pdf",
"FileTitle": "安全なウェブサイトの作り方 (全115ページ)(PDF:2.2 MB)",
"Category": "websecurity"
}
}
これにより、ナレッジベースに対するretrieveの結果にmetadataの内容が含まれるようになります。metadataを以下のようにdocumentsへ追加し、参照ドキュメントを作成する関数に渡すことでデモ動画にあるような参照ドキュメント情報の一覧を出力することができます。
{
"preamble": "\nLet's think step by step\nTake a deep breath\nあなたは知識豊富なチャットアシスタントです。",
"message": "携帯サイトの構築ではどのような点に注意すべきか。",
"documents": [
{
"title": "携帯サイトの構築",
"snippet": "このような変化の中で、古くから存在する携帯ウェブ独自のノウハウを適用してウェブサイトを作ることは、ウェブサイトの安全を損なう原因になることがあります。",
"PageTitle": "安全なウェブサイトの作り方",
"PageURL": "https://www.ipa.go.jp/security/vuln/websecurity/about.html",
"FileTitle": "安全なウェブサイトの作り方 (全115ページ)(PDF:2.2 MB)",
"FileURL": "https://www.ipa.go.jp/security/vuln/websecurity/ug65p900000196e2-att/000017316.pdf"
}
]
}
出力例) FileTitle, FileURL, snippetの先頭50文字を出力しています。
[1] 安全なウェブサイトの作り方 (全115ページ)(PDF:2.2 MB) .....................................................
[2]安全なウェブサイトの作り方 (全115ページ)(PDF:2.2 MB) このような変化の中で、古くから存在する携帯ウェブ独自のノウハウを適用してウェブサイトを作ること ...
[3] 安全なウェブサイトの作り方 (全115ページ)(PDF:2.2 MB) この差異を悪 用されると、脆弱性を悪用した攻撃を WAF で検出できない場合があります。 ...
[4] ウェブサイト運営のファーストステップ~ウェブサイト運営者がまず知っておくべき脅威と責任~(PDF:1.7 MB) ウェブサイト運営のファーストステップ ~ウェブサイト運営者がまず知っておくべき 脅威と責任...
[5] ウェブサイト運営者のための脆弱性対応ガイド(PDF:1.2 MB) ウェブサイトで起こるトラブル .....................................
あまり期待したとおりに動作しなかった点
- 参照ドキュメントの情報を付与した応答テキストの文字数が多すぎると msg_too_longエラーになる。
- もとの応答テキストの長さは"max_tokens": 2000などで制御できたものの、参照ドキュメントの情報はさらに文字列を追加するので、それを加味して出力処理を実装する必要がある。
- 参照ドキュメントの情報を付与した応答テキストの出力が遅い。
- これは多分、ストリーミング出力(invoke_model_with_response_stream)のレスポンスを使い回しているからかもしれない。
- 参照ドキュメント情報を付与した応答テキストの一部が文字化けする
- 文字列を置換する際にデータが壊れるのかもしれない。この処理は、cohere-aiのリポジトリにあるサンプルコードを使いました。
まとめ
boto3のinvoke_model_with_response_streamや、cohere-aiのリポジトリにあるサンプルコードを利用することで、ある程度期待した動作をするチャットボットをつくることができました。
ストリーミング出力にこだわったことで実装にやや苦労しました。様々なモデルにおいて処理速度が向上しているので、今後はストリーミングで体感速度を速くするようなことは気にしなくて良いのかもしれません。