1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OpenAI Agents SDK と MCP で Elasticsearch を自然言語操作してみた

Posted at

はじめに

OpenAI Agents SDK は、いわゆる「AI エージェント」を作るための SDK です。この記事では、OpenAI Agents SDK と MCP (Message Communication Protocol) を利用して、自然言語で Elasticsearch を操作するエージェントを作成し、その動作を確認してみます。

OpenAI Agents SDK の紹介

OpenAI Agents SDK は、軽量で使いやすく、抽象化を最小限に抑えたエージェントベースの AI アプリケーションを構築できる SDK です。エージェント (LLM + 指示 + ツール)、ハンドオフ (委任)、ガードレール (入出力の検証) といった基本的なコンポーネントを組み合わせることで、複雑なタスクを実行するエージェントを構築できます。

MCP (Message Communication Protocol) とは

MCP は、OpenAI Agents SDK が外部ツールやプロセスと通信するためのプロトコルです。エージェントは MCP Server として動作する外部プロセスとメッセージを交換し、関数の実行や情報の取得を行います。今回は、Elasticsearch を操作するための MCP Server を利用します。

背景と目的

データベースの操作は、通常、特定のクエリ言語 (SQL など) や API を使用する必要があります。しかし、「このデータを検索して」「このテーブルを削除して」といった自然言語で操作することはできません。LLM を介してデータベースを操作できれば、より直感的で効率的な開発や運用が可能になるかもしれません。

そこで今回は、OpenAI Agents SDK と、Elasticsearch 操作用の MCP Server を組み合わせて、自然言語で Elasticsearch を操作してみます。Docker で Elasticsearch 環境を構築し、Python コードからエージェントを介してインデックス(RDBMSでいうところテーブルに相当)の作成、削除、データ投入、検索などを行ってみます。

方法

Docker で Elasticsearch と Kibana を起動し、Python コードから OpenAI Agents SDK を使って Elasticsearch を操作します。エージェントと Elasticsearch の連携には、既存の Elasticsearch 用 MCP Server を利用します。

今回利用した Elasticsearch MCP Server は公式のリポジトリをエンハンスしたものです。

環境構築

Docker Compose で Elasticsearch と Kibana を起動

まず、Elasticsearch (v8.17.4) と Kibana を Docker Compose で起動します。

Kibana は Elasticsearch を管理するためのフロントエンドとして利用します。

docker-compose.yaml
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.17.4
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      # HTTPS と認証を有効化 (デフォルト)
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true
      - xpack.security.transport.ssl.enabled=true
    volumes:
      - esdata:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    networks:
      - elastic

  kibana:
    image: docker.elastic.co/kibana/kibana:8.17.4
    environment:
      # Elasticsearch の URL を指定 (HTTPS)
      ELASTICSEARCH_URL: https://elasticsearch:9200
      # Elasticsearch のホスト名を指定
      ELASTICSEARCH_HOSTS: '["https://elasticsearch:9200"]'
      # Elasticsearch の証明書を検証しない (コンテナ間の自己署名証明書のため)
      # 本番環境では適切な証明書設定が必要です
      ELASTICSEARCH_SSL_VERIFICATIONMODE: none
    ports:
      - "5601:5601"
    networks:
      - elastic
    depends_on:
      - elasticsearch

networks:
  elastic:
    driver: bridge

volumes:
  esdata:
    driver: local

以下のコマンドでコンテナを起動します。

docker compose up -d

Elasticsearch 初期設定

Elasticsearch との HTTPS 通信と認証に必要な設定を行います。

# Elasticsearch コンテナから CA 証明書をコピー
mkdir -p certs
docker compose cp elasticsearch:/usr/share/elasticsearch/config/certs/http_ca.crt certs/

# 'elastic' ユーザーのパスワードをリセット (対話的に新しいパスワードを設定)
# このパスワードは後で .env ファイルに設定します
docker compose exec -it elasticsearch /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic

