6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCP勉強中!作りながら感じたその将来性!!

Last updated at Posted at 2025-12-14

はじめに

MCPに関して、[小ネタ] MCPチュートリアルやってみた#1で投稿してから、Qiita投稿はしていませんでしたが、ちょくちょく勉強は進めています。今時点で、感じたり学んだ事をまとめてみる記事です。
表題の通り勉強中の身です。間違ってる点などありましたら指摘いただけるとありがたいです。

やった事

前説明が長くなっちゃったのでメイン部分を先に。勉強するにあたって自分がやった事です。

MCPチュートリアル

チュートリアルがあります。簡単なサンプルを自分で試す事で、MCPに基づく処理の流れを実感できます。

MCPサーバー

チュートリアルでは天気予報を行うMCPサーバーをサンプル実装しました。

MCPクライアント(&MCPホスト)

チュートリアルでは、前述チュートリアルで作った天気予報MCPサーバーを使って、簡単なAIアプリをサンプル構築しました。AIアプリの処理フローを実感できました。

SDKを使って音声メモMCPサーバー実装

AIに題材を相談したところ、文字起こしサーバーを提案されました。一旦その前段階の音声メモMCPサーバーを作ることにしました。その際にAIがサンプルソースも作ってくれました。実装とは言っても、AIが作ってくれたソースを改造したぐらいです。改造するにあたっては、MCPの処理を把握する必要がありました。実際のソースは後述します。
前述チュートリアルではfilesystemという既存のローカルMCPサーバーを題材にしているのですが、そのMCPサーバーとの連携も試してみました。

構成図
whole_flow_00.png

Claudeがサンプルソース作ってくれた時の画面
image.png

感じた事

全体的に

今までだったらプログラムでロジックを組んで処理する内容を、自然言語で処理できる世の中になっている事に驚きです。AIモデルの気分によって処理がぶれる事もありますが、プログラムだとどうあがいてもプログラム以上の事は出来ないです。MCPサーバー(AIエージェント)を追加すればやれる事が増えていくと思います1

音声メモMCPサーバーを実装した際、別のMCPサーバーとの連携も試しました。そして、複数のMCPサーバー連携もAIモデルがやってくれます。

棲み分け

上で述べたAIアプリの処理のブレと関係しますが、決まった処理を求めるなら、AIモデルで行うのではなく、MCPサーバー内で行った方が確実だし効率的なはずです。prompt機能を使ってある程度の処理フロー定義も出来るかとは思います。
すべてAIモデルでやるのではなく、AIモデルに任せた方がいい処理、MCPサーバー(AIエージェント)に任せた方がいい処理をイメージするのが重要だと思います。

将来の開発者

おそらく、AIアプリを作る人と、そこで活用するMCPサーバーを作る人と分業が進んでいくと想像しています。とはいえ、それぞれが他方を理解していないとうまく活用できないだろうとも思います2。MCPが出来るまでは各社が独自仕様で提供していたものが、MCPが出来た事で整理された状況と思います。今後も引き続きMCPの事を勉強していこうと思います。

基本知識

公式ページ
What is the Model Context Protocol (MCP)?

MCPの構成要素

構成例のイメージ
mcp_role_whole_04_integrated.png

AIモデル

ChatGPT(アプリではなくモデルとして)、Sonnetなど、AIアプリの頭脳です。

MCPホスト

ChatGPTやClaudeなどのAIアプリです。後述MCPサーバーとAIモデルのコーディネーターです。

MCPサーバー

AIモデルと組み合わせる事で、AIアプリでできる事が広がります。以下3つの機能群があり、一つのMCPサーバーですべて保有する事も、単一の機能だけ保有する事もあります。

tool

天気予報などリアルタイム情報の提供、芸能人情報詳細などAIモデルが把握できてない事の対応をしたり、決まった処理を実行します。※MCP発足以前のTool Use技術がベース。

resources

AIモデルが処理する情報の提供元となります。RAGの情報元にもなるようです。

claude_rag_00.png

prompt

ユーザーが複雑なプロンプトを入力しないで済むように、MCPサーバー自身が想定しているプロンプトを提供する仕組みです。引数によって動的に変える事も可能です。
image.png

MCPクライアント

MCPホスト内で、MCPサーバーとやり取りする部分です。

SDK

公式ページ
SDKs - Model Context Protocol

MCPのpython SDK
The official Python SDK for Model Context Protocol servers and clients

MCPは規格化されているので、頑張れば各種処理を自分で実装する事も可能ですが、SDKを使ってコア部分に集中するのが基本だと思います。

MCP Inspector

ブラウザ上のテストツールです。SDKを使って開発者モードで起動すると開きます。
GitHub - modelcontextprotocol/inspector: Visual testing tool for MCP servers

宣伝

