0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Claude Codeで速攻開発するリアルタイムOSINTダッシュボード:Pythonだけでニュース・CVE・CISA KEVを収集、スコアリング、監視する

0
Posted at

Claude Codeで速攻開発するリアルタイムOSINTダッシュボード:Pythonだけでニュース・CVE・CISA KEVを収集、スコアリング、監視する

この記事で作るもの

この記事では、Pythonだけで動く リアルタイムOSINTダッシュボード を作ります。

対象は、合法・公開情報のみです。

  • RSSニュース
  • GDELTグローバルニュース
  • CISA KEV:実際に悪用確認済み脆弱性
  • NVD CVE:CVE脆弱性情報
  • 任意の公開RSS
  • 自社・顧客・業界キーワードによる監視
  • スコアリング
  • SQLite保存
  • FastAPIダッシュボード
  • API出力
  • Claude Codeで一気に生成・改修しやすい構造

完成すると、ローカルでこういう画面が動きます。

http://127.0.0.1:8000

[OSINT Lite]
- High priority items
- CISA KEV
- Recent CVEs
- GDELT news
- RSS news
- Keyword hits
- Source / Published time / Score / Tags

この記事は、単なるスクレイピング入門ではありません。

公開情報を、意思決定に使えるインテリジェンスへ変換するための最小実装です。

重要:この記事の安全・倫理スコープ

この記事のOSINTシステムは、以下に限定します。

  • 公開API
  • 公開RSS
  • 公開ニュース
  • 公開脆弱性情報
  • 自社・業界・技術・制度・サプライチェーン監視
  • 防御・リスク管理・調査メモ作成

やらないことは明確にします。

  • 認証回避
  • 非公開情報の取得
  • 個人の追跡・晒し・ドキシング
  • アカウント乗っ取り
  • 侵入・スキャン・悪用コード実行
  • 利用規約に反する大量取得
  • CAPTCHA回避
  • 盗まれた認証情報の収集
  • 攻撃対象選定の自動化

OSINTは強力です。
強力だからこそ、合法性・目的・対象・保存期間・アクセス権限を最初に決めるべきです。

この記事では、企業の防御、事業開発、リスク監視、セキュリティ運用、技術調査、エンタープライズSaaSのセキュリティ強化を目的にします。


なぜ今、軽量OSINTシステムが必要なのか

OSINTというと、MISPやOpenCTIのような本格CTI基盤を思い浮かべる人も多いと思います。

それは正しいです。

MISPは脅威情報・IOC・イベントを構造化し、共有・相関・エクスポートするためのOSSです。
OpenCTIは脅威情報、観測可能データ、脆弱性、アセット、レポートなどをナレッジグラフとして扱うOSSです。

ただし、いきなりMISPやOpenCTIを導入するのは重い場合があります。

特に次のような段階では、まず軽量な自前OSINTレイヤーがあると強いです。

  • 経営・事業開発向けに業界ニュースを監視したい
  • 自社SaaSの利用技術に関係するCVEだけ追いたい
  • CISA KEVに入った脆弱性を即検知したい
  • 顧客業界の制度・事故・サイバー動向を見たい
  • Claude Codeで短期間にプロトタイプしたい
  • OpenCTI/MISP導入前に情報源とワークフローを整理したい
  • Slack通知やレポート生成に後でつなげたい

つまり、この記事で作るものは、MISP/OpenCTIの代替ではありません。

MISP/OpenCTIに入れる前の軽量インテリジェンス収集・前処理レイヤーです。


全体アーキテクチャ

今回作る構成はこれです。

公開情報ソース
  ├─ RSS
  ├─ GDELT DOC API
  ├─ CISA KEV JSON
  └─ NVD CVE API
        ↓
Collector
        ↓
Normalizer
        ↓
Scoring / Tagging
        ↓
SQLite
        ↓
FastAPI
  ├─ Web Dashboard
  ├─ JSON API
  └─ Manual refresh endpoint

最小構成ですが、重要な設計思想を入れます。

設計要素 目的
source分離 情報源を後で増やせる
normalize RSS/CVE/ニュースを同じItem型にする
dedup URLやCVE IDで重複排除
scoring 重要度を自動でざっくり評価
tagging vulnerability / energy / ai / policyなど分類
SQLite ローカルで即動く
FastAPI API化・UI化しやすい
Claude Code向け構造 ファイル分割・改修しやすい

どんな用途に使えるか

このシステムは、セキュリティだけでなく、事業開発にも使えます。

セキュリティ用途

  • CISA KEV追加監視
  • NVD CVE監視
  • 自社利用技術名の脆弱性監視
  • ランサムウェア関連ニュース監視
  • クラウド障害・脆弱性ニュース監視
  • 取引先SaaS・OSS・クラウドのリスク監視

事業開発用途

  • エネルギー政策ニュース監視
  • 再エネ・蓄電池・EV・V2H関連ニュース監視
  • 競合・パートナー企業名監視
  • 補助金・制度変更監視
  • AIエージェント・SaaS・APIトレンド監視
  • 顧客業界の事故・法規制・調達ニュース監視

SaaS運用用途

  • セキュリティチェックシート回答の根拠収集
  • プレスリリース・ブログネタ収集
  • 営業向け業界アラート
  • CS向け障害・脆弱性アラート
  • 社内デイリーブリーフの自動生成

