0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

経済ニュースのサマリーを取得・通知するサービスの構築 (MCP + Python + Claude Desktop)

Posted at

Claude DesktopとMCP、Pythonで構築したサーバーを使用して経済ニュースを配信するサービスを構築します。

全体の構成

システム全体の構成図

[Claude Desktop] ←→ [MCP Server (Python)] ←→ [News API/RSS Feed]

システム全体の処理フロー

  1. ユーザーがClaude Desktopに「今日の経済ニュースのトップは?」等を入力
  2. Claude DesktopがMCPサーバーのget_top_news関数を呼び出し
  3. MCPサーバーがニュースAPIからトップ10記事を取得
  4. MCPサーバーが記事データ(タイトル、URL、要約)をClaudeに返却
  5. Claudeが重要度を判断して3本を選出し、サマリーと共に表示

開発環境の構築

  • PCはWindows 11 Pro / 23H2 / 22631.5472

1. 前提条件

  • Python 3.8以上
  • Claude Desktop(最新版)
  • News API キー(NewsAPI.orgなど)

2. プロジェクト構造

今回はC:\work\project\news-mcp-server以下に環境フォルダを作成しています。

news-mcp-server/
├── run_server.py
├── src/
│   ├── __init__.py
│   ├── server.py         // MCPサーバーのメイン処理
│   └── news_fetcher.py   // ニュース取得処理
├── requirements.txt
├── pyproject.toml
└── .env                   // News APIキー等の環境変数

3. 必要パッケージのインストール

Pythonのバージョンは今回以下を使用しています。
Powershell上で作業

python --version
Python 3.13.5
cd C:\work\project\news-mcp-server\

pip install mcp httpx python-dotenv feedparser beautifulsoup4

MCPサーバー側の実装

requirements.txt

mcp>=1.0.0
httpx>=0.25.0
python-dotenv>=1.0.0
feedparser>=6.0.10
beautifulsoup4>=4.12.0
  • News API キーの取得
  • NewsAPI.orgでアカウント作成
  • APIキーを.envファイルに設定

.env ファイル

NEWS_API_KEY=your_newsapi_key_here

# Optional: Logging Level
LOG_LEVEL=INFO

# Optional: HTTP Timeout
HTTP_TIMEOUT=30

run_server.py

#!/usr/bin/env python3
"""
Economic News MCP Server - 実行用スクリプト
"""

import sys
import os
import asyncio
import logging
from pathlib import Path

# プロジェクトの絶対パスを設定
PROJECT_ROOT = Path(__file__).parent.absolute()
SRC_PATH = PROJECT_ROOT / "src"

# パスを確実に追加
if str(SRC_PATH) not in sys.path:
    sys.path.insert(0, str(SRC_PATH))

# 環境変数の設定
os.chdir(str(PROJECT_ROOT))