発信プラットフォームとしてはQiitaの方がよいと思いますが(※現時点の閲覧数より)、動画作成にも興味がわいている今日この頃で、ゆっくり解説系のYouTube動画も作成したりしています。よかったら見てやってください。まだまだ動画としての質は低いですが(特に初期)、内容の質も動画の質も改善していくつもりです。

MCPって何?
MCPの構成を勉強するプレイリストです。#1~#6の6本の動画を上げています。

MCPサーバー作る!!
前述「MCPって何?」で勉強した事をもとに、MCPサーバーを作りながら詳細を勉強するプレイリストです。2025年12月現在進行中です。現時点で#7,#8の2本の動画を上げています。今回の記事で述べている音声メモMCPサーバーは#8の動画で説明しています。

ソース

チュートリアルのソースは公式ページにあるので、音声メモMCPサーバーで使ったソースを紹介します。とはいえClaudeデスクトップが作ってくれたものが基本です。

MCPサーバー起動部分

AIが作ってくれたドキュメント内に、メイン処理部分をMCPサーバーとして動作させるソースがありました。抜き出してpythonソースファイルとしています。Low-Level Serverと呼ばれる形式でした。

改造前
import json
import asyncio
import mcp.server.stdio
import mcp.types as types
from typing import Any
from mcp.server.lowlevel import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from transcription_mcp_server import TranscriptionMCPServer

# サーバーインスタンス作成
server = Server("transcription-server")

# ツール定義
@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="record_audio",
            description="マイクから音声を録音します",
            inputSchema={
                "type": "object",
                "properties": {
                    "duration": {
                        "type": "number",
                        "description": "録音時間(秒)",
                        "default": 5
                    },
                    "filename": {
                        "type": "string",
                        "description": "保存ファイル名(省略可)"
                    }
                }
            }
        ),
        types.Tool(
            name="transcribe",
            description="音声ファイルを文字起こしします",
            inputSchema={
                "type": "object",
                "properties": {
                    "filepath": {
                        "type": "string",
                        "description": "音声ファイルのパス(省略時は最後の録音)"
                    },
                    "language": {
                        "type": "string",
                        "description": "言語コード(ja, en等)",
                        "default": "ja"
                    }
                }
            }
        ),
        types.Tool(
            name="list_recordings",
            description="保存された録音ファイル一覧を取得します",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]

# ツール実行
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> dict[str, Any]:
    server = TranscriptionMCPServer()
    result = await server.handle_request(name, arguments)
    return [types.TextContent(type="text", text=json.dumps(result, ensure_ascii=False))]

async def run():
    """Run the structured output server."""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="structured-output-example",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )

if __name__ == "__main__":
    asyncio.run(run())

Low-Level Serverと呼ばれる形式だと、MCP Inspectorが使えなさそうだったので、FastMCPというSDKが提供してくれているモジュールの形式に書き換えました。

改造後
"""
FastMCP quickstart example.

cd to the `examples/snippets/clients` directory and run:
    uv run server fastmcp_quickstart stdio
"""

from mcp.server.fastmcp import FastMCP
from transcription_mcp_server import TranscriptionMCPServer

# Create an MCP server
mcp = FastMCP("Sound Recording Trial")

# Add an addition tool
@mcp.tool()
def record_audio(duration: int, file_path: str) -> dict:
    """マイクから音声を録音します

    Args:
        duration : 録音時間(秒)
        file_path : 録音ファイルの保存パス
    Returns:
        dict: 録音結果情報ディクショナリ
            status: "success" or "error"
            filepath: 録音ファイルのパス
            duration: 録音時間(秒)
            message: 詳細メッセージ
    """
    server = TranscriptionMCPServer()
    return server.record_audio(duration, file_path)

@mcp.tool()
def list_recordings() -> dict:
    """保存された録音ファイル一覧を取得します
    Returns:
        dict: 録音ファイル一覧情報ディクショナリ
            status: "success" or "error"
            count: 録音ファイル数
            recordings: 録音ファイル情報リスト
                filename: 録音ファイル名
                filepath: 録音ファイルのパス
                size_mb: ファイルサイズ(MB)
                modified: 録音日時(UNIXタイムスタンプ)
    """
    server = TranscriptionMCPServer()
    return server.list_recordings()

def main():
    # Initialize and run the server
    mcp.run(transport='stdio')

if __name__ == "__main__":
    main()

メインクラス

録音処理を行う本体部分です。もともと固定フォルダへの保存だったのを、絶対パスで保存できるようにしました。それ以外はAIが作ってくれたままです。文字起こし部分もありますが、今の時点では使ってないです。

処理本体
#!/usr/bin/env python3
"""
音声文字起こしMCPサーバー
マイクから音声を録音し、Whisperで文字起こしを行います
"""

import asyncio
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Any

import pyaudio
import wave
import whisper

