1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

boto3 の invoke_agent ストリーミングを別スレッドで非同期呼び出し

Last updated at Posted at 2025-02-05

Amazon Bedrock Agent を実行する boto3 の invoke_agent のストリーミングを非同期呼び出ししたのでその方法を紹介します。なお、ここでいう非同期呼び出しは Python の asyncio のことです。非同期呼び出しの手段には aioboto3 パッケージなどもありますが今回は、boto3 を別スレッドで実行する方法を採用しました。

動機と問題設定

LiteLLM は、OpenAI プロキシ サーバー (LLM ゲートウェイ) として統一されたインターフェイスで 100 以上の LLM を呼び出すことができますCustom API Server (Custom Format) 機能で独自の処理を組み込むこともできます。

今回は、その機能を用いて Amazon Bedrock Agent を呼び出しましたがそのために非同期呼び出しが必要でした。でも boto3 の invoke_agent は非同期呼び出しに対応していません。そのまま同期的に呼び出すと asyncio のイベントループは応答が返されるまでブロックされてしまいます。

そんなときは asyncio.to_thread でイベントループのブロックを回避できます。asyncio.to_thread は同期関数を別スレッドで実行し非同期的に呼び出します。

まず、同期的に boto3 を呼び出す関数 completion() を定義します。

import asyncio
import logging
import boto3

AGENT_ID = "<エージェントID>"
AGENT_ALIAS_ID = "<エージェントエイリアスID>"
AWS_REGION = "<エージェントのリージョン>"


def completion(input_text, session_id):
    """boto3 invoke_agent() を同期的に呼び出し"""
    client = boto3.client('bedrock-agent', region_name=AWS_REGION)

    response = client.invoke_agent(
        agentId=AGENT_ID,
        agentAliasId=AGENT_ALIAS_ID,
        sessionId=session_id,
        inputText=input_text,
        enableTrace=False,
    )

    for event in response["completion"]:
        chunk = event.get("chunk")
        if chunk:
            output_text = chunk["bytes"].decode()
            return output_text

この関数 completion()asyncio.to_thread() を用いて別スレッドで非同期的に実行します。そうすればイベントループのブロックを回避できます。イベントループとは異なるスレッドで同期的呼び出しはブロックされるからです。

async def acompletion(input_text, session_id):
    """関数 completion() を別スレッドで非同期的に呼び出す"""
    output_text = await asyncio.to_thread(completion, input_text, session_id)
    logging.info("output_text: %s", output_text)

でも、この方法は一回限りの応答を得る場合に有効で、ストリーミングのように断続的な応答を得ることはできません。それがここで取り上げる問題です。

invoke_agent のストリーミング

invoke_agent の引数 streamingConfigurationsstreamFinalResponseTrue を指定すると invoke_agent は可能ならストリーミング応答を返します。ストリーミングが可能な条件については invoke_agent のドキュメント を参照ください。エージェントにパーミッション bedrock:InvokeModelWithResponseStream も必要です。

ストリーミングを有効にした場合、次の例のようにジェネレータ等を用いて応答を都度、返したいです。

def streaming(input_text, session_id):
    """boto3 invoke_agent() をストリーミングで呼び出し"""
    client = boto3.client('bedrock-agent', region_name=AWS_REGION)

    response = client.invoke_agent(
        agentId=AGENT_ID,
        agentAliasId=AGENT_ALIAS_ID,
        sessionId=session_id,
        inputText=input_text,
        enableTrace=False,
        streamingConfigurations={
            'streamFinalResponse': True  # ストリーミングを有効化
        }
    )

    for event in response["completion"]:
        chunk = event.get("chunk")
        if chunk:
            output_text = chunk["bytes"].decode()
            # ジェネレータで都度、応答を返す
            yield output_text

しかし、この例のように同期的なジェネレータを返すと次のコード例のようにジェネレータの forループ はイベントループと同一のスレッドで実行されることとなり、ストリーミングの応答を待つ間イベントループはブロックされてしまいます。

async def astreaming(input_text, session_id):
    """関数 streaming() を別スレッドで非同期的に呼び出す"""
    generator = await asyncio.to_thread(streaming, input_text, session_id)
    for stream_text in generator:  # asyncio のイベントループはブロックされる
        logging.info("stream_text: %s", stream_text)

イベントループをブロックせずに boto3 の invoke_agent() からストリーミングの応答を得るためには、boto3 を実行するスレッドとイベントループのスレッド間で非同期呼び出しの通信が必要です。

ここでは asyncio キュー(asyncio.Queue)を用いてストリーミングの応答を受け渡しします。つまりストリーミングの応答を一度、キューへ書き込み、非同期的にそのキューから応答を読み出します。

asyncio.Queue は通常、同一のイベントループ(つまり同一のスレッド)からでないとアクセスできませんが、asyncio.run_coroutine_threadsafe を使えば外部スレッドからイベントループに非同期処理をスケジュールできます。これを使って別スレッドから asyncio.Queue へ書き込みます。

コード例

async def astreaming(input_text, session_id):
    # キュー
    queue = asyncio.Queue()

    # 非同期:キューへ書込み
    async def write_queue(data):
        await queue.put(data)

    # 同期:ストリーミング応答を受信したらキューへ書込み
    def relay(loop: asyncio.AbstractEventLoop):
        for chunk in streaming(input_text, session_id):
            # 非同期キューへの書込みをイベントループ loop でスケジュール
            future = asyncio.run_coroutine_threadsafe(write_queue(chunk['text']), loop)
            future.result()
        # キューを終了
        queue.shutdown(immediate=False)
        return

    # 同期呼び出しを別スレッドで実行
    task = asyncio.create_task(asyncio.to_thread(relay, asyncio.get_running_loop()))
    try:
        while True:
            stream_text = await queue.get()
            logging.info("stream_text: %s", stream_text)
    except asyncio.QueueShutDown:
        logging.info("ストリーム終了")
    finally:
        await task
    return

処理の説明

  1. キューを作成します
  2. キューへ書き込む非同期関数 write_queue() を作成します
  3. 同期関数 relay() はジェネレータstreaming() から得たストリーミングの応答をキューへ書き込みます。その際、asyncio.run_coroutine_threadsafe() を使って非同期関数 write_queue() をイベントループでスケジュールします。「実行」ではなく「スケジュール」なのは直ちに実行されるとは限らないからです。戻り値の future.result() で完了を待つことができます
  4. 別スレッドのタスクとして 同期関数 relay() を実行します。その際、asyncio.get_running_loop() で現在のイベントループを渡します
  5. キューから値を読み出します

以上です。
最後までお読みいただきありがとうございます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?