# Kibana が Elasticsearch に接続するための登録トークンを作成
docker compose exec -it elasticsearch /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana

Kibana ログイン

ブラウザで http://localhost:5601/ にアクセスします。

表示された画面に、先ほど elasticsearch-create-enrollment-token コマンドで生成されたトークンを入力します。

次に、Elasticsearch と Kibana を接続するための認証コードの入力を求められます。以下のコマンドを実行して認証コードを取得し、ブラウザに入力します。

# Kibana コンテナで認証コードを取得
docker compose exec kibana /usr/share/kibana/bin/kibana-verification-code

表示例:

Kibana is currently running with legacy OpenSSL providers enabled! For details and instructions on how to disable see https://www.elastic.co/guide/en/kibana/8.17/production.html#openssl-legacy-provider
Your verification code is:  015 454

最後に、elastic ユーザーと、ステップ 2 で設定したパスワードで Kibana にログインします。

環境変数ファイル (.env) の作成

Python コードから Elasticsearch に接続するための情報を .env ファイルに記述します。
コード スニペット

.env
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=<ステップ2で設定したパスワード>
ELASTICSEARCH_CA_CERT=certs/http_ca.crt
ELASTICSEARCH_URL=https://localhost:9200

必要な Python パッケージのインストール

OpenAI Agents SDK をインストールします。

pip install openai-agents

Elasticsearch 用 MCP Server の準備

以下のコマンドでリポジトリをクローンします。(Python コード src/mcp_elasticsearch.py に対して ../elasticsearch-mcp-server/ にクローンします)

git clone https://github.com/cr7258/elasticsearch-mcp-server.git ../elasticsearch-mcp-server

この MCP Server は、エージェントからの指示 (例: "インデックス一覧を見せて") を受け取り、それを Elasticsearch の API コールに変換して実行し、結果をエージェントに返します。

コード解説

Elasticsearch を操作するエージェントを実行する Python コードです。

src/mcp_elasticsearch.py
import asyncio
import os
import shutil

from agents import Agent, Runner, gen_trace_id, trace
from agents.mcp import MCPServer, MCPServerStdio
from dotenv import load_dotenv

load_dotenv()