OSINTは、セキュリティ部門だけのものではありません。
事業開発、マーケ、CS、PdM、SRE、情シスが同じ情報面を見るための基盤になります。


海外事例から見る設計の方向性

本格的なOSINT/CTI基盤には、すでに強いOSSがあります。

MISP

MISPは、脅威情報やIOCを構造化し、共有・相関・エクスポートできるOSSです。

  • IOC管理
  • 脅威情報共有
  • タグ・分類
  • MISP taxonomies
  • STIX/OpenIOC等へのエクスポート
  • IDS/SIEM連携

公式サイト:
https://www.misp.software/

OpenCTI

OpenCTIは、脅威情報をナレッジグラフとして扱うOSSです。

  • Threat actor
  • Intrusion set
  • Indicator
  • Vulnerability
  • Asset
  • Report
  • Relationship
  • STIX 2.1的な構造化
  • コネクタによる外部情報取り込み
  • ダッシュボード

公式ドキュメント:
https://docs.opencti.io/latest/

OWASP SocialOSINTAgent / SocialOSINTLM

OWASPにも、公開ソーシャルデータを公式API経由で収集し、LLM分析に使うSocial OSINT系プロジェクトがあります。

公式ページ:
https://owasp.org/www-project-social-osint-agent/
https://owasp.org/www-project-social-osintlm/

ただし、本記事では人物追跡系のソーシャルOSINTは扱いません。

企業・業界・技術・脆弱性・政策監視に寄せます。


今回の実装方針

今回は、あえて軽量にします。

項目 方針
言語 Python
Web FastAPI
DB SQLite
収集 httpx / feedparser
スケジューラ APScheduler
設定 YAML
HTML Jinja2
フロント 最小HTMLのみ
LLM なし。後でClaude API等に接続可能
外部CTI基盤 なし。後でMISP/OpenCTIへ拡張可能

最初から凝りすぎません。

「動く最小OSINT基盤」を作り、その後Claude Codeで拡張するのが目的です。


完成ディレクトリ

osint-lite/
  app/
    __init__.py
    main.py
    config.py
    db.py
    models.py
    sources.py
    scoring.py
    collector.py
    templates/
      index.html
      item.html
  config/
    sources.yaml
  data/
    osint.sqlite3
  requirements.txt
  README.md

セットアップ

Python 3.11以上を想定します。

mkdir osint-lite
cd osint-lite

python -m venv .venv
source .venv/bin/activate  # Windowsなら .venv\Scripts\activate

mkdir -p app/templates config data
touch app/__init__.py

requirements.txt を作ります。

fastapi==0.115.6
uvicorn[standard]==0.34.0
httpx==0.28.1
feedparser==6.0.11
APScheduler==3.10.4
PyYAML==6.0.2
Jinja2==3.1.5
python-dateutil==2.9.0.post0

インストールします。

pip install -r requirements.txt

config/sources.yaml

監視キーワードとRSSフィードを設定します。

watch_keywords:
  - "AWS Lambda"
  - "Amazon API Gateway"
  - "DynamoDB"
  - "AWS WAF"
  - "CVE"
  - "ransomware"
  - "zero day"
  - "OpenAI"
  - "Claude Code"
  - "solar"
  - "battery storage"
  - "renewable energy"
  - "V2H"
  - "PPA"
  - "Japan energy"

high_priority_keywords:
  - "actively exploited"
  - "known exploited"
  - "critical vulnerability"
  - "ransomware"
  - "zero-day"
  - "remote code execution"
  - "authentication bypass"
  - "data breach"
  - "emergency directive"
  - "CISA KEV"

rss_feeds:
  - name: "CISA News"
    url: "https://www.cisa.gov/news.xml"
    category: "security"

  - name: "AWS Security Blog"
    url: "https://aws.amazon.com/blogs/security/feed/"
    category: "cloud_security"

  - name: "AWS News Blog"
    url: "https://aws.amazon.com/blogs/aws/feed/"
    category: "cloud"

  - name: "The Hacker News"
    url: "https://feeds.feedburner.com/TheHackersNews"
    category: "security_news"

gdelt_queries:
  - name: "Cybersecurity"
    query: '(cybersecurity OR ransomware OR "zero day" OR "data breach")'
    category: "security_news"

  - name: "AI Agents"
    query: '("AI agent" OR "Claude Code" OR "OpenAI API" OR "model context protocol")'
    category: "ai"

  - name: "Energy Storage"
    query: '("battery storage" OR "solar power" OR "renewable energy" OR "virtual power plant")'
    category: "energy"

nvd:
  enabled: true
  keywords:
    - "lambda"
    - "dynamodb"
    - "api gateway"
    - "wordpress"
    - "linux"
    - "openssl"
    - "apache"
    - "nginx"
    - "node.js"
    - "python"

cisa_kev:
  enabled: true

app/models.py

OSINTアイテムの共通モデルです。

from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class OsintItem:
    source: str
    source_type: str
    title: str
    url: str
    summary: str
    published_at: Optional[datetime]
    category: str
    external_id: Optional[str] = None
    severity: Optional[str] = None
    score: int = 0
    tags: tuple[str, ...] = ()

app/config.py

YAML設定を読み込みます。

from pathlib import Path
from typing import Any