# MCPサーバーの基本実装
class TranscriptionMCPServer:
    def __init__(self):
        self.recordings_dir = Path("recordings")
        self.recordings_dir.mkdir(exist_ok=True)
        self.model = None
        self.last_recording = None
        
    async def load_model(self, model_size: str = "base"):
        """Whisperモデルをロード"""
        if self.model is None:
            self.model = whisper.load_model(model_size)
        return f"Model '{model_size}' loaded successfully"
    
    def record_audio(self, duration: int = 5, filename: str = None) -> dict:
        """
        マイクから音声を録音
        
        Args:
            duration: 録音時間(秒)
            filename: 保存ファイル名(省略時は自動生成)
        """
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.wav"

        # 絶対パスに対応するために改造した部分
        if os.path.split(filename)[-1] != filename:
            filepath = filename
        else:
            filepath = self.recordings_dir / filename

        # 録音設定
        CHUNK = 1024
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000
        
        p = pyaudio.PyAudio()
        
        try:
            stream = p.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK
            )
            
            print(f"Recording for {duration} seconds...")
            frames = []
            
            for i in range(0, int(RATE / CHUNK * duration)):
                data = stream.read(CHUNK)
                frames.append(data)
            
            print("Recording finished")
            
            stream.stop_stream()
            stream.close()
            
            # WAVファイルとして保存
            wf = wave.open(str(filepath), 'wb')
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(p.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(b''.join(frames))
            wf.close()
            
            self.last_recording = str(filepath)
            
            return {
                "status": "success",
                "filepath": str(filepath),
                "duration": duration,
                "message": f"Recorded {duration} seconds to {filename}"
            }
            
        finally:
            p.terminate()
    
    async def transcribe(self, filepath: str = None, language: str = "ja") -> dict:
        """
        音声ファイルを文字起こし
        
        Args:
            filepath: 音声ファイルのパス(省略時は最後の録音)
            language: 言語コード(ja, en, etc.)
        """
        if self.model is None:
            await self.load_model()
        
        if filepath is None:
            filepath = self.last_recording
        
        if filepath is None:
            return {
                "status": "error",
                "message": "No recording found. Please record audio first."
            }
        
        if not os.path.exists(filepath):
            return {
                "status": "error",
                "message": f"File not found: {filepath}"
            }
        
        print(f"Transcribing {filepath}...")
        result = self.model.transcribe(filepath, language=language)
        
        return {
            "status": "success",
            "filepath": filepath,
            "text": result["text"],
            "language": result.get("language", language),
            "segments": [
                {
                    "start": seg["start"],
                    "end": seg["end"],
                    "text": seg["text"]
                }
                for seg in result["segments"]
            ]
        }
    
    def list_recordings(self) -> dict:
        """保存された録音ファイル一覧を取得"""
        recordings = list(self.recordings_dir.glob("*.wav"))
        return {
            "status": "success",
            "count": len(recordings),
            "recordings": [
                {
                    "filename": r.name,
                    "filepath": str(r),
                    "size_mb": r.stat().st_size / (1024 * 1024),
                    "modified": datetime.fromtimestamp(r.stat().st_mtime).isoformat()
                }
                for r in sorted(recordings, key=lambda x: x.stat().st_mtime, reverse=True)
            ]
        }
    
    async def handle_request(self, method: str, params: dict) -> dict:
        """リクエストハンドラー"""
        if method == "record_audio":
            duration = params.get("duration", 5)
            filename = params.get("filename")
            return self.record_audio(duration, filename)
        
        elif method == "transcribe":
            filepath = params.get("filepath")
            language = params.get("language", "ja")
            return await self.transcribe(filepath, language)
        
        elif method == "list_recordings":
            return self.list_recordings()
        
        elif method == "load_model":
            model_size = params.get("model_size", "base")
            result = await self.load_model(model_size)
            return {"status": "success", "message": result}
        
        else:
            return {
                "status": "error",
                "message": f"Unknown method: {method}"
            }


async def main():
    """MCPサーバーのメインループ(簡易版)"""
    server = TranscriptionMCPServer()
    
    print("Transcription MCP Server Started")
    print("Available methods:")
    print("  - record_audio(duration=5, filename=None)")
    print("  - transcribe(filepath=None, language='ja')")
    print("  - list_recordings()")
    print("  - load_model(model_size='base')")
    print()
    
    # テスト実行例
    print("=== Test: Recording 5 seconds ===")
    result = server.record_audio(duration=5)
    print(json.dumps(result, indent=2, ensure_ascii=False))
    
    print("\n=== Test: Transcribing ===")
    # result = await server.transcribe(language="ja")
    print(json.dumps(result, indent=2, ensure_ascii=False))
    
    print("\n=== Test: List recordings ===")
    result = server.list_recordings()
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())

  1. MCPサーバーを入れすぎるとトークンが増えたり、どのMCPサーバーを使うかの判断のブレなどが出ると思うので必要なものだけにした方がいいとは思います。

  2. 音声メモMCPサーバーを改造した時、MCPの処理フローを把握していた事が役立ちました。

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?