0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Bedrockで作ったチャットボットにMCPサーバを繋ぐ方法

Posted at

1. はじめに

 ここではBedrockで作ったチャットボットにMCPサーバを繋ぐ方法について説明します。以前の私の記事ではBedrockを使って弊社のマイクロソフト関連のヘルプデスク部門で利用するチャットボットを作成し実際にヘルプデスク部門で使って生じた問題点や新たな課題について説明しました。このチャットボットを作成する上で最も大変だった事はエージェントをマイクロソフトのMCPサーバと接続するところでした。そこで、ここではその詳細を解説します。AWS Lambda上で動作するMCPクライアントを作りました、つまり、MCPプロトコル(JSON-RPC+SSE)を使ってMCPサーバと通信し、検索結果をBedrockエージェントに返す処理を実装しました。BedrockのエージェントはMCPサーバと直接やり取りする機能を有してないため、このような実装が必要となります。
 ちなみにAgentCoreはMCPサーバとの統合をネイティブにサポートしているようなので、MCPサーバとのやり取りの実装はもっと簡単になるかと思います。ただし、エージェントはGUI+APIで構築できるのに対して、AgentCoreはGUIではなくSDKでコーディングが必要になります。

2. 作成したヘルプデスクのチャットボットの概要

詳細は私の記事をご覧ください。現在、弊社のマイクロソフト製品のサポート業務で利用しているチャットボットです。ヘルプデスク担当者は弊社の社員からマイクロソフト製品に関する質問(使い方やトラブルの解決方法など)を受けて、回答を作成して質問者へ返します。担当者は質問をチャットボットに投げて品質の良いドラフト(回答の下書き)を得て、それに僅かな修正を加えるだけで質問者に十分な品質の回答を短時間で返します。担当者がゼロから回答を作成するのではなくて生成AIにある程度の精度の高い回答(ドラフト)を作成してもらいそれを修正・確認して質問者に回答を返すことで迅速で正確な回答を返す事を実現します。作成したチャットボットを以下の図に示します。

図1.png

エージェントは質問に対してドラフトを作成するための元となる情報をナレッジベースとマイクロソフトのMCPサーバから獲得します。BedrockのエージェントにはMCPサーバと直接やり取りする機能は持っていませんので、エージェントが利用できるActionGroupを設定し、その中で実行するLambda関数の中にpython言語を使ってMCPサーバとやり取りするためのMCPクライアントのコードを実装します。また、そのコードを実行するために必要なパッケージの一部はAWS環境にないため、自分の手元に必要なパッケージをダウンロードしてzipでまとめてAWS環境にアップロードして設定する必要もあります。それらのやり方を順を追って説明します。

3. Lambda関数の作成とそのLambda関数を実行するために必要なパッケージの準備

3.1 Lambda関数の作成

 MCPサーバと連携するためにLambda関数をpython言語で作成します。以下は主なプログラムの解説です。

import json
import logging
import asyncio
import aiohttp
import os
from typing import Dict, Any, List
from aws_lambda_powertools import Logger, Tracer, Metrics
from aws_lambda_powertools.metrics import MetricUnit

必要なパッケージをインポートします。

ACTION_GROUP_NAME = "action_group_test3"
FUNCTION_NAME = "func-test3"

bedrockのエージェント側で設定する値を代入します。エージェントは複数のActionGroupを使い分けます。このLambda関数はどのActionGroupのどの関数かをここで記述します。4章で詳細を説明します。

class MCPConnector:
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.session = None

    async def initialize(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=25),
            headers={
                'Content-Type': 'application/json',
                'User-Agent': 'AWS-Lambda-MCP-Client/1.0'
            }
        )

__init__はコンストラクタでエンドポイントを設定し、セッションを初期化します。initialize() はメソッドで非同期のHTTPクライアントのセッションを作成し、タイムアウトは25秒に設定し、MCPサーバーへ送るリクエストに付与するヘッダの中身を記述します。

    async def search_docs(self, question: str) -> List[Dict]:

非同期のメソッド search_docsを定義します。文字列の質問を受け取って辞書のリストを返します。

	if not self.session:
            await self.initialize()

