⚠️ 使用上の注意(必ずお読みください)
この記事で紹介するコードは、**「AIにローカルPCの操作権限を与える」**という非常に強力なものです。以下の内容を理解した上で、個人の学習目的にのみ利用してください。
-
自己責任で使用してください
- 本プログラムを使用して発生した、いかなる損害(データの消失、PCの故障、セキュリティトラブル等)についても、筆者は一切の責任を負いません。
-
実行内容をAI任せにしない
- AI(Gemini)は稀に誤ったコマンドを生成したり、意図しないファイルの削除を行ったりする可能性があります。AIが提示する実行内容には常に注意を払ってください。
-
機密情報の扱いに注意
- ローカルのファイルをAIに読み取らせる性質上、機密性の高いデータを扱う環境での実行は避けてください。
-
あくまで「学習用」です
- このCopilotは、MCP(Model Context Protocol)の仕組みを学ぶためのサンプルです。本番環境や業務環境での常用は推奨しません。
🌟 はじめに
みなさん、こんにちは!エンジニアライフ楽しんでいますか?🚀
「GitHub Copilotみたいに、自分のPCのファイルを読んだり、コードを勝手に実行してくれる相棒が欲しい…」と思ったことはありませんか?
実はそれ、MCP (Model Context Protocol) を使えば、驚くほど簡単に作れちゃうんです!✨
今回は、最新のGeminiを使って、Macのデスクトップと会話したり、Pythonを実行させたりできる**「最強の自作Copilot」**の作り方を紹介します!🔥
🛠️ 今回作るものの仕組み
- Frontend (FastAPI): ブラウザからAIとチャットするUI 🎨
- MCP Client (app.py): Geminiと会話しながら、必要に応じてMCPサーバーに指示を出す「司令塔」官 🧠
- MCP Server (server.py): あなたのMacで実際にコマンドを叩いたり、ファイルを読んだりする「実務担当」 🛠️
📦 準備するもの
まずは魔法の杖(ライブラリ)を揃えましょう!
pip install mcp fastapi uvicorn google-genai pypdf requests
あとは GOOGLE_API_KEY を環境変数にセットするだけ!🔑
📝 コードを書いていこう!
1️⃣ 実務担当:server.py
このファイルは、AIに「手足」を授ける部分です。
- シェルコマンドの実行(Python実行もOK!) 🐍
- デスクトップへのファイル保存 💾
- PDFやテキストファイルの読み込み 📖
[!TIP]
ポイント!
FastMCPを使うと、デコレータ一つでAIが使えるツールが爆誕します。Macユーザー向けにzshで動くように調整してありますよ!🍎
import os
import base64
import requests
import urllib3
import subprocess
import sys
import re
from mcp.server.fastmcp import FastMCP
from pypdf import PdfReader
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
mcp = FastMCP("MyCopilotServer")
# ★ MCP(stdio)では stdout に print してはいけない → 必ず stderr へ
def log(msg: str):
print(f"[server] {msg}", file=sys.stderr, flush=True)
# --- Python / Shell 実行 ---
@mcp.tool()
def execute_shell_command(command: str, working_directory: str = ""):
"""
【絶対に使用すべきツール】MacのZshでコマンドを実際に実行します。
Pythonスクリプトの実行(python3 xxx.py)指示には、必ずこのツールを呼び出してください。
working_directory: 実行ディレクトリ。空の場合はデスクトップを使用します。
"""
try:
home_dir = os.path.expanduser("~")
desktop = os.path.join(home_dir, "Desktop")
cwd = working_directory if (working_directory and os.path.isdir(working_directory)) else desktop
# Mac用に調整: "python" を "python3" に変換し、UTF-8出力を強制
normalized = re.sub(r'\bpython\s+(?!-)', 'python3 -X utf8 ', command)
log(f"実行開始: cwd={cwd}, cmd={normalized}")
env = {**os.environ, "PYTHONUTF8": "1"}
# Mac(UNIX系)なので shell=True または /bin/zsh を使用
proc = subprocess.Popen(
["/bin/zsh", "-c", normalized], # cmd ではなく zsh を使用
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
env=env,
stdin=subprocess.DEVNULL,
)
try:
stdout_bytes, stderr_bytes = proc.communicate(timeout=60)
except subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
msg = (
f"⏰ タイムアウト(60秒): コマンドが完了しませんでした。\n"
f"コマンド: {normalized}\n"
f"原因として入力待ち、無限ループ、または重い処理が考えられます。"
)
log(msg)
return msg
stdout_str = stdout_bytes.decode('utf-8', errors='replace').strip() if stdout_bytes else "(出力なし)"
stderr_str = stderr_bytes.decode('utf-8', errors='replace').strip() if stderr_bytes else ""
returncode = proc.returncode
log(f"完了: returncode={returncode}, stdout={len(stdout_str)}文字")
output = (
f"【実行結果】\n"
f"📁 実行ディレクトリ : {cwd}\n"
f"⚙️ コマンド : {normalized}\n\n"
f"📤 出力:\n{stdout_str}"
)
if stderr_str:
output += f"\n\n⚠️ エラー出力:\n{stderr_str}"
output += f"\n\n🔢 終了コード: {returncode}"
return output
except Exception as e:
import traceback
tb = traceback.format_exc()
log(f"予期しないエラー:\n{tb}")
return f"❌ 実行エラー: {str(e)}\n{tb}"
# --- ローカルファイル操作 ---
@mcp.tool()
def read_local_file(file_path: str):
"""ローカルファイルを読み取ります。"""
try:
if not os.path.isabs(file_path):
file_path = os.path.join(os.path.expanduser("~"), "Desktop", file_path)
if not os.path.exists(file_path):
return f"❌ エラー: ファイルが見つかりません: {file_path}"
if file_path.endswith(".pdf"):
return "\n".join([p.extract_text() for p in PdfReader(file_path).pages])
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
except Exception as e:
log(f"read_local_file エラー: {e}")
return f"❌ エラー: {str(e)}"
@mcp.tool()
def save_file_to_desktop(filename: str, content: str, file_type: str):
"""内容をデスクトップに保存します。"""
try:
# USERPROFILE を os.path.expanduser("~") に変更
path = os.path.join(os.path.expanduser("~"), "Desktop", f"{filename}.{file_type}")
with open(path, 'w', encoding='utf-8') as f: # Macならutf-8でOK
f.write(content)
return f"✅ 保存完了: {path}"
except Exception as e:
log(f"save_file_to_desktop エラー: {e}")
return f"❌ エラー: {str(e)}"
if __name__ == "__main__":
mcp.run(transport='stdio')
2️⃣ 司令塔:app.py
ここでは、Gemini(Googleの最新AI)とMCPを繋ぎます。
- AIが「あ、これコマンド実行が必要だな」と判断したら、自動でツールを呼び出します。
- チャット履歴も保存するので、前回の会話も覚えていてくれます!🤖
import os
import asyncio
import webbrowser
import threading
import json
import uuid
import sys
import pathlib
import base64
import time
from datetime import datetime
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
# Gemini SDK
from google import genai
from google.genai import types, errors
# MCP SDK
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
app = FastAPI()
# --- 設定 ---
API_KEY = os.environ.get("GOOGLE_API_KEY")
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-3.1-flash-lite-preview")
SERVER_PATH = str(pathlib.Path(__file__).parent.resolve() / "server.py")
server_params = StdioServerParameters(
command=sys.executable,
args=[SERVER_PATH],
env=os.environ.copy()
)
SYSTEM_PROMPT = """
あなたは「My Copilot」です。Linux、macOS、Python開発に精通した最高峰のエンジニアです。
ユーザーから python xxx.py の実行を求められた場合は、必ず execute_shell_command ツールを呼び出して実際に実行してください。
Mac環境であることを考慮し、適切なコマンドを選択してください。
"""
CACHE_DIR = "cache"
SESSIONS_DIR = os.path.join(CACHE_DIR, "sessions")
os.makedirs(SESSIONS_DIR, exist_ok=True)
if not API_KEY:
print("❌ エラー: GOOGLE_API_KEY が設定されていません。")
sys.exit(1)
client = genai.Client(api_key=API_KEY)
# --- ヘルパー ---
def gemini_obj_to_dict(obj):
if hasattr(obj, 'model_dump'):
data = obj.model_dump(exclude_none=True)
elif isinstance(obj, dict):
data = obj
else:
return obj
def sanitize(item):
if isinstance(item, dict): return {k: sanitize(v) for k, v in item.items()}
elif isinstance(item, list): return [sanitize(i) for i in item]
elif isinstance(item, bytes): return base64.b64encode(item).decode('utf-8')
else: return item
return sanitize(data)
def get_session_list():
sessions = []
if not os.path.exists(SESSIONS_DIR): return []
for fname in os.listdir(SESSIONS_DIR):
if fname.endswith(".json"):
try:
with open(os.path.join(SESSIONS_DIR, fname), "r", encoding="utf-8") as f:
data = json.load(f)
sessions.append({
"id": fname.replace(".json", ""),
"title": data.get("title", "新しいチャット"),
"updated_at": data.get("updated_at", "")
})
except: continue
return sorted(sessions, key=lambda x: x["updated_at"], reverse=True)
async def generate_content_with_retry(contents, tools, config):
max_retries = 5
for i in range(max_retries):
try:
return client.models.generate_content(model=GEMINI_MODEL, contents=contents, config=config)
except errors.ClientError as e:
if e.status_code == 429:
wait_time = (2 ** i)
print(f"⚠️ 429 再試行中... ({i+1}/{max_retries})")
await asyncio.sleep(wait_time)
else: raise e
raise Exception("API制限を超えました。")
# --- HTML コンテンツ ---
HTML_CONTENT = """<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<title>My Copilot (Gemini Mac)</title>
<script src="https://unpkg.com/marked@9.1.6/marked.min.js"></script>
<style>
:root { --sidebar-width: 260px; --header-height: 60px; }
* { box-sizing: border-box; }
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 0; display: flex;
height: 100vh; background: #f9f9f9; overflow: hidden; }
#sidebar { width: var(--sidebar-width); background: #202123; color: white; display: flex; flex-direction: column; flex-shrink: 0; }
#new-chat-btn { margin: 10px; padding: 12px; border: 1px solid #4d4d4f; border-radius: 5px; cursor: pointer;
background: transparent; color: white; font-size: 0.9em; text-align: left; }
#new-chat-btn:hover { background: #2b2c2f; }
#session-list { flex: 1; overflow-y: auto; padding: 5px; }
.session-item { padding: 8px 10px; margin: 2px 0; border-radius: 5px; cursor: pointer; font-size: 0.85em;
display: flex; align-items: center; justify-content: space-between; position: relative; }
.session-item:hover { background: #2b2c2f; }
.session-item.active { background: #343541; }
.session-title { overflow: hidden; text-overflow: ellipsis; white-space: nowrap; flex: 1; }
.session-actions { display: none; gap: 8px; margin-left: 10px; }
.session-item:hover .session-actions { display: flex; }
.action-btn { background: none; border: none; color: #acacbe; cursor: pointer; padding: 2px; font-size: 1.1em; }
.action-btn:hover { color: white; }
#main-content { flex: 1; display: flex; flex-direction: column; min-width: 0; }
#header { height: var(--header-height); background: white; border-bottom: 1px solid #ddd; display: flex; align-items: center; padding: 0 20px; font-weight: bold; justify-content: space-between; }
#chat-container { flex: 1; overflow-y: auto; padding: 20px; display: flex; flex-direction: column; gap: 15px; }
.msg { padding: 15px 20px; border-radius: 12px; max-width: 85%; line-height: 1.6; }
.user { background: #007bff; color: white; align-self: flex-end; }
.ai { background: white; border: 1px solid #e0e0e0; align-self: flex-start; color: #202124; }
#input-area-container { background: white; padding: 16px 20px; border-top: 1px solid #ddd; }
#input-area { max-width: 800px; margin: 0 auto; display: flex; gap: 10px; border: 1px solid #ddd; border-radius: 15px; padding: 8px 15px; background: white; }
#user-input { flex: 1; border: none; outline: none; resize: none; font-size: 1em; max-height: 200px; padding: 5px 0; }
#send-btn { background: #28a745; color: white; border: none; border-radius: 8px; padding: 8px 18px; cursor: pointer; font-weight: bold; }
#send-btn:disabled { background: #ccc; cursor: not-allowed; }
pre { background: #1e1e1e; color: #d4d4d4; padding: 12px; border-radius: 8px; overflow-x: auto; }
.status { font-size: 0.8em; color: #666; font-style: italic; margin-top: 5px; }
</style>
</head>
<body>
<div id="sidebar">
<button id="new-chat-btn" onclick="startNewChat()">+ 新しいチャット</button>
<div id="session-list"></div>
</div>
<div id="main-content">
<div id="header">
<span id="current-title">My Copilot (Gemini Mac)</span>
<span style="font-size:0.75em; background:#e6f4ea; color:#1e8e3e; padding:4px 10px; border-radius:12px;">● Online</span>
</div>
<div id="chat-container"></div>
<div id="input-area-container">
<div id="input-area">
<textarea id="user-input" placeholder="指示を出す... (Cmd+Enterで送信)" rows="1"></textarea>
<button id="send-btn" onclick="send()">送信</button>
</div>
</div>
</div>
<script>
let currentSessionId = null;
const container = document.getElementById('chat-container');
const input = document.getElementById('user-input');
const sendBtn = document.getElementById('send-btn');
marked.setOptions({ breaks: true, gfm: true });
async function refreshSessions() {
const res = await fetch('/sessions');
const sessions = await res.json();
const list = document.getElementById('session-list');
list.innerHTML = '';
sessions.forEach(s => {
const div = document.createElement('div');
div.className = 'session-item' + (s.id === currentSessionId ? ' active' : '');
div.innerHTML = `
<div class="session-title" onclick="loadSession('${s.id}')">💬 ${s.title}</div>
<div class="session-actions">
<button class="action-btn" title="名前を変更" onclick="event.stopPropagation(); renameSession('${s.id}', '${s.title}')">✏️</button>
<button class="action-btn" title="削除" onclick="event.stopPropagation(); deleteSession('${s.id}')">🗑️</button>
</div>
`;
list.appendChild(div);
});
}
async function deleteSession(id) {
if (!confirm('このチャットを削除しますか?')) return;
await fetch(`/session/${id}`, { method: 'DELETE' });
if (currentSessionId === id) startNewChat();
else refreshSessions();
}
async function renameSession(id, oldTitle) {
const newTitle = prompt('新しいタイトルを入力してください:', oldTitle);
if (!newTitle || newTitle === oldTitle) return;
await fetch(`/session/${id}/rename`, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ title: newTitle })
});
if (currentSessionId === id) document.getElementById('current-title').textContent = newTitle;
refreshSessions();
}
function startNewChat() {
currentSessionId = null;
container.innerHTML = '';
document.getElementById('current-title').textContent = 'My Copilot (Gemini Mac)';
refreshSessions();
}
async function loadSession(id) {
currentSessionId = id;
const res = await fetch('/session/' + id);
const data = await res.json();
document.getElementById('current-title').textContent = data.title;
container.innerHTML = '';
(data.history || []).forEach(msg => {
const text = (msg.parts || []).filter(p => p.text).map(p => p.text).join('\\n');
if (text) addMsg(text, msg.role === 'user' ? 'user' : 'ai');
});
refreshSessions();
container.scrollTop = container.scrollHeight;
}
async function send() {
const text = input.value.trim();
if (!text || sendBtn.disabled) return;
input.value = ''; input.style.height = 'auto'; sendBtn.disabled = true;
addMsg(text, 'user');
const statusDiv = document.createElement('div');
statusDiv.className = 'status'; statusDiv.textContent = '⏳ Thinking...';
container.appendChild(statusDiv);
container.scrollTop = container.scrollHeight;
try {
const res = await fetch('/chat', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ message: text, session_id: currentSessionId })
});
const data = await res.json();
if (statusDiv.parentNode) container.removeChild(statusDiv);
currentSessionId = data.session_id;
addMsg(data.reply, 'ai');
refreshSessions();
} catch(e) {
if (statusDiv.parentNode) container.removeChild(statusDiv);
addMsg('❌ エラーが発生しました。', 'ai');
} finally {
sendBtn.disabled = false;
}
}
function addMsg(text, role) {
const div = document.createElement('div');
div.className = 'msg ' + role;
div.innerHTML = role === 'ai' ? marked.parse(text) : text;
container.appendChild(div);
container.scrollTop = container.scrollHeight;
}
input.addEventListener('keydown', (e) => {
if (e.key === 'Enter' && (e.metaKey || e.ctrlKey)) { e.preventDefault(); send(); }
});
input.addEventListener('input', function() {
this.style.height = 'auto';
this.style.height = Math.min(this.scrollHeight, 200) + 'px';
});
window.onload = refreshSessions;
</script>
</body>
</html>"""
@app.get("/")
async def index(): return HTMLResponse(HTML_CONTENT)
@app.get("/sessions")
async def sessions_api(): return JSONResponse(get_session_list())
@app.get("/session/{session_id}")
async def get_session(session_id: str):
path = os.path.join(SESSIONS_DIR, f"{session_id}.json")
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f: return JSONResponse(json.load(f))
return JSONResponse({"title": "新しいチャット", "history": []})
@app.delete("/session/{session_id}")
async def delete_session(session_id: str):
path = os.path.join(SESSIONS_DIR, f"{session_id}.json")
if os.path.exists(path):
os.remove(path)
return {"status": "ok"}
raise HTTPException(status_code=404)
@app.post("/session/{session_id}/rename")
async def rename_session(session_id: str, request: Request):
data = await request.json()
new_title = data.get("title", "無題のチャット")
path = os.path.join(SESSIONS_DIR, f"{session_id}.json")
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
session_data = json.load(f)
session_data["title"] = new_title
with open(path, "w", encoding="utf-8") as f:
json.dump(session_data, f, ensure_ascii=False, indent=2)
return {"status": "ok"}
raise HTTPException(status_code=404)
@app.post("/chat")
async def chat(request: Request):
session_id = str(uuid.uuid4())
try:
data = await request.json()
user_msg = data.get("message", "")
session_id = data.get("session_id") or session_id
path = os.path.join(SESSIONS_DIR, f"{session_id}.json")
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
session_data = json.load(f)
else:
session_data = {"title": user_msg[:20], "history": [], "updated_at": ""}
history = session_data["history"]
formatted_history = [types.Content(role=h["role"], parts=[types.Part(**p) for p in h["parts"]]) for h in history]
# MCPサーバーとの接続
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as mcp_session:
await mcp_session.initialize()
mcp_tools = await mcp_session.list_tools()
# Gemini用にツール定義を変換(安定化のためのクリーンアップ)
decls = []
for t in mcp_tools.tools:
schema = t.inputSchema.copy()
# Geminiが解釈できない可能性があるフィールドを削除
schema.pop("additionalProperties", None)
schema.pop("$schema", None)
decls.append(types.FunctionDeclaration(
name=t.name,
description=t.description or "",
parameters=schema
))
gemini_tools = [types.Tool(function_declarations=decls)]
formatted_history.append(types.Content(role="user", parts=[types.Part(text=user_msg)]))
# Function Calling のループ
while True:
config = types.GenerateContentConfig(
system_instruction=SYSTEM_PROMPT,
tools=gemini_tools
)
response = await generate_content_with_retry(formatted_history, gemini_tools, config)
if not response.candidates or not response.candidates[0].content:
raise Exception("Geminiからの応答が空です。")
response_content = response.candidates[0].content
formatted_history.append(response_content)
# ツール実行指示があるか確認
function_calls = [p.function_call for p in response_content.parts if p.function_call]
if function_calls:
tool_results_parts = []
for fc in function_calls:
print(f"🛠️ MCPツール実行: {fc.name}({fc.args})")
try:
# MCPサーバー側の関数を呼び出す
result = await mcp_session.call_tool(fc.name, arguments=fc.args)
res_text = result.content[0].text if result.content else "Success"
tool_results_parts.append(types.Part(
function_response=types.FunctionResponse(
name=fc.name,
response={"result": res_text}
)
))
except Exception as tool_err:
print(f"❌ ツール実行失敗: {str(tool_err)}")
tool_results_parts.append(types.Part(
function_response=types.FunctionResponse(
name=fc.name,
response={"result": f"Error: {str(tool_err)}"}
)
))
# ツールの実行結果を履歴に追加してループ継続
formatted_history.append(types.Content(role="user", parts=tool_results_parts))
else:
# ツール実行がなければ最終回答として終了
final_reply = response.text
session_data["history"] = [gemini_obj_to_dict(h) for h in formatted_history]
session_data["updated_at"] = datetime.now().isoformat()
with open(path, "w", encoding="utf-8") as f:
json.dump(session_data, f, ensure_ascii=False, indent=2)
return JSONResponse({"reply": final_reply, "session_id": session_id})
except Exception as e:
import traceback
print("\n" + "!"*60)
print("🚨 エラー詳細ログ出力:")
print(traceback.format_exc())
print("!"*60 + "\n")
return JSONResponse({
"reply": f"申し訳ありません、エラーが発生しました。\n\n```\n{str(e)}\n```",
"session_id": session_id
})
if __name__ == "__main__":
import uvicorn
print(f"🚀 My Copilot 起動中... モデル: {GEMINI_MODEL}")
threading.Timer(1.5, lambda: webbrowser.open("http://127.0.0.1:8000")).start()
uvicorn.run(app, host="127.0.0.1", port=8000)
🚀 いざ、起動!
コマンドラインで python app.py を実行すると…
ブラウザが自動で開き、あなた専用のCopilot画面が立ち上がります!🎉
こんな指示をしてみてください:
- 「現在時刻を表示するPythonコードを書いて、そのまま実行してみて」
- 「Hello world!!を表示するhello.pyを作って」
- 「Yoube動画 https://www.youtube.com/watch?v=OIQ18Roycuo の内容を教えて下さい。」
【重要】ご利用は自己責任でお願いします
本記事のコードはローカル環境を操作する強力なツールです。
学習用として活用し、実行されるコマンドの内容は必ず確認するようにしてください。
✨ MCPで作るCopilotのここがスゴイ!
- 安全・爆速: 自分のローカル環境で動くから、機密情報も安心(設定次第)!
-
無限の拡張性:
server.pyに関数を追加するだけで、AIにできることがどんどん増えます。 - Gemini 3.1 Flash (Lite) を使えば、レスポンスも超速でストレスフリー!⚡️
👋 おわりに
MCPの世界へようこそ!
今まで「AIにコードを書いてもらうだけ」だったのが、これからは**「AIに動かしてもらう」**時代です。
ぜひ自分好みにカスタマイズして、世界に一つだけの最強の相棒を育ててみてくださいね!🧸💎
免責事項
本記事で提供される情報、コード、および解説は、あくまで技術的な学習および情報共有を目的としたものです。
- 損害に対する責任: 本記事の内容(プログラムの実行、設定の変更等)を適用した結果、利用者または第三者に生じたいかなる直接的・間接的な損害(データの損失、システムの停止、セキュリティ侵害、PCの故障等)について、筆者は一切の責任を負いません。
- 正確性の保証: 記事の内容は投稿時点の情報に基づき作成されていますが、その正確性、完全性、最新性を保証するものではありません。また、外部ライブラリ(MCP, Gemini SDK等)やAPIの仕様変更に伴う動作不良についても責任を負いかねます。
- 自己責任の原則: 本プログラムの利用およびコマンドの実行は、必ず利用者の管理・判断・責任において行ってください。
本記事を参照・利用した時点で、上記の免責事項に同意したものとみなします。
