動作イメージ
こんな感じのチャットボットアプリを AWS App Runner でサクッとデプロイできます。Chainlit × LangChain × Bedrock (Claude 3) の例があまり見当たらなかったので作成しました。
画像入力は Claude 3 モデルのみの対応ですが、Chainlit の Chat settings 機能を通じて Bedrock が提供する他社の Text モデルに切り替えて利用することもできます。会話履歴はセッション内でのみ保持しています。
ソースコードは以下で公開しています。
実際には以下の記事で紹介しているコードを最新のライブラリバージョンや LCEL (LangChain Expression Language) などに対応させた形でアップデートしたものです。
動作環境
以下の環境で動作確認をしています。
- AWS リージョン: us-east-1
- AWS Copilot CLI: v1.33.3
- Python: v3.11.9
- boto3: v1.34.87
- langchain: 0.1.16
- langchain-aws: 0.1.0
- pillow: 10.3.0
ソースコード
Chainlit を使用しています。Chainlit は会話型 AI を構築するためのオープンソースの Python フレームワークです。
Chainlit は OpenAI のライブラリや LangChain, LlamaIndex のような人気のフレームワークとの Integration を提供しており、簡単にチャットボットアプリケーションに組み込むことができます。
作成したアプリケーションコードは以下です。
解説は ソースコードの補足 を参照してください。
import io
import os
import re
from base64 import b64encode
from operator import itemgetter
import boto3
from PIL import Image
import chainlit as cl
from chainlit.input_widget import Select, Slider
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda
from langchain.schema.runnable.config import RunnableConfig
from langchain_aws import ChatBedrock
from langchain_core.messages import HumanMessage
AWS_REGION = os.environ["AWS_REGION"]
PATTERN = re.compile(r'v\d+(?!.*\d[kK]$)')
PROVIDER = ""
TOKEN_PARAM_BY_PROVIDER = {
"ai21": "maxTokens",
"amazon": "maxTokenCount",
"meta": "max_gen_len",
"default": "max_tokens" # Anthropic, Cohere, Mistral
}
@cl.on_chat_start
async def main():
bedrock = boto3.client("bedrock", region_name=AWS_REGION)
response = bedrock.list_foundation_models(
byOutputModality="TEXT"
)
# オンデマンドスループットのモデル ID のみをリスト化
model_ids = [
item['modelId']
for item in response["modelSummaries"]
if PATTERN.search(item['modelId'])
]
settings = await cl.ChatSettings(
[
Select(
id="Model",
label="Amazon Bedrock - Model",
values=model_ids,
initial_index=model_ids.index(
"anthropic.claude-3-sonnet-20240229-v1:0"
),
),
Slider(
id="Temperature",
label="Temperature",
initial=0.3,
min=0,
max=1,
step=0.1,
),
Slider(
id="MAX_TOKEN_SIZE",
label="Max Token Size",
initial=1024,
min=256,
max=8192,
step=256,
),
]
).send()
await setup_runnable(settings)
@cl.on_settings_update
async def setup_runnable(settings):
cl.user_session.set(
"memory", ConversationBufferMemory(return_messages=True)
)
memory = cl.user_session.get("memory")
bedrock_model_id = settings["Model"]
llm = ChatBedrock(
model_id=bedrock_model_id,
model_kwargs={"temperature": settings["Temperature"]}
)
# モデルによってトークンサイズの指定方法が異なる
global PROVIDER
PROVIDER = bedrock_model_id.split(".")[0]
token_param = TOKEN_PARAM_BY_PROVIDER.get(
PROVIDER, TOKEN_PARAM_BY_PROVIDER["default"]
)
llm.model_kwargs[token_param] = int(settings["MAX_TOKEN_SIZE"])
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful chatbot"),
MessagesPlaceholder(variable_name="history"),
MessagesPlaceholder(variable_name="human_message")
]
)
runnable = (
RunnablePassthrough.assign(
history=RunnableLambda(memory.load_memory_variables) | itemgetter("history")
)
| prompt
| llm
| StrOutputParser()
)
cl.user_session.set("runnable", runnable)
def encode_image_to_base64(image, image_format):
buffer = io.BytesIO()
image.save(buffer, format=image_format)
return b64encode(buffer.getvalue()).decode("utf-8")
@cl.on_message
async def on_message(message: cl.Message):
memory = cl.user_session.get("memory")
runnable = cl.user_session.get("runnable")
# Anthropic モデルの場合のみ、画像を処理する
if PROVIDER == "anthropic":
content = []
for file in (message.elements or []):
if file.path and "image" in file.mime:
image = Image.open(file.path)
bs64 = encode_image_to_base64(
image,
file.mime.split('/')[-1].upper() # 画像フォーマットを渡す
)
content.append({
"type": "image",
"source": {
"type": "base64",
"media_type": file.mime,
"data": bs64
}
})
content_text = {"type": "text", "text": message.content}
content.append(content_text)
runnable_message_data = {"human_message": [HumanMessage(content=content)]}
else:
runnable_message_data = {"human_message": [HumanMessage(content=message.content)]}
res = cl.Message(content="", author=f'Chatbot: {PROVIDER.capitalize()}')
async for chunk in runnable.astream(
runnable_message_data,
config=RunnableConfig(callbacks=[cl.LangchainCallbackHandler()]),
):
await res.stream_token(chunk)
await res.send()
memory.chat_memory.add_user_message(message.content)
memory.chat_memory.add_ai_message(res.content)
開始方法
事前準備
事前に Bedrock のモデルアクセスの有効化は済ませておく必要があります。
AWS Copilot CLI を使用してデプロイします。
sudo curl -Lo /usr/local/bin/copilot https://github.com/aws/copilot-cli/releases/latest/download/copilot-linux && sudo chmod +x /usr/local/bin/copilot
デプロイ
冒頭の GitHub リポジトリにはチャットボットアプリケーションを AWS App Runner にデプロイするためのマニフェストファイルが含まれています。以下の手順でデプロイできます。Copilot CLI を使用せずに Dockerfile で手動でイメージをビルドし、任意のインフラにデプロイすることも可能です。
git clone https://github.com/hayao-k/Bedrock-AIChatbot-Sample
cd Bedrock-AIChatbot-Sample
export AWS_REGION=us-east-1
copilot app init bedrockchat-app
copilot deploy --name bedrockchat --env dev
デプロイが成功したらメッセージに出力された URL へアクセスします。以下のような画面が表示されたらデプロイ成功です!
アプリケーションの削除
環境を削除する際は以下のコマンドを実行します。CloudWatch Logs group は残存してしまうので手動で削除してください。
copilot app delete
ローカルでの起動方法
以下のコマンドを実行し、http://localhost:8000
にアクセスします。
pip install -r requirements.txt
export AWS_REGION=us-east-1
chainlit run app.py
ソースコードの補足
main() 関数
on_chat_start
デコレータによって、チャットが開始されたときに呼び出されます。ユーザーが Chainlit へ接続するたびに新しいチャットセッションが作成されます。
Bedrock の ListFoundationModels API から TEXT モデルの ID リストを取得し、Chainlit の ChatSettings クラスを通じてモデル ID や Temperature といったパラメーターをユーザーが UI から設定できるようにしています。
@cl.on_chat_start
async def main():
bedrock = boto3.client("bedrock", region_name=AWS_REGION)
response = bedrock.list_foundation_models(
byOutputModality="TEXT"
)
# オンデマンドスループットのモデル ID のみをリスト化
model_ids = [
item['modelId']
for item in response["modelSummaries"]
if PATTERN.search(item['modelId'])
]
settings = await cl.ChatSettings(
[
Select(
id="Model",
label="Amazon Bedrock - Model",
values=model_ids,
initial_index=model_ids.index(
"anthropic.claude-3-sonnet-20240229-v1:0"
),
),
Slider(
id="Temperature",
label="Temperature",
initial=0.3,
min=0,
max=1,
step=0.1,
),
Slider(
id="MAX_TOKEN_SIZE",
label="Max Token Size",
initial=1024,
min=256,
max=8192,
step=256,
),
]
).send()
await setup_runnable(settings)
setup_runable() 関数
on_settings_update
デコレータによって、チャットボットの設定が更新されたときに呼び出されます。
@cl.on_settings_update
async def setup_runnable(settings):
cl.user_session.set(
"memory", ConversationBufferMemory(return_messages=True)
)
memory = cl.user_session.get("memory")
bedrock_model_id = settings["Model"]
llm = ChatBedrock(
model_id=bedrock_model_id,
model_kwargs={"temperature": settings["Temperature"]}
)
# モデルによってトークンサイズの指定方法が異なる
global PROVIDER
PROVIDER = bedrock_model_id.split(".")[0]
token_param = TOKEN_PARAM_BY_PROVIDER.get(
PROVIDER, TOKEN_PARAM_BY_PROVIDER["default"]
)
llm.model_kwargs[token_param] = int(settings["MAX_TOKEN_SIZE"])
prompt = ChatPromptTemplate.from_messages(
[
("system", "You are a helpful chatbot"),
MessagesPlaceholder(variable_name="history"),
MessagesPlaceholder(variable_name="human_message")
]
)
runnable = (
RunnablePassthrough.assign(
history=RunnableLambda(memory.load_memory_variables) | itemgetter("history")
)
| prompt
| llm
| StrOutputParser()
)
cl.user_session.set("runnable", runnable)
setup_runnable() 関数では以下のような処理が実行されます。
- ConversationBufferMemory オブジェクトを作成し、ユーザーセッションに保存
- ChatSettings で指定されたモデル ID と Temperature を指定して、ChatBedrock を生成
- モデルプロバイダに応じて、最大トークンサイズのパラメータ名を設定
- プロンプトテンプレートを作成
- RunnablePassthrough と RunnableLambda を使って、チャットの履歴とプロンプトを結合し、言語モデルに渡すための Runnnable インスタンスをセットアップ
- 作成した runnable をユーザーセッションに保存
langchain-community では ChatModel の名称は BedrockChat でしたが、先日 langchain-aws という AWS とのIntegration パッケージがリリースされ、名称も ChatBedrock に変更されています。
今後、langchain-community に含まれる AWS コンポーネントは、langchain-aws 置き換えられ、拡張されていくようです。
2024/4/23 時点で ChatBedrock では AI21 Labs と Cohere がサポートされていません。これらのモデルを選択してメッセージを送信するとエラーが応答されます。
on_message() 関数
on_message
デコレータによって、ユーザーがメッセージを送信したときに呼び出されます。
@cl.on_message
async def on_message(message: cl.Message):
memory = cl.user_session.get("memory")
runnable = cl.user_session.get("runnable")
# Anthropic モデルの場合のみ、画像を処理する
if PROVIDER == "anthropic":
content = []
for file in (message.elements or []):
if file.path and "image" in file.mime:
image = Image.open(file.path)
bs64 = encode_image_to_base64(
image,
file.mime.split('/')[-1].upper() # 画像フォーマットを渡す
)
content.append({
"type": "image",
"source": {
"type": "base64",
"media_type": file.mime,
"data": bs64
}
})
content_text = {"type": "text", "text": message.content}
content.append(content_text)
runnable_message_data = {"human_message": [HumanMessage(content=content)]}
else:
runnable_message_data = {"human_message": [HumanMessage(content=message.content)]}
res = cl.Message(content="", author=f'Chatbot: {PROVIDER.capitalize()}')
async for chunk in runnable.astream(
runnable_message_data,
config=RunnableConfig(callbacks=[cl.LangchainCallbackHandler()]),
):
await res.stream_token(chunk)
await res.send()
memory.chat_memory.add_user_message(message.content)
memory.chat_memory.add_ai_message(res.content)
on_message() 関数では以下のような処理が実行されます。
- ユーザーセッションから memory と runnable オブジェクトを取得
- モデルプロバイダが Anthropic の場合、画像が添付されていれば Base64 エンコーディングし、テキストメッセージと一緒に content へ格納
- モデルプロバイダが Anthropic 以外の場合はテキストメッセージのみを利用
- runnable.astream() を使って、作成したデータを言語モデルに渡し、応答を生成
- 生成された応答をチャンクごとにストリーミングしながら表示
- 応答が完了したら、ユーザーのメッセージと生成された応答を memory に追加
Chainlit にはマルチモーダルの機能が組み込まれており、UI からファイルを添付できます。
Claude 3 Haiku に添付画像から俳句を詠んでもらいました。。超高速にストリーミングレスポンスが返ってきています。
応答完了後に対話の履歴を memory に保存することで、次の質問でその履歴を参照できるようにしています。
以上です。
参考になれば幸いです。