# ログ設定(Claude Desktopのログに出力)
logging.basicConfig(
    level=logging.INFO,  # DEBUGからINFOに変更
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stderr),  # Claude Desktopのログに出力
        logging.FileHandler(PROJECT_ROOT / "mcp_server.log", encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

def check_environment():
    """環境チェック"""
    print(f"Python version: {sys.version}", file=sys.stderr)
    print(f"Working directory: {os.getcwd()}", file=sys.stderr)
    print(f"Project root: {PROJECT_ROOT}", file=sys.stderr)
    print(f"Source path: {SRC_PATH}", file=sys.stderr)
    
    # .envファイルの確認
    env_file = PROJECT_ROOT / ".env"
    if env_file.exists():
        print(f" Found .env file: {env_file}", file=sys.stderr)
    else:
        print(f" No .env file found at: {env_file}", file=sys.stderr)

def main():
    """メイン実行関数"""
    try:
        print("=" * 50, file=sys.stderr)
        print("Economic News MCP Server Starting...", file=sys.stderr)
        check_environment()
        print("=" * 50, file=sys.stderr)
        
        # 必要なモジュールのインポート確認
        try:
            print("Testing imports...", file=sys.stderr)
            from dotenv import load_dotenv
            print(" dotenv imported", file=sys.stderr)
            
            # .envファイルを読み込み
            env_file = PROJECT_ROOT / ".env"
            load_dotenv(env_file)
            print(" .env loaded", file=sys.stderr)
            
            from news_fetcher import NewsFetcher
            print(" news_fetcher imported", file=sys.stderr)
            
            from server import main as server_main
            print(" server imported", file=sys.stderr)
            
        except ImportError as e:
            print(f"Import failed: {e}", file=sys.stderr)
            print("Please install required packages:", file=sys.stderr)
            print("pip install mcp httpx python-dotenv feedparser beautifulsoup4", file=sys.stderr)
            return 1
        
        # API キーの確認
        api_key = os.getenv('NEWS_API_KEY')
        if api_key:
            print(f" NEWS_API_KEY found (starts with: {api_key[:8]}...)", file=sys.stderr)
        else:
            print(" NEWS_API_KEY not found - will use RSS fallback", file=sys.stderr)
        
        # サーバー実行
        print("Starting MCP server...", file=sys.stderr)
        asyncio.run(server_main())
        
    except KeyboardInterrupt:
        print("Server stopped by user (Ctrl+C)", file=sys.stderr)
        return 0
    except Exception as e:
        print(f"Fatal error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc(file=sys.stderr)
        return 1

if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)

src/init.py

"""
Economic News MCP Server
"""

__version__ = "1.0.0"

# 絶対インポート
from server import app
from news_fetcher import NewsFetcher

__all__ = ['app', 'NewsFetcher']

src/news_fetcher.py

"""
News Fetcher Module - Fixed Version with Working RSS Feeds
"""

import httpx
import feedparser
import os
import logging
import asyncio
from typing import List, Dict, Optional
from datetime import datetime, timezone  # ← 追加
import json
import re
from bs4 import BeautifulSoup

logger = logging.getLogger(__name__)

class NewsFetcher:
    """ニュース取得クラス"""
    
    def __init__(self):
        self.news_api_key = os.getenv('NEWS_API_KEY')
        self.base_url = "https://newsapi.org/v2"
        self.timeout = 30.0
        self.max_retries = 3
        
        # 動作するRSSフィード設定(2025年対応)
        self.rss_feeds = [
            {
                'url': "https://news.yahoo.co.jp/rss/topics/business.xml",
                'name': "Yahoo!ニュース(経済)",
                'encoding': 'utf-8'
            },
            {
                'url': "https://www3.nhk.or.jp/rss/news/cat2.xml",
                'name': "NHKニュース(経済)",
                'encoding': 'utf-8'
            }
        ]
        
        logger.info(f"NewsFetcher initialized with {len(self.rss_feeds)} RSS feeds")
        if self.news_api_key:
            logger.info(f"NewsAPI available: {self.news_api_key[:8]}...")
        else:
            logger.info("NewsAPI not available - using RSS only")
    
    async def get_economic_news(self, limit: int = 10, category: str = "business") -> List[Dict]:
        """経済ニュースを取得"""
        logger.info(f"Getting economic news: limit={limit}, category={category}")
        
        all_articles = []
        
        # 1. NewsAPI(利用可能な場合)
        if self.news_api_key:
            try:
                api_articles = await self._get_newsapi_articles(limit // 2, category)
                if api_articles:
                    all_articles.extend(api_articles)
                    logger.info(f"NewsAPI: {len(api_articles)} articles")
            except Exception as e:
                logger.warning(f"NewsAPI failed: {e}")
        
        # 2. RSS フィード
        rss_articles = await self._get_rss_articles(limit)
        if rss_articles:
            all_articles.extend(rss_articles)
            logger.info(f"RSS: {len(rss_articles)} articles")
        
        # 3. フォールバック記事(RSSも失敗した場合)
        if len(all_articles) < 3:
            fallback_articles = self._get_fallback_articles(limit)
            all_articles.extend(fallback_articles)
            logger.info(f"Fallback: {len(fallback_articles)} articles")
        
        # 重複除去とソート
        unique_articles = self._deduplicate_articles(all_articles)
        sorted_articles = sorted(unique_articles, key=lambda x: x.get('published_at', ''), reverse=True)
        
        result = sorted_articles[:limit]
        logger.info(f"Final result: {len(result)} articles")
        
        return result
    
    async def _get_newsapi_articles(self, limit: int, category: str) -> List[Dict]:
        """NewsAPIから記事を取得"""
        
        for attempt in range(self.max_retries):
            try:
                async with httpx.AsyncClient(timeout=self.timeout) as client:
                    # 日本語記事を取得するパラメータ
                    params = {
                        'apiKey': self.news_api_key,
                        'q': '経済 OR ビジネス OR 株式 OR 市場',  # 日本語キーワード
                        'language': 'ja',
                        'sortBy': 'popularity',
                        'pageSize': min(limit, 20)
                    }
                    
                    logger.debug(f"NewsAPI request: {params}")
                    response = await client.get(f"{self.base_url}/everything", params=params)
                    response.raise_for_status()
                    
                    data = response.json()
                    
                    if data.get('status') != 'ok':
                        raise Exception(f"NewsAPI error: {data.get('message', 'Unknown error')}")
                    
                    articles = []
                    for article_data in data.get('articles', []):
                        if self._is_valid_article(article_data):
                            articles.append(self._format_newsapi_article(article_data))
                    
                    return articles
                    
            except Exception as e:
                logger.error(f"NewsAPI error on attempt {attempt + 1}: {e}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                else:
                    raise
        
        return []
    
    async def _get_rss_articles(self, limit: int) -> List[Dict]:
        """RSSフィードから記事を取得"""
        logger.info("Fetching articles from RSS feeds...")
        
        all_articles = []
        
        for feed_info in self.rss_feeds:
            try:
                logger.info(f"Fetching from: {feed_info['name']}")
                articles = await self._fetch_single_rss(feed_info, 5)  # 各フィードから5件
                
                if articles:
                    all_articles.extend(articles)
                    logger.info(f" {feed_info['name']}: {len(articles)} articles")
                else:
                    logger.warning(f"{feed_info['name']}: No articles")
                    
            except Exception as e:
                logger.warning(f" {feed_info['name']} failed: {e}")
                continue
        
        logger.info(f"RSS total: {len(all_articles)} articles from all feeds")
        return all_articles
    
    async def _fetch_single_rss(self, feed_info: Dict, max_articles: int) -> List[Dict]:
        """単一のRSSフィードから記事を取得"""
        
        try:
            async with httpx.AsyncClient(
                timeout=self.timeout,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                }
            ) as client:
                
                logger.debug(f"Requesting: {feed_info['url']}")
                response = await client.get(feed_info['url'], follow_redirects=True)
                response.raise_for_status()
                
                # Content-Typeから文字エンコーディングを検出
                content_type = response.headers.get('content-type', '')
                if 'charset=' in content_type:
                    charset = content_type.split('charset=')[1].split(';')[0]
                    response.encoding = charset
                elif feed_info.get('encoding'):
                    response.encoding = feed_info['encoding']
                
                # RSSパース
                feed = feedparser.parse(response.text)
                
                # フィード情報をログ出力
                logger.debug(f"Feed title: {getattr(feed.feed, 'title', 'Unknown')}")
                logger.debug(f"Feed entries: {len(feed.entries)}")
                
                if not feed.entries:
                    logger.warning(f"No entries found in feed: {feed_info['name']}")
                    return []
                
                articles = []
                for entry in feed.entries[:max_articles]:
                    if self._is_valid_rss_entry(entry):
                        article = self._format_rss_article(entry, feed_info['name'])
                        articles.append(article)
                
                return articles
                
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error for {feed_info['name']}: {e.response.status_code}")
            raise
        except Exception as e:
            logger.error(f"RSS fetch error for {feed_info['name']}: {e}")
            raise
    
    def _get_fallback_articles(self, limit: int) -> List[Dict]:
        """フォールバック記事(オフライン時用)"""
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        fallback_articles = [
            {
                'title': '日経平均株価、本日の取引動向について',
                'description': '本日の日本株式市場の動向と主要指標の分析',
                'url': 'https://www.nikkei.com/',
                'source': 'フォールバック記事',
                'published_at': current_time,
                'url_to_image': None
            },
            {
                'title': '円相場の最新動向と今後の見通し',
                'description': '為替市場における円の動きと経済への影響分析',
                'url': 'https://www.bloomberg.co.jp/',
                'source': 'フォールバック記事',
                'published_at': current_time,
                'url_to_image': None
            },
            {
                'title': '主要企業の決算発表と業績見通し',
                'description': '上場企業の最新決算情報と市場への影響',
                'url': 'https://kabutan.jp/',
                'source': 'フォールバック記事',
                'published_at': current_time,
                'url_to_image': None
            }
        ]
        
        return fallback_articles[:limit]
    
    def _is_valid_article(self, article: Dict) -> bool:
        """NewsAPI記事の有効性チェック"""
        return (
            article.get('title') and
            article.get('url') and
            article.get('title') != '[Removed]' and
            'http' in article.get('url', '') and
            len(article.get('title', '')) > 10  # タイトルが短すぎるものを除外
        )
    
    def _is_valid_rss_entry(self, entry) -> bool:
        """RSS記事の有効性チェック"""
        if not (hasattr(entry, 'title') and hasattr(entry, 'link')):
            return False
        
        title = getattr(entry, 'title', '')
        link = getattr(entry, 'link', '')
        
        return (
            title and link and
            len(title.strip()) > 5 and
            ('http' in link) and
            not any(spam in title.lower() for spam in ['広告', 'pr', 'sponsored'])
        )
    
    def _format_newsapi_article(self, article: Dict) -> Dict:
        """NewsAPI記事をフォーマット"""
        return {
            'title': article['title'],
            'description': article.get('description', ''),
            'url': article['url'],
            'source': article['source']['name'],
            'published_at': article.get('publishedAt', ''),
            'url_to_image': article.get('urlToImage')
        }
    
    def _format_rss_article(self, entry, source_name: str) -> Dict:
        """RSS記事をフォーマット"""
        return {
            'title': getattr(entry, 'title', ''),
            'description': getattr(entry, 'summary', getattr(entry, 'description', ''))[:200],
            'url': getattr(entry, 'link', ''),
            'source': source_name,
            'published_at': getattr(entry, 'published', getattr(entry, 'updated', '')),
            'url_to_image': None
        }
    
    def _deduplicate_articles(self, articles: List[Dict]) -> List[Dict]:
        """記事の重複を除去"""
        seen_titles = set()
        seen_urls = set()
        unique_articles = []
        
        for article in articles:
            # タイトルの正規化
            title_key = re.sub(r'[^\w\s]', '', article['title'].lower()).strip()
            url_key = article['url'].lower()
            
            if title_key not in seen_titles and url_key not in seen_urls:
                seen_titles.add(title_key)
                seen_urls.add(url_key)
                unique_articles.append(article)
        
        return unique_articles
    
    async def test_connection(self) -> bool:
        """接続テスト"""
        logger.info("=== Connection Test Starting ===")
        
        success_count = 0
        total_tests = 0
        
        # NewsAPIテスト
        if self.news_api_key:
            total_tests += 1
            try:
                api_articles = await self._get_newsapi_articles(1, "business")
                if api_articles:
                    success_count += 1
                    logger.info(" NewsAPI test: PASS")
                else:
                    logger.warning("⚠ NewsAPI test: No articles returned")
            except Exception as e:
                logger.error(f" NewsAPI test: FAIL - {e}")
        
        # RSSフィードテスト
        for feed_info in self.rss_feeds:
            total_tests += 1
            try:
                articles = await self._fetch_single_rss(feed_info, 1)
                if articles:
                    success_count += 1
                    logger.info(f" RSS test ({feed_info['name']}): PASS")
                else:
                    logger.warning(f" RSS test ({feed_info['name']}): No articles")
            except Exception as e:
                logger.error(f" RSS test ({feed_info['name']}): FAIL - {e}")
        
        # フォールバックは常に利用可能
        total_tests += 1
        success_count += 1
        logger.info(" Fallback articles: AVAILABLE")
        
        success_rate = success_count / total_tests if total_tests > 0 else 0
        logger.info(f"=== Test Results: {success_count}/{total_tests} ({success_rate:.1%}) ===")
        
        return success_count > 0

src/server.py

"""
Economic News MCP Server - Main Server Module
"""

import asyncio
import os
import sys
import logging
from pathlib import Path
from typing import Any, Sequence
from datetime import datetime  # ← 追加
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, CallToolResult
from dotenv import load_dotenv

# パス設定
current_dir = Path(__file__).parent
project_root = current_dir.parent

# 環境変数の読み込み
env_path = project_root / ".env"
if env_path.exists():
    load_dotenv(env_path)

# ログ設定
logger = logging.getLogger(__name__)

# NewsFetcherをインポート
try:
    from news_fetcher import NewsFetcher
    logger.info("NewsFetcher imported successfully")
except ImportError as e:
    logger.error(f"Failed to import NewsFetcher: {e}")
    sys.exit(1)

# MCPサーバーの初期化
app = Server("economic-news-server")
news_fetcher = NewsFetcher()

@app.list_tools()
async def list_tools() -> list[Tool]:
    """利用可能なツールのリストを返す"""
    logger.info("Tools list requested")
    return [
        Tool(
            name="get_economic_news",
            description="経済ニュースのトップ記事を取得します。日本の主要経済メディアから最新ニュースを収集します。",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "取得する記事数(1-20)",
                        "default": 10,
                        "minimum": 1,
                        "maximum": 20
                    },
                    "category": {
                        "type": "string",
                        "description": "ニュースカテゴリ",
                        "enum": ["business", "technology", "general"],
                        "default": "business"
                    }
                },
                "required": []
            }
        ),
        Tool(
            name="get_news_summary",
            description="重要なニュースをサマリー形式で取得します",
            inputSchema={
                "type": "object",
                "properties": {
                    "count": {
                        "type": "integer",
                        "description": "サマリーに含める記事数",
                        "default": 3,
                        "minimum": 1,
                        "maximum": 5
                    }
                },
                "required": []
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
    """ツールの実行"""
    logger.info(f"Tool called: {name} with arguments: {arguments}")
    
    try:
        if name == "get_economic_news":
            return await handle_get_economic_news(arguments)
        elif name == "get_news_summary":
            return await handle_get_news_summary(arguments)
        else:
            logger.warning(f"Unknown tool: {name}")
            return [TextContent(
                type="text",
                text=f"エラー: 不明なツール '{name}' が呼び出されました"
            )]
    except Exception as e:
        logger.error(f"Tool execution error: {e}", exc_info=True)
        return [TextContent(
            type="text",
            text=f"ツール実行中にエラーが発生しました: {str(e)}"
        )]

async def handle_get_economic_news(arguments: dict[str, Any]) -> list[TextContent]:
    """経済ニュース取得の処理"""
    limit = arguments.get("limit", 10)
    category = arguments.get("category", "business")
    
    logger.info(f"Fetching {limit} {category} articles...")
    
    try:
        # ニュース取得の試行
        articles = await news_fetcher.get_economic_news(limit, category)
        
        if not articles:
            return [TextContent(
                type="text",
                text="""📰 ニュース取得結果

現在、ニュースデータを取得できませんでした。

考えられる原因:
- インターネット接続の問題
- RSSフィードの一時的な障害
- APIキーの設定問題

しばらく時間をおいて再度お試しください。
ログファイル(mcp_server.log)で詳細なエラー情報を確認できます。"""
            )]
        
        # 結果のフォーマット
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        source_count = len(set(a.get('source', 'Unknown') for a in articles))
        
        result_lines = [
            f" 経済ニュース TOP{len(articles)}",
            f" 取得時刻: {current_time}",
            f" データソース: {source_count}種類",
            "",
        ]
        
        for i, article in enumerate(articles, 1):
            title = article.get('title', 'タイトル不明')
            source = article.get('source', 'Unknown')
            url = article.get('url', '')
            description = article.get('description', '')
            published_at = article.get('published_at', '')
            
            result_lines.extend([
                f"{i}{title}",
                f" {source}",
                f" {url}",
            ])
            
            if description:
                desc = description[:150]
                result_lines.append(f"📝 {desc}...")
            
            if published_at:
                result_lines.append(f"📅 {published_at}")
            
            result_lines.append("")
        
        result_text = "\n".join(result_lines)
        logger.info(f"Successfully returned {len(articles)} articles from {source_count} sources")
        
        return [TextContent(type="text", text=result_text)]
        
    except Exception as e:
        logger.error(f"Error in handle_get_economic_news: {e}", exc_info=True)
        return [TextContent(
            type="text",
            text=f""" ニュース取得エラー

エラーメッセージ: {str(e)}

対処方法:
1. インターネット接続を確認してください
2. .envファイルのNEWS_API_KEYを確認してください
3. ログファイル(mcp_server.log)で詳細を確認してください
4. しばらく時間をおいて再度お試しください"""
        )]

async def handle_get_news_summary(arguments: dict[str, Any]) -> list[TextContent]:
    """ニュースサマリー取得の処理"""
    count = arguments.get("count", 3)
    
    logger.info(f"Generating news summary for top {count} articles")
    
    try:
        # 最新ニュースを取得
        articles = await news_fetcher.get_economic_news(10)
        
        if not articles:
            return [TextContent(
                type="text",
                text="サマリー作成用のニュースデータを取得できませんでした。"
            )]
        
        # トップ記事を選出
        top_articles = articles[:count]
        
        result_lines = [
            f" 重要経済ニュース TOP{count}",
            f" {len(articles)}件から選出",
            "",
        ]
        
        for i, article in enumerate(top_articles, 1):
            title = article.get('title', 'タイトル不明')
            source = article.get('source', 'Unknown')
            url = article.get('url', '')
            description = article.get('description', '')
            
            result_lines.extend([
                f"{i}位: {title}",
                f" 出典: {source}",
            ])
            
            if description:
                result_lines.append(f" 概要: {description}")
            
            result_lines.extend([
                f" 詳細: {url}",
                "" * 50,
                ""
            ])
        
        result_text = "\n".join(result_lines)
        logger.info(f"Successfully generated summary for {len(top_articles)} articles")
        
        return [TextContent(type="text", text=result_text)]
        
    except Exception as e:
        logger.error(f"Error in handle_get_news_summary: {e}", exc_info=True)
        return [TextContent(
            type="text",
            text=f"サマリー生成中にエラーが発生しました: {str(e)}"
        )]

async def main():
    """メインエントリーポイント"""
    logger.info("Economic News MCP Server initializing...")
    logger.info(f"Working directory: {os.getcwd()}")
    logger.info(f"Python path: {sys.path[:2]}...")
    
    # 初期化テスト
    try:
        logger.info("Testing news fetcher initialization...")
        test_result = await news_fetcher.test_connection()
        if test_result:
            logger.info(" News fetcher test passed")
        else:
            logger.warning(" News fetcher test failed - server will continue")
    except Exception as e:
        logger.error(f"News fetcher test error: {e}")
        logger.warning("Server will continue despite test failure")
    
    # MCPサーバー開始
    try:
        logger.info("Starting MCP stdio server...")
        async with stdio_server() as (read_stream, write_stream):
            await app.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="economic-news-server",
                    server_version="1.0.0",
                    capabilities={}
                )
            )
    except Exception as e:
        logger.error(f"Server runtime error: {e}", exc_info=True)
        raise

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Fatal server error: {e}", exc_info=True)
        sys.exit(1)