async def run(mcp_server: MCPServer):
    """エージェントの定義と実行"""
    agent = Agent(
        name="アシスタント",
        # エージェントへの指示 (自然言語)
        instructions="ユーザーのリクエストにしたがってElasticsearch を操作して",
        # 連携する MCP Server を指定
        mcp_servers=[mcp_server],
    )

    # 最初の命令
    message = "インデックス一覧を見せて"
    print(f"命令: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

    # 対話ループ
    while True:
        message = input("Elasticsearch で何がしたいですか? > ")
        if message.lower() in ["exit", "quit"]:
            print("終了します。")
            break
        print(f"命令: {message}")
        result = await Runner.run(starting_agent=agent, input=message)
        print(result.final_output)


async def main():
    """メイン処理"""

    # MCPServerStdio を使って Elasticsearch MCP Server をサブプロセスとして起動・連携
    async with MCPServerStdio(
        name="MCP Elasticsearch",
        params={
            "command": "uv", # MCP Server の実行コマンド (ここでは uv を使用)
            "args": [
                "--directory",
                "../elasticsearch-mcp-server", # MCP Server のディレクトリ
                "run",
                "elasticsearch-mcp-server", # MCP Server の実行ファイル/モジュール名
            ],
            # MCP Server に渡す環境変数 ( .env から読み込んだ Elasticsearch 接続情報)
            "env": {
                "ELASTICSEARCH_HOST": os.getenv("ELASTICSEARCH_URL"),
                "ELASTICSEARCH_USERNAME": os.getenv("ELASTICSEARCH_USERNAME"),
                "ELASTICSEARCH_PASSWORD": os.getenv("ELASTICSEARCH_PASSWORD"),
                "ELASTICSEARCH_CA_CERT": os.getenv("ELASTICSEARCH_CA_CERT"),
            },
        },
    ) as server:
        trace_id = gen_trace_id()
        # OpenAI Platform でトレースを確認するための設定
        with trace(workflow_name="MCP Elasticsearch", trace_id=trace_id):
            print(
                "トレース情報: https://platform.openai.com/traces/trace"
                f"?trace_id={trace_id}\n"
            )
            # エージェント実行ループを開始
            await run(server)


if __name__ == "__main__":
    # MCP Server 実行に必要な npx コマンドが存在するかチェック
    # elasticsearch-mcp-server が内部で npx を使う場合があるため
    if not shutil.which("npx"):
        raise RuntimeError(
            "npx がインストールされていません。`npm install -g npx` "
            "コマンド等でインストールして下さい。"
        )

    asyncio.run(main())

コードのポイント

  • Agent 定義
    • name: エージェントの名前を設定します
    • instructions
      • エージェントの基本的な役割や振る舞いを自然言語で指示します
      • 「ユーザーのリクエストにしたがってElasticsearch を操作して」としています
    • mcp_servers: 連携する MCP Server を指定します
  • Runner.run
    • 定義したエージェントとユーザーからの入力 (input) を渡して、エージェントを実行します。
    • エージェントは instructions とユーザー入力に基づき、必要であれば mcp_servers で指定された MCP Server と通信してタスクを実行します。
  • MCPServerStdio
    • 外部プロセスとして MCP Server を起動し、標準入出力を介してエージェントと通信するためのクラスです。
    • params で MCP Server の実行コマンド、引数、環境変数を指定します。これにより、Python コードから独立して MCP Server を管理できます。
    • async with を使うことで、スクリプト終了時に MCP Server プロセスも自動的に終了させることができます。
      トレース:
    • gen_trace_id と trace を使うことで、エージェントの実行過程を OpenAI Platform の Trace 機能で可視化・デバッグできます。

実行方法

以下のコマンドで Python コードを実行します。

python src/mcp_elasticsearch.py

結果

スクリプトを実行すると、MCP Server が初期化され、トレース用の URL が表示された後、最初の命令「インデックス一覧を見せて」が実行されます。その後、対話的に Elasticsearch への指示を入力できます。

  1. 起動とインデックス一覧表示
# MCP Server の初期化ログ (一部抜粋)
[04/12/25 00:03:16] INFO     Initializing elasticsearch_mcp_server...       server.py:20
                     INFO     Elasticsearch client initialized with hosts:       base.py:44
                               ['https://localhost:9200']                             
                     INFO     Initialized the elasticsearch client           client.py:25
                     INFO     Registering tools from IndexTools              register.py:33
                     INFO     Registering tools from DocumentTools           register.py:33
                     INFO     Registering tools from ClusterTools            register.py:33
                     INFO     Registering tools from AliasTools              register.py:33

# トレース情報
トレース情報: https://platform.openai.com/traces/trace?trace_id=trace_016c6cb9f76649bca3e49e1ce8c6a85b

# 最初の命令
命令: インデックス一覧を見せて
                     INFO     Processing request of type ListToolsRequest      server.py:534
[04/12/25 00:03:17] INFO     Processing request of type CallToolRequest       server.py:534
[04/12/25 00:03:18] INFO     GET https://localhost:9200/_cat/indices      _transport.py:349
                               [status:200 duration:0.029s]                             
# エージェントの応答
現在のインデックス一覧は以下の通りです:

1. .internal.alerts-transform.health.alerts-default-000001
2. .internal.alerts-observability.logs.alerts-default-000001
3. .internal.alerts-observability.uptime.alerts-default-000001
# ... (中略) ...
6. **sample_index** # <-- 事前に作成していたサンプルインデックス
# ... (中略) ...
13. .internal.alerts-stack.alerts-default-000001

必要な情報があればお知らせください。
Elasticsearch で何がしたいですか? > 

エージェントが MCP Server を介して Elasticsearch の GET /_cat/indices API を呼び出し、結果を整形して表示してくれました:sparkles:

  1. インデックスの削除

事前に作成していた sample_index を削除してみます。

Elasticsearch で何がしたいですか? >  sample_index を削除して
命令: sample_index を削除して
[04/12/25 00:03:44] INFO     Processing request of type ListToolsRequest      server.py:534
[04/12/25 00:03:45] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     DELETE https://localhost:9200/sample_index   _transport.py:349
                               [status:200 duration:0.093s]                             
`sample_index`が正常に削除されました。
Elasticsearch で何がしたいですか? > 

エージェントは指示を理解し、DELETE /sample_index を実行しました。

  1. 再度インデックス一覧表示 (削除確認)

削除されたか確認します。

Elasticsearch で何がしたいですか? > ユーザーインデックス一覧みせて
命令: ユーザーインデックス一覧みせて
[04/12/25 00:03:56] INFO     Processing request of type ListToolsRequest      server.py:534
[04/12/25 00:03:57] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     GET https://localhost:9200/_cat/indices      _transport.py:349
                               [status:200 duration:0.027s]                             
現在のインデックスの一覧は以下の通りです:

1. `.internal.alerts-transform.health.alerts-default-000001`
# ... (中略) ...
12. `.internal.alerts-stack.alerts-default-000001`

何か他にお手伝いできることはありますか?
Elasticsearch で何がしたいですか? > 

sample_index がリストから消えていることが確認できました。

  1. 新規インデックス作成とデータ投入

新しいインデックス books を作成し、スキーマを指定してサンプルデータを投入させます。ここではベクトル検索用のフィールド (dense_vector) も含めてみます。

Elasticsearch で何がしたいですか? > 新しいインデックス「books」を作って。適当なサンプルデータを20個入れて。フィールドは、id:int, name:str, pages:int, densvector[dim:2,cosine]
命令: 新しいインデックス「books」を作って。適当なサンプルデータを20個入れて。フィールドは、id:int, name:str, pages:int, densvector[dim:2,cosine]
[04/12/25 00:05:36] INFO     Processing request of type ListToolsRequest      server.py:534
[04/12/25 00:05:37] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     PUT https://localhost:9200/books [status:200 _transport.py:349
                               duration:0.286s]                                       