import yaml


BASE_DIR = Path(__file__).resolve().parent.parent
CONFIG_PATH = BASE_DIR / "config" / "sources.yaml"
DATA_DIR = BASE_DIR / "data"
DB_PATH = DATA_DIR / "osint.sqlite3"


def load_config() -> dict[str, Any]:
    if not CONFIG_PATH.exists():
        raise FileNotFoundError(f"Config not found: {CONFIG_PATH}")

    with CONFIG_PATH.open("r", encoding="utf-8") as f:
        return yaml.safe_load(f)

app/db.py

SQLiteを直接使います。
SQLAlchemyを使ってもよいですが、今回はコピペで動く軽量実装にします。

import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any

from app.config import DB_PATH, DATA_DIR
from app.models import OsintItem


def get_conn() -> sqlite3.Connection:
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    with get_conn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT NOT NULL,
                source_type TEXT NOT NULL,
                title TEXT NOT NULL,
                url TEXT NOT NULL UNIQUE,
                summary TEXT NOT NULL,
                published_at TEXT,
                category TEXT NOT NULL,
                external_id TEXT,
                severity TEXT,
                score INTEGER NOT NULL DEFAULT 0,
                tags TEXT NOT NULL DEFAULT '',
                first_seen_at TEXT NOT NULL,
                last_seen_at TEXT NOT NULL
            )
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_items_score ON items(score DESC)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_items_published ON items(published_at DESC)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_items_category ON items(category)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_items_source_type ON items(source_type)")
        conn.commit()