pyproject.toml

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "economic-news-mcp-server"
version = "1.0.0"
description = "MCP server for fetching economic news"
dependencies = [
    "mcp>=1.0.0",
    "httpx>=0.24.0",
    "python-dotenv>=1.0.0",
    "feedparser>=6.0.0",
    "beautifulsoup4>=4.12.0"
]

[tool.setuptools.packages.find]
where = ["src"]

[tool.setuptools.package-dir]
"" = "src"

Claude Desktop設定

(1) Claude Desktopアプリ画面左上のメニュー(三本線)を選択して[ファイル]→[設定]を選択
(2) 「設定」画面の一番下の「開発者」をクリック→「ローカルMCPサーバー」の[設定を編集]を選択
(3) エクスプローラーが開いてclaude_desktop_config.jsonが選択されているのでそのファイルを編集
(4) Claude Desktopを一旦終了

claude_desktop_config.json

{
  "mcpServers": {
    "economic-news": {
      "command": "C:\\Python313\\python.exe",
      "args": ["C:\\work\\project\\news-mcp-server\\run_server.py"],
      "cwd": "C:\\work\\project\\news-mcp-server",
      "env": {
        "PYTHONIOENCODING": "utf-8",
        "PYTHONPATH": "C:\\work\\project\\news-mcp-server\\src"
      }
    }
  }
}

セットアップ手順

  1. 依存関係のインストール
    Powershell上で作業
pip install -r requirements.txt
  1. Claude Desktop設定
    上記claude_desktop_config.jsonの編集後終了したClaude Desktopを再起動

  2. MCPサーバー実行
    Powershell上で作業

cd C:\work\project\news-mcp-server\
python run_server.py

使用方法

Claude Desktopのチャットコマンドラインから質問を入力します。

claude_dsk_01.png

Claude Desktopから入力する質問の例:

  • "今日の経済ニュースのトップは?"
  • "最新のビジネスニュースを教えて"
  • "重要な経済ニュース3本をサマリー付きで"

経済ニュース取得ツールの使用(MCPサーバーへのアクセス)の許可を問い合わせてきますので、「常に許可する」「一度だけ許可」のいずれかを選択します。

claude_dsk_02.png

MCPサーバーが経済ニュースを取得し、Claudeが判断ニュースを選出し、サマリーをClaude Desktop画面に表示します。

claude_dsk_03.png

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?