[04/12/25 00:05:44] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     POST https://localhost:9200/books/_doc       _transport.py:349
                               [status:201 duration:0.087s]                             
                     INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     POST https://localhost:9200/books/_doc       _transport.py:349
                               [status:201 duration:0.006s]                             
# ... (データ投入のログが続く) ...
[04/12/25 00:05:45] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     POST https://localhost:9200/books/_doc       _transport.py:349
                               [status:201 duration:0.006s]                             
インデックス「books」を作成し、20個のサンプルデータを追加しました。必要に応じて他の操作がありますか?
Elasticsearch で何がしたいですか? > 

エージェントはまず PUT /books でインデックスを作成し、その後 POST /books/_doc を20回実行してデータを投入しました。フィールド定義 (id:int, name:str, pages:int, densvector[dim:2,cosine]) も解釈して、適切なマッピングでインデックスを作成し、ダミーデータを生成・投入してくれたようです。すごい!!

実際にどのようなマッピングが作成され、どのようなデータが投入されたかは Kibana Dev Tools や API で別途確認する必要があります。エージェントが生成したデータは必ずしも期待通りとは限りません。
実際に、勝手に何かを上書きしたり思わぬ挙動も見られました。

  1. データ表示

投入されたデータの一部を表示させます。

Elasticsearch で何がしたいですか? > books の中身を見せて
命令: books の中身を見せて
[04/12/25 00:05:55] INFO     Processing request of type ListToolsRequest      server.py:534
[04/12/25 00:05:56] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     GET https://localhost:9200/_cat/indices      _transport.py:349
                               [status:200 duration:0.023s]                             