def upsert_item(item: OsintItem) -> bool:
    now = datetime.utcnow().isoformat()

    tags = ",".join(item.tags)
    published = item.published_at.isoformat() if item.published_at else None

    with get_conn() as conn:
        existing = conn.execute(
            "SELECT id FROM items WHERE url = ?",
            (item.url,),
        ).fetchone()

        if existing:
            conn.execute(
                """
                UPDATE items
                SET score = ?,
                    tags = ?,
                    last_seen_at = ?,
                    summary = ?,
                    severity = ?
                WHERE url = ?
                """,
                (item.score, tags, now, item.summary, item.severity, item.url),
            )
            conn.commit()
            return False

        conn.execute(
            """
            INSERT INTO items (
                source,
                source_type,
                title,
                url,
                summary,
                published_at,
                category,
                external_id,
                severity,
                score,
                tags,
                first_seen_at,
                last_seen_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                item.source,
                item.source_type,
                item.title,
                item.url,
                item.summary,
                published,
                item.category,
                item.external_id,
                item.severity,
                item.score,
                tags,
                now,
                now,
            ),
        )
        conn.commit()
        return True


def list_items(
    limit: int = 100,
    category: str | None = None,
    source_type: str | None = None,
    min_score: int | None = None,
    q: str | None = None,
) -> list[dict[str, Any]]:
    sql = "SELECT * FROM items WHERE 1=1"
    params: list[Any] = []

    if category:
        sql += " AND category = ?"
        params.append(category)

    if source_type:
        sql += " AND source_type = ?"
        params.append(source_type)

    if min_score is not None:
        sql += " AND score >= ?"
        params.append(min_score)

    if q:
        sql += " AND (title LIKE ? OR summary LIKE ? OR tags LIKE ?)"
        like = f"%{q}%"
        params.extend([like, like, like])

    sql += " ORDER BY score DESC, COALESCE(published_at, last_seen_at) DESC LIMIT ?"
    params.append(limit)

    with get_conn() as conn:
        rows = conn.execute(sql, params).fetchall()
        return [dict(row) for row in rows]


def get_item(item_id: int) -> dict[str, Any] | None:
    with get_conn() as conn:
        row = conn.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
        return dict(row) if row else None


def stats() -> dict[str, Any]:
    with get_conn() as conn:
        total = conn.execute("SELECT COUNT(*) AS c FROM items").fetchone()["c"]
        high = conn.execute("SELECT COUNT(*) AS c FROM items WHERE score >= 80").fetchone()["c"]
        by_source = conn.execute(
            """
            SELECT source_type, COUNT(*) AS c
            FROM items
            GROUP BY source_type
            ORDER BY c DESC
            """
        ).fetchall()
        by_category = conn.execute(
            """
            SELECT category, COUNT(*) AS c
            FROM items
            GROUP BY category
            ORDER BY c DESC
            """
        ).fetchall()

    return {
        "total": total,
        "high": high,
        "by_source": [dict(r) for r in by_source],
        "by_category": [dict(r) for r in by_category],
    }

app/scoring.py

単純なルールベースでスコアリングします。

本番ではMLやLLM分類を追加できますが、最初はルールで十分です。

import re
from app.models import OsintItem


DEFAULT_TAG_RULES = {
    "security": [
        "cve",
        "vulnerability",
        "ransomware",
        "malware",
        "zero day",
        "zero-day",
        "exploit",
        "phishing",
        "breach",
        "incident",
        "authentication bypass",
        "remote code execution",
    ],
    "cloud": [
        "aws",
        "lambda",
        "api gateway",
        "dynamodb",
        "cloudwatch",
        "waf",
        "iam",
        "s3",
    ],
    "ai": [
        "openai",
        "claude",
        "anthropic",
        "ai agent",
        "llm",
        "model context protocol",
        "mcp",
    ],
    "energy": [
        "solar",
        "battery",
        "renewable",
        "vpp",
        "ppa",
        "ev",
        "v2h",
        "grid",
        "energy storage",
    ],
    "policy": [
        "regulation",
        "policy",
        "government",
        "ministry",
        "subsidy",
        "compliance",
        "directive",
    ],
}


def normalize_text(value: str) -> str:
    return re.sub(r"\s+", " ", value or "").strip().lower()


def score_item(
    title: str,
    summary: str,
    category: str,
    watch_keywords: list[str],
    high_priority_keywords: list[str],
    severity: str | None = None,
) -> tuple[int, tuple[str, ...]]:
    text = normalize_text(f"{title} {summary} {category}")
    score = 10
    tags: set[str] = set()

    for keyword in watch_keywords:
        if normalize_text(keyword) in text:
            score += 10
            tags.add(keyword.lower().replace(" ", "_"))

    for keyword in high_priority_keywords:
        if normalize_text(keyword) in text:
            score += 25
            tags.add("high_priority")

    for tag, words in DEFAULT_TAG_RULES.items():
        if any(w in text for w in words):
            tags.add(tag)
            score += 8

    if severity:
        sev = severity.lower()
        if sev in {"critical", "high"}:
            score += 35
            tags.add(sev)
        elif sev == "medium":
            score += 15
            tags.add(sev)

    if "cisa kev" in text or "known exploited" in text:
        score += 40
        tags.add("kev")

    if "actively exploited" in text:
        score += 40
        tags.add("actively_exploited")

    return min(score, 100), tuple(sorted(tags))


def enrich_item(
    item: OsintItem,
    watch_keywords: list[str],
    high_priority_keywords: list[str],
) -> OsintItem:
    score, tags = score_item(
        title=item.title,
        summary=item.summary,
        category=item.category,
        watch_keywords=watch_keywords,
        high_priority_keywords=high_priority_keywords,
        severity=item.severity,
    )

    return OsintItem(
        source=item.source,
        source_type=item.source_type,
        title=item.title,
        url=item.url,
        summary=item.summary,
        published_at=item.published_at,
        category=item.category,
        external_id=item.external_id,
        severity=item.severity,
        score=score,
        tags=tags,
    )

app/sources.py

情報源ごとの収集処理です。

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any
from urllib.parse import urlencode

import feedparser
import httpx
from dateutil import parser as dtparser

from app.models import OsintItem


USER_AGENT = "osint-lite/0.1 defensive-research"


def parse_datetime(value: Any) -> datetime | None:
    if not value:
        return None

    try:
        if isinstance(value, datetime):
            return value

        return dtparser.parse(str(value))
    except Exception:
        return None


async def fetch_rss_feed(feed: dict[str, Any]) -> list[OsintItem]:
    name = feed["name"]
    url = feed["url"]
    category = feed.get("category", "rss")

    async with httpx.AsyncClient(timeout=20, headers={"User-Agent": USER_AGENT}) as client:
        resp = await client.get(url)
        resp.raise_for_status()

    parsed = feedparser.parse(resp.text)
    items: list[OsintItem] = []

    for entry in parsed.entries[:50]:
        title = getattr(entry, "title", "").strip()
        link = getattr(entry, "link", "").strip()

        if not title or not link:
            continue

        summary = getattr(entry, "summary", "") or getattr(entry, "description", "")
        published = (
            parse_datetime(getattr(entry, "published", None))
            or parse_datetime(getattr(entry, "updated", None))
        )

        items.append(
            OsintItem(
                source=name,
                source_type="rss",
                title=title,
                url=link,
                summary=strip_html(summary)[:1000],
                published_at=published,
                category=category,
            )
        )

    return items


async def fetch_gdelt_query(query_conf: dict[str, Any]) -> list[OsintItem]:
    name = query_conf["name"]
    query = query_conf["query"]
    category = query_conf.get("category", "gdelt")

    params = {
        "query": query,
        "mode": "artlist",
        "format": "json",
        "maxrecords": 50,
        "sort": "datedesc",
        "timespan": "24h",
    }

    url = "https://api.gdeltproject.org/api/v2/doc/doc?" + urlencode(params)

    async with httpx.AsyncClient(timeout=30, headers={"User-Agent": USER_AGENT}) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        data = resp.json()

    items: list[OsintItem] = []

    for article in data.get("articles", []):
        title = article.get("title") or article.get("seendate") or "GDELT article"
        link = article.get("url")
        if not link:
            continue

        published = parse_datetime(article.get("seendate"))

        domain = article.get("domain", "")
        lang = article.get("language", "")
        source_country = article.get("sourcecountry", "")

        summary = f"domain={domain} language={lang} source_country={source_country}"

        items.append(
            OsintItem(
                source=f"GDELT:{name}",
                source_type="gdelt",
                title=title,
                url=link,
                summary=summary,
                published_at=published,
                category=category,
            )
        )

    return items


async def fetch_cisa_kev() -> list[OsintItem]:
    url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

    async with httpx.AsyncClient(timeout=30, headers={"User-Agent": USER_AGENT}) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        data = resp.json()

    items: list[OsintItem] = []

    for vuln in data.get("vulnerabilities", [])[:2000]:
        cve_id = vuln.get("cveID")
        vendor = vuln.get("vendorProject", "")
        product = vuln.get("product", "")
        name = vuln.get("vulnerabilityName", "")
        date_added = vuln.get("dateAdded")
        due_date = vuln.get("dueDate")
        known_ransomware = vuln.get("knownRansomwareCampaignUse", "Unknown")
        required_action = vuln.get("requiredAction", "")

        if not cve_id:
            continue

        title = f"{cve_id}: {vendor} {product} - {name}"
        summary = (
            f"CISA KEV date_added={date_added} due_date={due_date} "
            f"ransomware={known_ransomware}. Action: {required_action}"
        )

        items.append(
            OsintItem(
                source="CISA KEV",
                source_type="cisa_kev",
                title=title,
                url=f"https://www.cisa.gov/known-exploited-vulnerabilities-catalog?search_api_fulltext={cve_id}",
                summary=summary,
                published_at=parse_datetime(date_added),
                category="vulnerability",
                external_id=cve_id,
                severity="high",
            )
        )

    return items


async def fetch_nvd_recent(keyword: str) -> list[OsintItem]:
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=7)

    params = {
        "keywordSearch": keyword,
        "pubStartDate": start.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "pubEndDate": end.strftime("%Y-%m-%dT%H:%M:%S.000Z"),
        "resultsPerPage": 20,
    }

    url = "https://services.nvd.nist.gov/rest/json/cves/2.0?" + urlencode(params)

    async with httpx.AsyncClient(timeout=30, headers={"User-Agent": USER_AGENT}) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        data = resp.json()

    items: list[OsintItem] = []

    for row in data.get("vulnerabilities", []):
        cve = row.get("cve", {})
        cve_id = cve.get("id")
        if not cve_id:
            continue

        descriptions = cve.get("descriptions", [])
        desc = next((d["value"] for d in descriptions if d.get("lang") == "en"), "")

        metrics = cve.get("metrics", {})
        severity = extract_cvss_severity(metrics)

        published = parse_datetime(cve.get("published"))

        items.append(
            OsintItem(
                source=f"NVD:{keyword}",
                source_type="nvd",
                title=f"{cve_id}: {desc[:160]}",
                url=f"https://nvd.nist.gov/vuln/detail/{cve_id}",
                summary=desc[:1200],
                published_at=published,
                category="vulnerability",
                external_id=cve_id,
                severity=severity,
            )
        )

    return items


def extract_cvss_severity(metrics: dict[str, Any]) -> str | None:
    for key in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
        values = metrics.get(key)
        if values and isinstance(values, list):
            severity = values[0].get("cvssData", {}).get("baseSeverity") or values[0].get("baseSeverity")
            if severity:
                return str(severity).lower()
    return None


def strip_html(value: str) -> str:
    import re

    value = re.sub(r"<[^>]+>", " ", value or "")
    value = re.sub(r"\s+", " ", value).strip()
    return value

app/collector.py

全ソースから収集し、スコアリングして保存します。

import asyncio
from typing import Any

from app.db import upsert_item
from app.scoring import enrich_item
from app.sources import (
    fetch_cisa_kev,
    fetch_gdelt_query,
    fetch_nvd_recent,
    fetch_rss_feed,
)


async def collect_all(config: dict[str, Any]) -> dict[str, int]:
    watch_keywords = config.get("watch_keywords", [])
    high_priority_keywords = config.get("high_priority_keywords", [])

    tasks = []

    for feed in config.get("rss_feeds", []):
        tasks.append(fetch_rss_feed(feed))

    for query_conf in config.get("gdelt_queries", []):
        tasks.append(fetch_gdelt_query(query_conf))

    if config.get("cisa_kev", {}).get("enabled", False):
        tasks.append(fetch_cisa_kev())

    if config.get("nvd", {}).get("enabled", False):
        for keyword in config.get("nvd", {}).get("keywords", []):
            tasks.append(fetch_nvd_recent(keyword))

    results = await asyncio.gather(*tasks, return_exceptions=True)

    inserted = 0
    updated = 0
    errors = 0

    for result in results:
        if isinstance(result, Exception):
            print(f"[collector] error: {result}")
            errors += 1
            continue

        for item in result:
            enriched = enrich_item(
                item=item,
                watch_keywords=watch_keywords,
                high_priority_keywords=high_priority_keywords,
            )
            is_inserted = upsert_item(enriched)
            if is_inserted:
                inserted += 1
            else:
                updated += 1

    return {
        "inserted": inserted,
        "updated": updated,
        "errors": errors,
    }


def collect_all_sync(config: dict[str, Any]) -> dict[str, int]:
    return asyncio.run(collect_all(config))

app/templates/index.html

UIです。
見た目は最低限ですが、Qiita記事用に分かりやすくします。

<!doctype html>
<html lang="ja">
<head>
  <meta charset="utf-8">
  <title>OSINT Lite</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <meta http-equiv="refresh" content="60">
  <style>
    body {
      font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
      margin: 32px;
      background: #f8fafc;
      color: #0f172a;
    }
    a { color: #0f766e; text-decoration: none; }
    a:hover { text-decoration: underline; }
    .header {
      display: flex;
      justify-content: space-between;
      gap: 16px;
      align-items: center;
      margin-bottom: 24px;
    }
    .title { font-size: 28px; font-weight: 800; }
    .subtitle { color: #475569; margin-top: 4px; }
    .stats {
      display: flex;
      gap: 12px;
      flex-wrap: wrap;
      margin-bottom: 20px;
    }
    .card {
      background: white;
      border: 1px solid #e2e8f0;
      border-radius: 14px;
      padding: 14px 16px;
      min-width: 140px;
      box-shadow: 0 1px 2px rgba(15, 23, 42, 0.04);
    }
    .card .num {
      font-size: 24px;
      font-weight: 800;
    }
    .filters {
      background: white;
      padding: 16px;
      border: 1px solid #e2e8f0;
      border-radius: 14px;
      margin-bottom: 20px;
    }
    input, select, button {
      padding: 9px 10px;
      border: 1px solid #cbd5e1;
      border-radius: 8px;
      margin: 4px;
    }
    button {
      background: #0f766e;
      color: white;
      cursor: pointer;
      border: none;
    }
    .item {
      background: white;
      border: 1px solid #e2e8f0;
      border-left: 6px solid #94a3b8;
      border-radius: 14px;
      padding: 16px;
      margin-bottom: 12px;
    }
    .item.high { border-left-color: #dc2626; }
    .item.medium { border-left-color: #f97316; }
    .item.low { border-left-color: #0f766e; }
    .item-title {
      font-size: 17px;
      font-weight: 750;
      margin-bottom: 8px;
    }
    .meta {
      color: #64748b;
      font-size: 13px;
      margin-bottom: 8px;
    }
    .summary {
      color: #334155;
      line-height: 1.55;
    }
    .tags {
      margin-top: 10px;
    }
    .tag {
      display: inline-block;
      font-size: 12px;
      padding: 3px 8px;
      border-radius: 999px;
      background: #ccfbf1;
      color: #134e4a;
      margin-right: 4px;
      margin-top: 4px;
    }
    .score {
      font-weight: 800;
      color: #0f172a;
    }
    .footer {
      margin-top: 28px;
      color: #64748b;
      font-size: 12px;
    }
  </style>
</head>
<body>
  <div class="header">
    <div>
      <div class="title">OSINT Lite</div>
      <div class="subtitle">Public-source monitoring for defensive intelligence</div>
    </div>
    <form action="/refresh" method="post">
      <button type="submit">Manual refresh</button>
    </form>
  </div>

  <div class="stats">
    <div class="card">
      <div>Total</div>
      <div class="num">{{ stats.total }}</div>
    </div>
    <div class="card">
      <div>High score</div>
      <div class="num">{{ stats.high }}</div>
    </div>
    {% for row in stats.by_source %}
    <div class="card">
      <div>{{ row.source_type }}</div>
      <div class="num">{{ row.c }}</div>
    </div>
    {% endfor %}
  </div>

  <div class="filters">
    <form method="get" action="/">
      <input name="q" placeholder="keyword" value="{{ q or '' }}">
      <select name="source_type">
        <option value="">all sources</option>
        <option value="rss" {% if source_type == 'rss' %}selected{% endif %}>rss</option>
        <option value="gdelt" {% if source_type == 'gdelt' %}selected{% endif %}>gdelt</option>
        <option value="cisa_kev" {% if source_type == 'cisa_kev' %}selected{% endif %}>cisa_kev</option>
        <option value="nvd" {% if source_type == 'nvd' %}selected{% endif %}>nvd</option>
      </select>
      <select name="min_score">
        <option value="">all scores</option>
        <option value="50" {% if min_score == 50 %}selected{% endif %}>score >= 50</option>
        <option value="80" {% if min_score == 80 %}selected{% endif %}>score >= 80</option>
      </select>
      <button type="submit">Filter</button>
    </form>
  </div>

  {% for item in items %}
    {% set level = 'high' if item.score >= 80 else 'medium' if item.score >= 50 else 'low' %}
    <div class="item {{ level }}">
      <div class="item-title">
        <a href="{{ item.url }}" target="_blank" rel="noopener noreferrer">{{ item.title }}</a>
      </div>
      <div class="meta">
        <span class="score">score {{ item.score }}</span>
        / {{ item.source_type }}
        / {{ item.source }}
        / {{ item.category }}
        {% if item.severity %}/ severity={{ item.severity }}{% endif %}
        {% if item.published_at %}/ {{ item.published_at }}{% endif %}
      </div>
      <div class="summary">{{ item.summary }}</div>
      <div class="tags">
        {% for tag in item.tags.split(',') if tag %}
          <span class="tag">{{ tag }}</span>
        {% endfor %}
      </div>
    </div>
  {% endfor %}

  <div class="footer">
    Auto refreshes every 60 seconds. Data sources are public APIs and RSS feeds only.
  </div>
</body>
</html>

app/templates/item.html

詳細ページ用です。

<!doctype html>
<html lang="ja">
<head>
  <meta charset="utf-8">
  <title>{{ item.title }}</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <style>
    body {
      font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
      margin: 32px;
      background: #f8fafc;
      color: #0f172a;
      line-height: 1.7;
    }
    a { color: #0f766e; }
    .box {
      background: white;
      border: 1px solid #e2e8f0;
      border-radius: 16px;
      padding: 24px;
    }
    .meta {
      color: #64748b;
      margin-bottom: 16px;
    }
    .score {
      font-size: 28px;
      font-weight: 800;
    }
  </style>
</head>
<body>
  <p><a href="/">← Back</a></p>
  <div class="box">
    <h1>{{ item.title }}</h1>
    <p class="meta">
      {{ item.source_type }} / {{ item.source }} / {{ item.category }}
    </p>
    <p class="score">Score: {{ item.score }}</p>
    <p>{{ item.summary }}</p>
    <p>
      <a href="{{ item.url }}" target="_blank" rel="noopener noreferrer">
        Open original source
      </a>
    </p>
    <pre>{{ item }}</pre>
  </div>
</body>
</html>

app/main.py

FastAPI本体です。
APSchedulerで定期収集します。

from __future__ import annotations

from typing import Optional

from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates

from app.collector import collect_all_sync
from app.config import load_config
from app.db import get_item, init_db, list_items, stats


app = FastAPI(title="OSINT Lite", version="0.1.0")
templates = Jinja2Templates(directory="app/templates")

config = load_config()
scheduler = BackgroundScheduler()


@app.on_event("startup")
def on_startup() -> None:
    init_db()

    # 起動時に1回収集
    try:
        result = collect_all_sync(config)
        print(f"[startup collect] {result}")
    except Exception as e:
        print(f"[startup collect] failed: {e}")

    # 10分ごとに収集
    scheduler.add_job(
        lambda: print(f"[scheduled collect] {collect_all_sync(config)}"),
        "interval",
        minutes=10,
        id="collect_all",
        replace_existing=True,
    )
    scheduler.start()


@app.on_event("shutdown")
def on_shutdown() -> None:
    scheduler.shutdown(wait=False)


@app.get("/", response_class=HTMLResponse)
def index(
    request: Request,
    q: Optional[str] = Query(default=None),
    source_type: Optional[str] = Query(default=None),
    min_score: Optional[int] = Query(default=None),
):
    items = list_items(
        limit=200,
        q=q,
        source_type=source_type,
        min_score=min_score,
    )

    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "items": items,
            "stats": stats(),
            "q": q,
            "source_type": source_type,
            "min_score": min_score,
        },
    )


@app.get("/items/{item_id}", response_class=HTMLResponse)
def item_detail(request: Request, item_id: int):
    item = get_item(item_id)
    if not item:
        return HTMLResponse("Not found", status_code=404)

    return templates.TemplateResponse(
        "item.html",
        {
            "request": request,
            "item": item,
        },
    )


@app.get("/api/items")
def api_items(
    q: Optional[str] = Query(default=None),
    source_type: Optional[str] = Query(default=None),
    min_score: Optional[int] = Query(default=None),
    limit: int = Query(default=100, ge=1, le=500),
):
    return {
        "items": list_items(
            limit=limit,
            q=q,
            source_type=source_type,
            min_score=min_score,
        ),
        "stats": stats(),
    }


@app.post("/refresh")
def refresh():
    result = collect_all_sync(config)
    print(f"[manual refresh] {result}")
    return RedirectResponse("/", status_code=303)


@app.get("/health")
def health():
    return {"status": "ok"}

README.md

# OSINT Lite

Public-source monitoring dashboard for defensive intelligence.

## Scope

This tool only ingests public APIs and RSS feeds.

It does not perform scanning, credential collection, bypass, scraping of private content, or personal targeting.

## Setup

```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
uvicorn app.main:app --reload

Open:

http://127.0.0.1:8000

Sources

  • RSS feeds
  • GDELT DOC API
  • CISA KEV
  • NVD CVE API

API

GET /api/items
GET /api/items?min_score=80
GET /api/items?source_type=cisa_kev
GET /api/items?q=lambda
POST /refresh
GET /health

---

## 起動する

```bash
uvicorn app.main:app --reload

ブラウザで開きます。

http://127.0.0.1:8000

APIも見られます。

curl "http://127.0.0.1:8000/api/items?min_score=80" | jq

手動収集します。

curl -X POST "http://127.0.0.1:8000/refresh"

Claude Codeに投げるプロンプト

Claude Codeで一気に作るなら、以下を投げます。

あなたはPython/FastAPI/OSINT/CTI/セキュリティ運用に詳しいシニアエンジニアです。

以下の要件で、ローカルで動く軽量OSINTダッシュボードを実装してください。

目的:
- 公開情報のみを収集する防御・事業調査用OSINTシステム
- RSS, GDELT DOC API, CISA KEV JSON, NVD CVE APIを取り込む
- SQLiteに保存する
- 重複排除する
- キーワードと重要語でスコアリングする
- FastAPIでWebダッシュボードとJSON APIを提供する
- 10分ごとに定期収集する
- 手動refresh endpointを持つ

制約:
- Python 3.11+
- FastAPI
- httpx
- feedparser
- APScheduler
- PyYAML
- Jinja2
- SQLite標準ライブラリ
- 非公開情報の取得、認証回避、スキャン、個人追跡、攻撃用途の機能は禁止
- public API/RSSのみ
- コピペで動くこと

ディレクトリ:
osint-lite/
  app/
    __init__.py
    main.py
    config.py
    db.py
    models.py
    sources.py
    scoring.py
    collector.py
    templates/
      index.html
      item.html
  config/
    sources.yaml
  data/
  requirements.txt
  README.md

実装品質:
- sourceごとの処理を分離
- OsintItemに正規化
- URLで重複排除
- score/tagsを付与
- エラーで全体停止しない
- /api/items でJSON出力
- /health あり
- READMEに起動手順を書く

Claude Codeで追加開発すると強い機能

ここから先は拡張です。

1. Slack通知

高スコアだけSlackに通知します。

score >= 80
source_type in ["cisa_kev", "nvd"]
tags contains ["actively_exploited", "critical"]

通知文はこうです。

🚨 OSINT High Priority
CVE-xxxx-yyyy ...
score: 95
source: CISA KEV
url: ...

2. Claude要約

高スコア記事だけ、Claude APIに渡して要約します。

ただし、最初から記事全文を取りに行かない方が安全です。

まずはタイトル・summary・source・urlだけで十分です。

{
  "title": "...",
  "summary": "...",
  "source": "...",
  "url": "...",
  "why_it_matters": "...",
  "recommended_action": "..."
}

3. MISP/OpenCTIエクスポート

本格運用では、MISPやOpenCTIへ渡したくなります。

最初はJSON exportで十分です。

GET /api/items?min_score=80&source_type=nvd

その後、以下を検討します。

  • STIX 2.1風の構造化
  • MISP event export
  • OpenCTI connector
  • observable抽出
  • CVE ID / domain / IP / hash の抽出

ただし、この記事の範囲では、攻撃インフラ探索やスキャンはしません。

4. 監視対象キーワードをYAML化する

すでに config/sources.yaml にあります。

これをチームでGit管理すると強いです。

watch_keywords:
  - "customer company name"
  - "product name"
  - "AWS Lambda"
  - "DynamoDB"
  - "CVE"
  - "ransomware"

5. 情報源ごとの信頼度を持つ

全情報源を同じ重みで扱わない方がよいです。

source_weights:
  cisa_kev: 50
  nvd: 35
  gdelt: 10
  rss: 10

CISA KEVやNVDは高く、一般ニュースは低めにします。


この設計で一番大事なこと

OSINTシステムで一番大事なのは、たくさん集めることではありません。

意思決定に使える形に落とすことです。

公開情報は無限にあります。
集めるだけならノイズが増えます。

必要なのは、次の処理です。

収集
  ↓
正規化
  ↓
重複排除
  ↓
スコアリング
  ↓
タグ付け
  ↓
人間が判断できる形で提示
  ↓
必要なら通知・レポート化

この記事の実装は、この最小ループです。


運用時の注意点

1. API制限を守る

NVD APIやGDELTなど、公開APIには利用条件や負荷の配慮があります。

  • 短すぎる間隔で叩かない
  • 取得件数を制限する
  • User-Agentを入れる
  • キャッシュする
  • エラー時にリトライしすぎない

2. 個人情報を集めない

企業・技術・脆弱性・制度監視に寄せます。

人物の行動追跡や晒しはやらない。

3. “未確認”を区別する

OSINTは公開情報です。
公開情報は必ずしも正確とは限りません。

UIやレポートでは、次を区別します。

  • 公式情報
  • 一次情報
  • 報道
  • ブログ
  • SNS
  • 未確認情報

4. スコアは判断ではなく優先順位

score 100だから正しい、ではありません。

scoreは「見る順番」を決めるためのものです。

5. 保存期間を決める

何でも永久保存しない方がよいです。

  • ニュース:90日
  • CVE:長期保存可
  • KEV:長期保存可
  • 顧客名関連:社内ルールに従う

Qiitaで拡散されやすくするなら

この記事のようなOSINT記事は、以下を入れると刺さりやすいです。

  • コピペで動く
  • FastAPIで画面がある
  • SQLiteで軽い
  • CISA KEVとNVDを扱う
  • GDELTで海外ニュースも拾う
  • Claude Codeプロンプトがある
  • 安全・倫理スコープが明確
  • MISP/OpenCTIへの拡張余地がある
  • セキュリティだけでなく事業開発にも使える

特に重要なのは、攻撃ツールに見せないことです。

防御・リスク管理・事業インテリジェンスの文脈に置く。
これは技術的にも倫理的にも大事です。


まとめ

この記事では、Pythonだけで動く軽量リアルタイムOSINTシステムを作りました。

機能は以下です。

  • RSS収集
  • GDELTニュース収集
  • CISA KEV収集
  • NVD CVE収集
  • 共通モデルへの正規化
  • SQLite保存
  • URL重複排除
  • キーワードスコアリング
  • タグ付け
  • FastAPIダッシュボード
  • JSON API
  • 定期収集
  • Claude Code用プロンプト

このシステムは、MISPやOpenCTIの代替ではありません。

しかし、導入前の軽量レイヤーとしてはかなり使えます。

  • セキュリティ部門は脆弱性・KEV監視に使える
  • SREはクラウド障害・CVE監視に使える
  • 事業開発は業界ニュース監視に使える
  • PdMは競合・技術トレンド監視に使える
  • CSは顧客影響のあるリスク監視に使える
  • 経営は日次ブリーフの元データに使える

OSINTで大事なのは、情報量ではありません。

公開情報を、次の行動に変換することです。

その第一歩として、このくらい軽い仕組みから始めるのが現実的です。


参考リンク

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?