セッションがまだ用意されてないならば初期化を行います。

        request_data = {
            "jsonrpc": "2.0",
            "id": "search-request",
            "method": "tools/call",
            "params": {
                "name": "microsoft_docs_search",
                "arguments": {
                    "question": question
                }
            }
        }

request_dataはMCPが採用しているJSON‑RPC 2.0 形式のリクエストの中身です。idは識別子です。methodはMCPサーバでツールを実行することを指示します。paramsは実行したいツール名とそのツールに渡す引数です。

            async with self.session.post(self.endpoint, json=request_data) as response:

request_dataをボディにしたHTTPのPOSTリクエストをself.endpoint宛に送ります。

                if response.status == 200 and response.headers.get("Content-Type", "").startswith("text/event-stream"):

もし、statusが200(成功)かつヘッダのContent-Typeがevent-streamであるならば、受信したパケットはSSEパケットです。次にSSEプロトコルに従ってパケットの中身を分解して整理します。

                    results = []
                    buffer = ""
                    async for line in response.content:
                        decoded = line.decode("utf-8").strip()
                        if decoded.startswith("data:"):
                            buffer += decoded[5:].strip()
                        elif decoded == "":
                            if buffer:
                                try:
                                    logger.info(f"Raw SSE event: {buffer}")
                                    event = json.loads(buffer)
                                    if event.get("isError", False):
                                        logger.error("MCP Server returned top-level error event.")
                                        break
                                    if event.get("done"):
                                        break
                                    results.extend(self._process_search_results(event))
                                except json.JSONDecodeError:
                                    logger.warning(f"JSON decode error: {buffer}")
                                buffer = ""
                    return results

SSEはUTF-8でデコードする仕様です。また、SSEは「data:」で始まって空行までが一塊です。従いまして、if 文で「data:」と空行を見つけています。SSEではdata:の後ろにペイロードがありますので、data:の5文字は削除してペイロード部分だけbufferに格納します。
ペイロードはJSON形式なのでeventにロードします。エラーが無いならば、_process_search_results(event) を呼びます。

    def _process_search_results(self, mcp_response: Dict) -> List[Dict]:
        if mcp_response.get("error"):
            logger.error(f"MCP Server returned error: {mcp_response['error']}")
            return []

        raw_results = mcp_response.get("result", {}).get("content", [])
        processed_results = []

mcp_responseにはJSON形式のeventが入っています。
mcp_responseの中のresultキーの中身を取り出し、さらにその中のcontentキーの中身を取り出してraw_resultsに格納します。

        for item in raw_results:
            if item.get("type") != "text":
                continue

raw_resultsのうちtextのものだけを以下の処理対象とします。

            try:
                parsed_items = json.loads(item.get("text", "[]"))
                for entry in parsed_items:
                    title = entry.get("title", "Microsoft Docs Search Result")
                    content = entry.get("content", "").strip()
                    url = entry.get("contentUrl", "")
                    urls = [url] if url else []

                    processed_results.append({
                        "title": title,
                        "content": content,
                        "url": url,
                        "all_urls": urls,
                        "source": "Microsoft Learn",
                        "relevance": 1.0
                    })

textの中身を取り出して title, content, url, all_urls, source, relevanceに分解して整理します。

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:

起点となる関数です。eventは起動元から渡されるイベントデータで、contextは実行環境のメタ情報です。

        parameters = event.get("parameters", [])
        question = next((p["value"] for p in parameters if p["name"] == "query"), None)

イベント情報の中の parameters の中の query を取り出して question に格納します。

return asyncio.run(execute_search(question))

questionを引数に入れて execute_search関数を実行します。

async def execute_search(question: str) -> Dict[str, Any]:

関数 execute_searchを定義します。

    connector = MCPConnector(os.environ.get("MCP_ENDPOINT"))

環境変数MCP_ENDPOINTを使ってMCPサーバへのコネクタを生成します。

        results = await connector.search_docs(question)

search_docsメソッドを実行しresultsに結果を代入します。

            text_body = "<br><br>".join(
                f"{r['title']}<br>{r['content']}<br>URL: {r['url']}" for r in results
            )