[04/12/25 00:05:58] INFO     Processing request of type CallToolRequest       server.py:534
                     INFO     POST https://localhost:9200/books/_search    _transport.py:349
                               [status:200 duration:0.008s]                             
Here are some books from the "books" index:

1. **Book 1**
   - Pages: 234
   - Density Vector: [0.1, 0.2]

2. **Book 2**
   - Pages: 120
   - Density Vector: [0.3, 0.5]

# ... (10件まで表示) ...

10. **Book 10**
    - Pages: 220
    - Density Vector: [0.9, 0.4]

If you need more details about any specific book, feel free to ask!
Elasticsearch で何がしたいですか? > 

エージェントは POST /books/_search を実行し、取得したデータの中から一部を整形して表示しました。

考察

  • 自然言語による DB 操作の実現
    • OpenAI Agents SDK と MCP を組み合わせることで、自然言語による Elasticsearch の基本的な CRUD 操作 (インデックス一覧表示、削除、作成、データ投入、検索) が可能であることが確認できました。
    • 特に、スキーマを指定してインデックスを作成し、ベクトルデータを含むサンプルデータを投入できた点は興味深いです。
    • 定義したエージェントは直近の命令のみに反応し過去の経緯を伝えていません。命令する側が一貫性に責任を持つ必要がありますが、それなりにデータベースを操作できています。
  • MCP の役割と柔軟性
    • MCP Server (今回は elasticsearch-mcp-server) がエージェントと Elasticsearch の間の翻訳者として機能し、エージェントが Elasticsearch の API を直接知らなくてもデータベースを操作できました。
    • MCPServerStdio を使うことで、Python コードから外部の MCP Server プロセスを簡単に起動・連携できる点も便利でした。
  • Agents SDK のポテンシャル
    • エージェントへの指示 (instructions) を工夫したり、より高機能な MCP Server を用意したりすることで、さらに複雑な Elasticsearch 操作 (集計、高度な検索クエリ、クラスタ管理など) も実現できる可能性があります。
    • 他の MCP と組み合わせて利用するのも可能性が広がりそうです。
  • 実用上の課題
    • 精度と安定性: 自然言語の解釈には曖昧さが伴うため、特に複雑な指示や意図しない指示に対して、エージェントが常に正確に動作するとは限りません。エラーハンドリングや、実行前の確認ステップなどを組み込む必要があるでしょう。Human-in-the-Loop 的に人間の確認を入れたくなることも多いと思います。
    • セキュリティ: 自然言語でデータベースを操作できるようにすることは、セキュリティリスクも伴います。意図しないデータの削除や更新を防ぐためのガードレール機能や権限管理が重要になります。
    • コスト: エージェントの実行 (LLM の呼び出し) にはコストがかかります。頻繁な操作や大量のデータ処理には注意が必要です。とくに LLM にデータを見せる場合は、データの分量に依存してコストが発生します。
    • 複雑な操作: 今回は基本的な操作のみでしたが、ネストされたクエリや複雑な集計など、高度な操作を自然言語で正確に指示するのは難しい場合があります。

まとめ

この記事では、OpenAI Agents SDK と MCP Server を利用して、自然言語で Elasticsearch を操作するエージェントを作成し、その動作を確認しました。インデックスの作成・削除、データ投入、検索といった基本的な操作が自然言語の指示だけで実行できました。

MCP を介して外部ツールと連携する仕組みは強力であり、Elasticsearch 以外のデータベースや API を操作するエージェントも同様に構築できる可能性があります。自然言語インターフェースは、データベース操作の敷居を下げ、開発や運用の効率化に貢献する可能性を秘めていますが、実用化に向けては精度、安定性、セキュリティなどの課題を考慮する必要があります。

今後、より複雑な操作への対応や、エラーハンドリング、他のツールとの連携などを試していきたいと思います。

最後までお読み頂きありがとうございました。

参照

OpenAI Agents SDK Documentation (日本語):

Elasticsearch Documentation:

今回利用した MCP Server:

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?