4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS BedrockでストリーミングRAG

Posted at

はじめに

AWS BedrockでKnowledgeBaseを使う場合、retrieve_and_generateメソッドを利用すると思いますが、ストリーミングに対応していなさそうだったので、やり方を考えてみました。

※こんな感じになります。(KnowledgeBaseには先日OpenAIが発表した動画生成AI「sora」の紹介記事を登録しています。)

streamlit-app-2024-02-27-21-02-42-online-video-cutter.com-1.gif

やり方

ユーザーからのプロンプトからKnowledgeBaseを検索し、その結果をもとにmodelを呼び出す形にします。
(KnowledgeBaseからの返却値は適宜利用しやすい形にしてください。)
※UIはStreamlitを使っています。

import streamlit as st
import boto3
import json


def main():
    """メイン関数はStreamlitアプリでのチャットのやり取りを処理します。

    この関数はチャットインターフェースを初期化し、ユーザー入力を処理し、AIモデルからのレスポンスを表示します。
    """
    # セッション状態にチャットメッセージがない場合は初期化
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # 既存のチャットメッセージを表示
    if st.session_state.messages:
        for info in st.session_state.messages:
            with st.chat_message(info["role"]):
                st.write(info["content"])

    # 新しいユーザー入力を処理
    if prompt := st.chat_input(""):
        # ユーザーメッセージをセッション状態に追加
        st.session_state.messages.append({"role": "Human", "content": prompt})
        # ユーザーのメッセージを表示
        with st.chat_message("Human"):
            st.write(prompt)

        # AIのレスポンスを準備して表示
        with st.chat_message("Assistant"):
            message_placeholder = st.empty()  # 動的更新のためのプレースホルダー
            with st.spinner("検索中..."):
                response = invoke_model(prompt)

            full_response = ""  # レスポンスを累積するための変数を初期化
            if stream := response.get("body"):
                for event in stream:
                    if chunk := event.get("chunk"):
                        # 各チャンクのレスポンスをデコードして処理
                        full_response += json.loads(chunk.get("bytes").decode("utf-8"))[
                            "completion"
                        ]
                        # 部分レスポンスでプレースホルダーを更新
                        message_placeholder.markdown(full_response + "")

                # 完全なレスポンスを最終的に表示
                message_placeholder.write(full_response)

        # AIのレスポンスをセッション状態に追加
        st.session_state.messages.append(
            {"role": "Assistant", "content": full_response}
        )


def invoke_model(prompt):
    """AIモデルを呼び出して、ユーザーのプロンプトと既存の会話コンテキストに基づいたレスポンスを生成します。

    引数:
        prompt: ユーザーの入力プロンプト。
        message_placeholder: AIのレスポンスを動的に表示するためのStreamlitプレースホルダー。

    戻り値:
        AIモデルが生成したレスポンス。
    """
    # Bedrock runtime用のAWS Boto3クライアントを初期化
    retrieve_bedrock = boto3.client(
        service_name="bedrock-agent-runtime", region_name="us-east-1"
    )
    bedrock = boto3.client(service_name="bedrock-runtime", region_name="us-east-1")

    # 知識ベースから情報を取得
    docs_info = retrieve_bedrock.retrieve(
        knowledgeBaseId=st.secrets["KNOWLEDGE_ID"],
        retrievalQuery={"text": prompt},
    )

    # モデルプロンプト用のメッセージをコンパイル
    messages = [
        f'{message["role"]}:{message["content"]}'
        for message in st.session_state.messages
    ]

    # モデル呼び出し用のボディをフォーマット
    body = json.dumps(
        {
            "prompt": "\n\n".join(messages)
            + "\n\n"
            + "info:"
            + json.dumps(docs_info["retrievalResults"])
            + "Assistant:",
            "max_tokens_to_sample": 1000,
        }
    )

    # モデルを呼び出し、レスポンスストリームを処理
    response = bedrock.invoke_model_with_response_stream(
        modelId="anthropic.claude-v2:1", body=body
    )

    return response


if __name__ == "__main__":
    main()

おわりに

今回初めてKnowledgeBaseを使いましたが、独自ファイルをS3にアップロードして読み込むだけでベクターDB作ってくれてすぐ使えるのは便利だなと思いました。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?