検索結果 results の各要素から、タイトル・本文・URLを整形した文字列を作り、二重の改行(空行)でつないでひとつの大きなテキストにします。なぜならば、ActtionGroupのインターフェースでは一つの文字列を受け取る仕様なので、これに合わせる必要があります。ちなみに、JSON形式で返す方法もあるようです。

        return {
            "messageVersion": "1.0",
            "response": {
                "actionGroup": ACTION_GROUP_NAME,
                "function": FUNCTION_NAME,
                "functionResponse": {
                    "responseBody": {
                        "TEXT": {
                            "body": text_body
                        }
                    }
                }
            }
        }

適切なActionGroup名と関数名を入れて、MCPサーバからの応答を一つの文字列に整形されたtext_bodyを返します。

このように、ActionGroupから情報をもらってMCPサーバが解釈できるJSON-RPCパケットを作成してHTTPで送信し、受信したパケットをSSEのプロトコルに従って分解し、ActionGroupの応答のインターフェースに合うように整形してリターンする、というプログラムを作成しなければならず、難易度は高いかと思います。

3.2 必要なパッケージの準備

 Lambda関数を実行するために必要なパッケージは一部はAWSで用意されていません。そこで、AWS環境に足りないパッケージは自分で用意してAWS環境にアップロードします。

% mkdir lambda_package
% cd lambda_package
% pip install aiohttp -t .
% pip install aws_lambda_powertools -t .
% pip install aws_xray_sdk -t .

3.1で作った lambda_function.py もこのディレクトリにコピーします。例えば、lambda_function.pyが/path/to/というディレクトリの下に保存してある場合は以下のように実行します。

% cp /path/to/lambda_function.py .

上記ダウンロードしたパッケージとlambda_function.pyをまとめて一つのzipファイルにします。ここではlambda_package.zip という名前にします。

 % zip -r lambda_package.zip 

以上で、Lambda関数の作成とそれを実行するパッケージの準備が完了しました。次に、コンソールを使ってエージェントやActionGroupの設定を行います。

4. コンソールの設定

 3章でLambda関数とその関数を実行するために必要なパッケージが揃いましたので、次にBedrockのコンソールの画面でエージェントの設定を行います。

図2.png

コンソールのホームを開きます。BedrockをクリックしてBedrockを開きます。または検索画面で「bedrock」と入力してクリックします。

図3.png

「エージェント」をクリックします。

図4.png

エージェントを作成」をクリックします。

図5.png

ここは「agent-test3」という名前を付けることにします。「作成」をクリックします。

図6.png

エージェントの構築の画面に移動します。

鉛筆.png

スクロールして「モデルを選択」の画面を表示します。鉛筆マークをクリックします。

図8.png

「Claude 3.7 Sonnet」を選びます。ちなみに「Bedrock Agents Optimized」のチェックを外すとClaude 4.5なども選択できるようになります。

図9.png

Claude 3.7 Sonnetに変更できました。「エージェント向けの指示」には「あなたはマイクロソフト製品の専門家です。マイクロソフトのホームページを良く見て、日本語で詳しく回答してください。また、引用元も明記してください。回答は action_group_test3 で設定したlambda関数の実行結果を優先的に見て回答して下さい。」と書きます。Lambda関数を使う旨を必ず書いてください。

図10.png

ここで、「保存」しておきましょう。編集したらマメに保存した方が良いです。

図11.png

画面をスクロールして、「Memory」を表示します。「有効」をクリックしておきましょう。有効にすると、エージェントに対する過去の質問応答を保存しておき、その内容を考慮して回答を作成してくれます。

図12.png

上にスクロールして、「アクショングループ」を表示します。「追加」を押します。

図13.png

3章のLambda関数で書いたアクショングループ名を入力します。ここでは「action_group_test3」と入力します。

図14.png

スクロールして、「アクショングループ関数1」を表示します。名前を付けますが、ここでもLambda関数内で名付けた「func-test3」としました。

図15.png

スクロールして「パラメータ」を表示します。「パラメータを追加」を押します。

図16.png

「名前」は「query」、「説明」は適当な文章を入れてください。「説明」は空欄にしておくとエラーになりますので、何か書いてください。ここでは「write your query」と書きました。

図17.png

右の移動して、「必須」を「True」に変更してください。そして、「作成」をクリックします。

図18.png

スクロールして、「アクショングループ」を表示します。今作った「action_group_test3」をクリックします。

図19.png

スクロールして「アクショングループの呼び出し」を表示します。Lambda関数を編集したいので、「表示」を押します。

図20.png

Lambda関数を編集する画面が新たに立ちがります。

図21.png

スクロールして「ランタイム設定」を表示します。「ハンドラ」が dummy_lambdaになっているので「編集」を押して変更します。

図22.png

「ハンドラ」が dummy_handler になっているので、「lambda_function」に変更します。「保存」をクリックします。

図23.png

「コード」を表示します。ここに lambda_function.py を含むコードを入力したいです。「アップロード元」をクリックして「zipファイル」をクリックします。

図24.png

「アップロード」をクリックします。

図25.png

3章で作っておいた lambda_package.zip を選びます。

図26.png

「保存」をクリックします。

図27.png

lambda_function.pyやパッケージ など、必要なファイルがアップロードされます。

図28.png

「設定」タブをクリックします。次に「環境変数」をクリックします。

図29.png

「編集」をクリックします。

図30.png

「環境変数の追加」をクリックします。

図31.png

「キー」に「MCP_ENDPOINT」、「値」に (https://learn.microsoft.com/api/mcp) を入力したら「保存」をクリックします。

図32.png

「テスト」タブをクリックします。「イベント名」に名前を入れます。ここでは「event-test3」としました。

図33.png

スクロールして、「イベントJSON」を表示します。全て削除します。

図34.png

ここにテスト用の送信パケットを書き込みます。次の内容をコピペしてください。

{
  "type": "CallToolRequest",
  "requestId": "m365-docs-search-001",
  "toolId": "microsoft_docs_search",
  "actionGroup": "docs_search",
  "apiPath": "/search",
  "httpMethod": "POST",
  "parameters": [
    { "name": "query", "value": "Intune のライセンスをデバイスに割り当てる手順は?URLも教えて。" },
    { "name": "category", "value": "Microsoft 365" },
    { "name": "limit", "value": "5" }
  ]
}

この例では、「Intune のライセンスをデバイスに割り当てる手順は?URLも教えて。」という質問がMCPサーバへ送られてMCPサーバが関連する情報をマイクロソフト公式ページから探し出して応答を返してきます。

図35.png

上にスクロールして「保存」を押します。

図36.png

「正常に保存されました」と表示されたことを確認します。次に「テスト」を押します。

図37.png

「実行関数:成功」と表示されれば、関数は正常に実行されました。「詳細」をクリックします。

図38.png

MCPサーバから返された応答の内容が表示されます。これで、Lambda関数は単体で正常動作できるようになりました。次にエージェントが Lambda関数を起動して、MCPサーバからの応答内容を使って回答を作ることを確認します。

図39.png

アクショングループの編集画面に戻ってください。スクロールして一番下へ移動して、「保存して終了」を押します。

図40.png

エージェントの編集画面に戻るので、「保存して終了」を押します。

図41.png

右側の「テスト」の画面で「準備」がアクティベートされます。「準備」を押します。

図42.png

「正常に準備されました」と表示されたらテストの準備完了です。

図43.png

クエリの入力画面に質問を入力します。ここでは例として「Intune のライセンスをデバイスに割り当てる手順は?URLも教えて。」と入力し「実行」を押します。

図44.png

エージェントが回答を作成してくれました。この回答をスクロールしてみてください。

図45.png

「参考URL」を見ると、https://learn.microsoft.com から情報を取ってきていることが分かります。これにて、エージェントでMCPサーバからマイクロソフトの公式情報をもらって回答を作成する設定が完了しました。

5. まとめ

 BedrockのエージェントでMCPサーバと接続する方法を解説しました。AWS Lambda上で動作するMCPクライアントを作りました、つまり、MCPプロトコル(JSON-RPC+SSE)を使ってMCPサーバと通信し、検索結果をBedrockエージェントに返す処理を実装しました。これは、BedrockのエージェントはMCPサーバと直接やり取りする機能を有してないため、ActionGroupの中のLambda関数をpythonで作成しなければならないのです。なかなか大変だと思いました。ちなみに別記事 でLangChainを使ったヘルプデスクの解説も書きましたが、LangChainのエージェントはMCPサーバと直接通信する機能を有していますので、MCPプロトコルの処理を実装する必要はありません。ぜひ、こちらもご覧ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?