0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ベクターPDFのフロー図を「診断→ルート分岐→ノード/エッジ抽出」する(PyMuPDF + OpenCV)

0
Last updated at Posted at 2026-01-20

複雑なフロー図(ループ、開始点が複数、3分岐、注釈/装飾が混在)を PDFから機械的に構造化したい。
でも「PDFの見た目」と「内部表現」はズレがちなので、いきなりノード/エッジ抽出に突入すると失敗します。

この記事では、まず ページが何で構成されているかを診断し、その結果に応じて処理ルートを切り替える構成にします。

  • Route A: ベクター図形(drawings)が主 → drawingsからノード/エッジ抽出
  • Route B: 画像 or 注釈が主 → ページを画像化してCV/OCRで抽出(必要なら注釈も併用)
  • Route C: clip/不可視ノイズが主 → 先にノイズ除去してからRoute Aへ

0. 前提と注意

  • 100%自動復元は現実的に難しいです(PDFは表現が自由すぎる)
  • なので 自動抽出 + デバッグオーバレイ画像で検算 + 最小限の手補正が実務で強いです
  • ここでは「構造化の基礎」を作ります

1. 環境

必須

  • Python 3.10+
  • PyMuPDF
pip install pymupdf pillow numpy

Route B で推奨(画像ベース)

pip install opencv-python pytesseract
  • OCRを使うなら Tesseract本体が必要です(日本語モデルも)

    • macOS: brew install tesseract + 日本語言語データ
    • Linux: apt install tesseract-ocr tesseract-ocr-jpn

※ OCRが要らない(PDFテキストが取れる)なら pytesseract は無くても動きます。


2. 診断:このページは何で構成されてる?

**「見えるのにdrawingsが取れない」「見えないのにノードっぽい矩形が取れる」**の正体はだいたい以下です:

  • clip用パス(線も塗りも無しなのに矩形パスだけ存在)
  • 透明(opacity=0)や背景同色(白)で見えない
  • 画像として貼られている(フロー図が丸ごとラスター化)
  • 注釈(annotation)やリンク領域に情報が寄っている
  • 線幅0(hairline):見た目は線があるのに抽出で落とすと消える

まずは 統計を出して切り分けします。


2.1 診断コード(JSONレポート + オーバレイPDFも任意で出力)

pdf_flow_toolkit.py として保存して使います(この記事後半のRouteコードも同ファイルに入っています)

(このファイルの全文は記事末尾に載せています)

使い方:

python pdf_flow_toolkit.py diagnose input.pdf --outdir out --overlay

出力:

  • out/report.json(ページごとの統計)
  • out/overlay.pdf(デバッグ用:bbox重ね描き)

診断の見方(超ざっくり):

  • imagesの占有率が高い → Route B(画像ベース)
  • annots/linksが多い → Route B(注釈/リンクも確認)
  • non_painted_or_clipが多い → Route C(ノイズ除去してからA)
  • visible_candidateが多い & text_blocksあり → Route A(ベクターで攻める)

3. Route分岐:どのルートで抽出するか?

ここでは機械的に3ルートに分けます。

  • Route A(ベクター復元)

    • visible_candidate が十分
    • text_blocks が取れる
    • 画像占有が低い
  • Route B(画像/注釈復元)

    • image_area_ratio が高い、または drawings_total が少ないのに見た目はある
    • annots_countlinks_count が多い
  • Route C(ノイズ除去→A)

    • non_painted_or_cliplikely_invisible_* が多く、ノード検出がノイズまみれになる

4. 抽出:各Routeで「ノードJSON」「エッジJSON」「デバッグ画像」を出す

出力は3点セットで統一します。

  • nodes.json

    • id, type, bbox, text
  • edges.json

    • id, src, dst, points(折れ線)
  • debug.png

    • 抽出できたノードbboxとエッジを重ね描き(検算用)

Route A:drawings(ベクター)からノード/エッジ抽出

実行:

python pdf_flow_toolkit.py extract --route A input.pdf --outdir outA

Route B:画像/注釈が主なPDFを画像化して抽出(CV + OCR)

実行:

python pdf_flow_toolkit.py extract --route B input.pdf --outdir outB

Route C:clip/不可視ノイズを除去してからRoute A相当で抽出

実行:

python pdf_flow_toolkit.py extract --route C input.pdf --outdir outC

5. ソースコード全文(診断 + Route A/B/C)

1ファイル完結です。あとから関数単位で分割しやすいように構成しています。

# ここに pdf_flow_toolkit.py の全文を貼る(次のチャンク参照)

6. まとめ

  • PDFは「見た目」と「内部」がズレるので 診断→ルート分岐が効く
  • Route A/B/C のどれで攻めるべきかが分かれば、抽出精度と工数が跳ね上がる
  • 実務では デバッグオーバレイ画像が最強(誤抽出の原因が一瞬で分かる)

pdf_flow_toolkit.py 全文(診断 + Route A/B/C)

ここから先を、そのまま pdf_flow_toolkit.py として保存できます。
JSONとデバッグ画像が必ず出るようにしています。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
pdf_flow_toolkit.py

目的:
  - PDFページの「構成要素(text/drawings/images/annots)」を診断する
  - 診断結果に応じたRoute(A/B/C)でノード/エッジを抽出する
  - どのノード/エッジを取れたか分かるデバッグ画像を出す

Route:
  A: ベクター(drawings)主体 → drawings + PDF text を優先して抽出
  B: 画像/注釈主体 → ページレンダリング画像 + CV/OCR(必要なら注釈も併用)
  C: clip/不可視ノイズ主体 → まずノイズ除去してからA相当

依存:
  - 必須: pymupdf, pillow, numpy
  - RouteB推奨: opencv-python, pytesseract, (tesseract本体)
"""

from __future__ import annotations

import argparse
import json
import os
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple
from collections import Counter

import fitz  # PyMuPDF
import numpy as np
from PIL import Image, ImageDraw, ImageFont


# ============================================================
# 1) 設定(閾値・挙動をまとめておくと改修が楽)
# ============================================================

@dataclass
class Config:
    # --- 共通 ---
    render_zoom: float = 2.0  # デバッグ画像用のレンダリング倍率(大きいほど見やすいが重い)
    out_image_format: str = "png"

    # --- ノード候補のフィルタ ---
    node_min_area: float = 800.0      # 小さすぎる候補はノード扱いしない
    node_max_area_ratio: float = 0.6  # ページ面積の大半を占める矩形は背景/枠の可能性が高い

    # --- テキスト割当 ---
    text_inset: float = 2.0  # bbox内判定に少し余裕を持たせる(PDFのbboxズレ対策)

    # --- エッジ推定 ---
    snap_dist: float = 10.0  # 線端点をノードにスナップする距離閾値(PDF座標系)
    edge_min_len: float = 15.0  # 短すぎる線はノイズとして除外

    # --- Route C(ノイズ除去強化) ---
    ignore_non_painted: bool = True
    ignore_likely_invisible_white: bool = True
    ignore_tiny_drawings: bool = True

    # --- Route B(画像ベース) ---
    cv_min_contour_area: float = 900.0
    cv_max_contour_area_ratio: float = 0.6
    ocr_lang: str = "jpn"  # Tesseractの言語。環境により "jpn+eng" なども可


# ============================================================
# 2) データ構造(JSON出力の形を固定すると後工程が楽)
# ============================================================

@dataclass
class Node:
    id: str
    type: str              # "process" / "decision" / "terminator" / "unknown" など
    bbox: Tuple[float, float, float, float]  # (x0, y0, x1, y1) PDF座標
    text: str


@dataclass
class Edge:
    id: str
    src: str
    dst: str
    points: List[Tuple[float, float]]  # 折れ線(PDF座標)


# ============================================================
# 3) ユーティリティ
# ============================================================

def rect_area(r: fitz.Rect) -> float:
    return max(0.0, r.width) * max(0.0, r.height)

def safe_rect(x: Any) -> fitz.Rect:
    if isinstance(x, fitz.Rect):
        return x
    if isinstance(x, (list, tuple)) and len(x) == 4:
        return fitz.Rect(x)
    return fitz.Rect(0, 0, 0, 0)

def clamp(v: float, lo: float, hi: float) -> float:
    return max(lo, min(hi, v))

def is_near_white(color: Any, tol: float = 0.02) -> bool:
    if color is None:
        return False
    if isinstance(color, (int, float)):
        return abs(1.0 - float(color)) < tol
    if isinstance(color, (list, tuple)) and len(color) >= 3:
        try:
            return all(abs(1.0 - float(c)) < tol for c in color[:3])
        except Exception:
            return False
    return False

def ensure_dir(path: str) -> None:
    os.makedirs(path, exist_ok=True)

def dump_json(path: str, obj: Any) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2)

def render_page_image(page: fitz.Page, zoom: float) -> Tuple[Image.Image, float]:
    """
    PyMuPDFのページを画像化して返す。
    返すscaleは「PDF座標 → 画像座標」への倍率(=zoom)。
    """
    mat = fitz.Matrix(zoom, zoom)
    pix = page.get_pixmap(matrix=mat, alpha=False)
    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
    return img, zoom

def pdf_bbox_to_img(bbox: Tuple[float, float, float, float], scale: float) -> Tuple[int, int, int, int]:
    x0, y0, x1, y1 = bbox
    return int(x0 * scale), int(y0 * scale), int(x1 * scale), int(y1 * scale)

def dist_point_to_rect(p: Tuple[float, float], r: fitz.Rect) -> float:
    """
    点pから矩形rへの距離(外側なら最近点距離、内側なら0)
    """
    x, y = p
    dx = max(r.x0 - x, 0, x - r.x1)
    dy = max(r.y0 - y, 0, y - r.y1)
    return (dx*dx + dy*dy) ** 0.5

def segment_length(p1: Tuple[float,float], p2: Tuple[float,float]) -> float:
    return ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2) ** 0.5


# ============================================================
# 4) 診断(ページが何で構成されているか)
# ============================================================

def drawing_op_signature(d: Dict[str, Any]) -> Tuple[str, ...]:
    items = d.get("items", [])
    ops: List[str] = []
    for it in items:
        if isinstance(it, (list, tuple)) and len(it) >= 1 and isinstance(it[0], str):
            ops.append(it[0])
    return tuple(ops)

def classify_drawing(d: Dict[str, Any], page_rect: fitz.Rect, cfg: Config) -> str:
    r = safe_rect(d.get("rect"))
    stroke = d.get("color")
    fill = d.get("fill")
    width = d.get("width", None)
    stroke_op = d.get("stroke_opacity", d.get("opacity", 1.0))
    fill_op = d.get("fill_opacity", d.get("opacity", 1.0))

    has_stroke = (stroke is not None) and (stroke_op is None or stroke_op > 0)
    has_fill = (fill is not None) and (fill_op is None or fill_op > 0)

    if not page_rect.intersects(r):
        return "off_page"

    if rect_area(r) < 20:
        return "tiny_noise"

    if not has_stroke and not has_fill:
        return "non_painted_or_clip"

    if has_stroke and is_near_white(stroke) and (not has_fill or is_near_white(fill)):
        return "likely_invisible_white"

    if has_stroke and (width == 0 or (isinstance(width, (int, float)) and abs(width) < 1e-9)):
        return "hairline_stroke"

    return "visible_candidate"

def compute_text_stats(page: fitz.Page) -> Tuple[int, float]:
    d = page.get_text("dict")
    blocks = d.get("blocks", [])
    pr = page.rect
    page_area = rect_area(pr)

    area_sum = 0.0
    text_blocks = 0
    for b in blocks:
        if b.get("type") == 0:
            text_blocks += 1
            r = safe_rect(b.get("bbox"))
            area_sum += rect_area(r)

    ratio = (area_sum / page_area) if page_area > 0 else 0.0
    return text_blocks, clamp(ratio, 0.0, 1.0)

def compute_image_stats(page: fitz.Page) -> Tuple[int, int, float]:
    pr = page.rect
    page_area = rect_area(pr)

    imgs = page.get_images(full=True) or []
    images_count = len(imgs)

    placements = 0
    area_sum = 0.0
    for img in imgs:
        xref = img[0]
        rects = page.get_image_rects(xref) or []
        placements += len(rects)
        for r in rects:
            rr = pr & r
            area_sum += rect_area(rr)

    ratio = (area_sum / page_area) if page_area > 0 else 0.0
    return images_count, placements, clamp(ratio, 0.0, 1.0)

def compute_annot_stats(page: fitz.Page) -> Tuple[int, int]:
    ann_count = 0
    try:
        annots = page.annots()
        if annots:
            for _ in annots:
                ann_count += 1
    except Exception:
        ann_count = -1

    links = page.get_links() or []
    return ann_count, len(links)

def recommend_route(
    *,
    drawings_total: int,
    visible_candidate: int,
    non_painted_or_clip: int,
    likely_invisible_white: int,
    hairline_stroke: int,
    text_blocks: int,
    images_count: int,
    image_area_ratio: float,
    annots_count: int,
    links_count: int,
) -> Tuple[str, List[str]]:
    why: List[str] = []

    # --- Route B の兆候 ---
    strong_B = False
    if image_area_ratio >= 0.35:
        strong_B = True
        why.append(f"画像配置の占有率が高い(image_area_ratio={image_area_ratio:.2f})→ 図が画像化されている可能性")

    if images_count > 0 and visible_candidate < 10 and drawings_total < 30:
        strong_B = True
        why.append("imagesはあるがdraw(可視候補)が少ない → 画像ベース or 注釈appearanceの疑い")

    if annots_count not in (0, -1) and annots_count > 0:
        strong_B = True
        why.append(f"注釈が存在(annots_count={annots_count})→ 注釈側に情報がいる可能性")

    if links_count > 0:
        why.append(f"リンク領域が存在(links_count={links_count})→ 注釈/リンクに構造が偏っている可能性")

    # --- Route C の兆候 ---
    clip_ratio = (non_painted_or_clip / drawings_total) if drawings_total > 0 else 0.0
    inv_ratio = ((likely_invisible_white + non_painted_or_clip) / drawings_total) if drawings_total > 0 else 0.0
    strong_C = False
    if drawings_total >= 50 and (clip_ratio >= 0.30 or inv_ratio >= 0.45):
        strong_C = True
        why.append(f"不可視/clip疑いが多い(clip_ratio={clip_ratio:.2f}, inv_ratio={inv_ratio:.2f})→ 先にノイズ除去が必要")

    # --- Route A の兆候 ---
    strong_A = (visible_candidate >= 30) and (text_blocks > 0) and (image_area_ratio < 0.2)

    if strong_B and not strong_A:
        if text_blocks == 0:
            why.append("テキストが抽出できない → OCRの必要性が高い")
        return "B", why

    if strong_C and not strong_A:
        return "C", why

    # どちらでもない → Aから試す
    if strong_A:
        why.append("可視drawが十分 & テキストあり & 画像占有が低い → ベクター復元(Route A)が刺さる")
    else:
        why.append("明確にB/Cではない → まずRoute Aで試し、足りなければB/Cを併用")
    if hairline_stroke > 0:
        why.append(f"hairline(width==0)が検出({hairline_stroke}件)→ 抽出時に捨てない実装が必要")
    return "A", why

def diagnose_pdf(pdf_path: str, outdir: str, cfg: Config, overlay: bool) -> None:
    ensure_dir(outdir)
    doc = fitz.open(pdf_path)
    report = []

    overlay_pages = []

    for page_no in range(doc.page_count):
        page = doc[page_no]
        pr = page.rect

        text_blocks, text_area_ratio = compute_text_stats(page)
        images_count, placements, image_area_ratio = compute_image_stats(page)
        annots_count, links_count = compute_annot_stats(page)

        drawings = page.get_drawings()
        drawings_total = len(drawings)

        cls_counter = Counter()
        op_hist = Counter()

        for d in drawings:
            cls = classify_drawing(d, pr, cfg)
            cls_counter[cls] += 1
            sig = drawing_op_signature(d)
            if sig:
                op_hist[str(sig)] += 1

        route, why = recommend_route(
            drawings_total=drawings_total,
            visible_candidate=cls_counter["visible_candidate"],
            non_painted_or_clip=cls_counter["non_painted_or_clip"],
            likely_invisible_white=cls_counter["likely_invisible_white"],
            hairline_stroke=cls_counter["hairline_stroke"],
            text_blocks=text_blocks,
            images_count=images_count,
            image_area_ratio=image_area_ratio,
            annots_count=annots_count,
            links_count=links_count,
        )

        page_info = {
            "page_no": page_no,
            "page_size": [pr.width, pr.height],
            "text_blocks": text_blocks,
            "text_area_ratio": text_area_ratio,
            "drawings_total": drawings_total,
            "drawings_class": dict(cls_counter),
            "drawings_op_hist_top20": dict(op_hist.most_common(20)),
            "images_count": images_count,
            "image_placements": placements,
            "image_area_ratio": image_area_ratio,
            "annots_count": annots_count,
            "links_count": links_count,
            "recommended_route": route,
            "why": why,
        }
        report.append(page_info)
        dump_json(os.path.join(outdir, f"page_{page_no:03d}.json"), page_info)

        print(f"[diagnose page {page_no}] Route={route} drawings={drawings_total} "
              f"visible={cls_counter['visible_candidate']} clip={cls_counter['non_painted_or_clip']} "
              f"images_area={image_area_ratio:.2f} text_blocks={text_blocks} annots={annots_count} links={links_count}")
        for w in why[:3]:
            print("  -", w)

        if overlay:
            overlay_pages.append(page_no)

    dump_json(os.path.join(outdir, "report.json"), report)

    if overlay:
        # overlay.pdf:bboxを重ねて「どこに何があるか」検算する
        out_pdf = fitz.open()
        for page_no in overlay_pages:
            p = doc[page_no]
            img, scale = render_page_image(p, cfg.render_zoom)
            draw = ImageDraw.Draw(img)

            # text bbox
            try:
                tb = p.get_text("dict").get("blocks", [])
                for b in tb:
                    if b.get("type") == 0:
                        r = safe_rect(b.get("bbox"))
                        bb = (r.x0, r.y0, r.x1, r.y1)
                        x0,y0,x1,y1 = pdf_bbox_to_img(bb, scale)
                        draw.rectangle([x0,y0,x1,y1], outline="blue", width=2)
            except Exception:
                pass

            # image bbox
            try:
                imgs = p.get_images(full=True) or []
                for im in imgs:
                    xref = im[0]
                    for r in p.get_image_rects(xref) or []:
                        bb = (r.x0, r.y0, r.x1, r.y1)
                        x0,y0,x1,y1 = pdf_bbox_to_img(bb, scale)
                        draw.rectangle([x0,y0,x1,y1], outline="green", width=2)
            except Exception:
                pass

            # drawings bbox(重いので少し抑制したい場合はここを調整)
            try:
                drs = p.get_drawings()
                for d in drs[:800]:
                    r = safe_rect(d.get("rect"))
                    if rect_area(r) < 50:
                        continue
                    bb = (r.x0, r.y0, r.x1, r.y1)
                    x0,y0,x1,y1 = pdf_bbox_to_img(bb, scale)
                    draw.rectangle([x0,y0,x1,y1], outline="red", width=1)
            except Exception:
                pass

            # PIL画像 → PDFページとして貼り付け
            pix = fitz.Pixmap(fitz.csRGB, img.size[0], img.size[1], img.tobytes(), False)
            newp = out_pdf.new_page(width=p.rect.width, height=p.rect.height)
            # 画像の貼付(scaleを戻してページ全体に合わせる)
            rect = fitz.Rect(0, 0, p.rect.width, p.rect.height)
            newp.insert_image(rect, pixmap=pix)

        overlay_path = os.path.join(outdir, "overlay.pdf")
        out_pdf.save(overlay_path)
        out_pdf.close()
        print("overlay saved:", overlay_path)

    doc.close()


# ============================================================
# 5) Route A/C:ベクター(drawings)からノード/エッジ抽出
# ============================================================

def extract_text_blocks(page: fitz.Page) -> List[Tuple[fitz.Rect, str]]:
    """
    PDF内テキスト(OCR不要)のブロックを抽出する。
    Route A/Cでノード内ラベル割当の基礎になる。
    """
    blocks = page.get_text("blocks")  # (x0,y0,x1,y1, "text", block_no, block_type, ...)
    out: List[Tuple[fitz.Rect, str]] = []
    for b in blocks:
        if len(b) >= 5:
            r = fitz.Rect(b[0], b[1], b[2], b[3])
            text = (b[4] or "").strip()
            if text:
                out.append((r, text))
    return out

def drawing_is_usable_vector(d: Dict[str, Any], page_rect: fitz.Rect, cfg: Config, route: str) -> bool:
    """
    Route A: なるべく広く拾う
    Route C: ノイズになりやすいものを除外してから拾う
    """
    cls = classify_drawing(d, page_rect, cfg)

    if route == "A":
        # Aは基本的に拾ってから後段でフィルタ
        return cls not in ("off_page",)

    # Route C: ノイズ除去を強める
    if cls == "off_page":
        return False
    if cfg.ignore_tiny_drawings and cls == "tiny_noise":
        return False
    if cfg.ignore_non_painted and cls == "non_painted_or_clip":
        return False
    if cfg.ignore_likely_invisible_white and cls == "likely_invisible_white":
        return False
    return True

def node_candidates_from_drawings(page: fitz.Page, cfg: Config, route: str) -> List[fitz.Rect]:
    """
    ノード候補bboxをdrawingsから集める。
    - まずdrawingsの外接矩形(rect)を候補として集める
    - 大きすぎ/小さすぎを除外
    - 近い/重なるbboxはマージ(ノードの二重線などを吸収)
    """
    pr = page.rect
    page_area = rect_area(pr)
    drawings = page.get_drawings()

    rects: List[fitz.Rect] = []
    for d in drawings:
        if not drawing_is_usable_vector(d, pr, cfg, route):
            continue
        r = safe_rect(d.get("rect"))

        a = rect_area(r)
        if a < cfg.node_min_area:
            continue
        if a > page_area * cfg.node_max_area_ratio:
            continue
        rects.append(r)

    # 重なりが多いので「ほぼ同じbbox」をまとめる
    merged = merge_overlapping_rects(rects, iou_thresh=0.6, inflate=1.5)
    return merged

def merge_overlapping_rects(rects: List[fitz.Rect], iou_thresh: float, inflate: float) -> List[fitz.Rect]:
    """
    簡易なbboxマージ。
    - inflate: 近接しているものもマージしたいので少し膨らませてIOU判定する
    """
    def iou(a: fitz.Rect, b: fitz.Rect) -> float:
        inter = a & b
        if inter.is_empty:
            return 0.0
        union_area = rect_area(a) + rect_area(b) - rect_area(inter)
        return rect_area(inter) / union_area if union_area > 0 else 0.0

    out: List[fitz.Rect] = []
    rects_sorted = sorted(rects, key=lambda r: (r.y0, r.x0))
    used = [False] * len(rects_sorted)

    for i, r in enumerate(rects_sorted):
        if used[i]:
            continue
        cur = fitz.Rect(r)
        used[i] = True

        changed = True
        while changed:
            changed = False
            for j, s in enumerate(rects_sorted):
                if used[j]:
                    continue
                # 少し膨らませて近接を拾う
                cur_infl = fitz.Rect(cur.x0 - inflate, cur.y0 - inflate, cur.x1 + inflate, cur.y1 + inflate)
                s_infl = fitz.Rect(s.x0 - inflate, s.y0 - inflate, s.x1 + inflate, s.y1 + inflate)
                if iou(cur_infl, s_infl) >= iou_thresh:
                    # unionへ拡張
                    cur |= s
                    used[j] = True
                    changed = True

        out.append(cur)

    return out

def assign_text_to_nodes(nodes: List[fitz.Rect], text_blocks: List[Tuple[fitz.Rect, str]], cfg: Config) -> List[Node]:
    """
    ノードbboxの内部にあるテキストを割り当て、Node構造にする。
    (ノード種別推定は簡易。必要なら形状判定を後から追加しやすい)
    """
    out: List[Node] = []
    for idx, nr in enumerate(nodes):
        # insetを持たせた内側判定(bboxズレ対策)
        inner = fitz.Rect(nr.x0 + cfg.text_inset, nr.y0 + cfg.text_inset, nr.x1 - cfg.text_inset, nr.y1 - cfg.text_inset)

        texts = []
        for tr, t in text_blocks:
            # テキストbbox中心がノード内に入っていれば採用
            cx = (tr.x0 + tr.x1) / 2
            cy = (tr.y0 + tr.y1) / 2
            if inner.contains(fitz.Point(cx, cy)):
                texts.append(t)

        text = "\n".join(texts).strip()

        # 種別推定(超簡易)
        # ここは後から「菱形=decision」などに差し替えやすい場所
        node_type = "unknown"
        out.append(Node(
            id=f"N{idx:04d}",
            type=node_type,
            bbox=(nr.x0, nr.y0, nr.x1, nr.y1),
            text=text
        ))
    return out

def extract_line_segments_from_drawings(page: fitz.Page, cfg: Config, route: str) -> List[Tuple[Tuple[float,float], Tuple[float,float]]]:
    """
    drawingsのitemsから線分を拾う。
    - 'l' (line) の連続を線分にする
    - 短すぎる線は捨てる
    """
    pr = page.rect
    segments = []
    for d in page.get_drawings():
        if not drawing_is_usable_vector(d, pr, cfg, route):
            continue
        items = d.get("items", [])
        last_pt = None
        for it in items:
            if not (isinstance(it, (list, tuple)) and len(it) >= 1):
                continue
            op = it[0]
            if op == "l" and len(it) >= 3:
                p1 = it[1]
                p2 = it[2]
                # p1/p2がPoint互換のタプルと仮定
                a = (float(p1[0]), float(p1[1]))
                b = (float(p2[0]), float(p2[1]))
                if segment_length(a, b) >= cfg.edge_min_len:
                    segments.append((a, b))
                last_pt = b
            else:
                last_pt = None
    return segments

def nearest_node_id(point: Tuple[float,float], nodes: List[Node], cfg: Config) -> Optional[str]:
    """
    点が近いノードを探す(距離がcfg.snap_dist以内)。
    """
    best = None
    best_d = 1e9
    for n in nodes:
        r = fitz.Rect(*n.bbox)
        d = dist_point_to_rect(point, r)
        if d < best_d:
            best_d = d
            best = n.id
    if best is not None and best_d <= cfg.snap_dist:
        return best
    return None

def build_edges_from_segments(segments, nodes: List[Node], cfg: Config) -> List[Edge]:
    """
    線分の端点をノードへスナップしてエッジ化する。
    - 端点→ノードが両方見つかればエッジ候補
    - 同じノード内ならループ(src==dst)として残す(後で捨ててもOK)
    """
    edges: List[Edge] = []
    eidx = 0
    for a, b in segments:
        src = nearest_node_id(a, nodes, cfg)
        dst = nearest_node_id(b, nodes, cfg)

        if src is None or dst is None:
            continue

        edges.append(Edge(
            id=f"E{eidx:05d}",
            src=src,
            dst=dst,
            points=[a, b]
        ))
        eidx += 1
    return edges

def draw_debug_overlay(
    page: fitz.Page,
    nodes: List[Node],
    edges: List[Edge],
    out_path: str,
    cfg: Config,
    title: str = ""
) -> None:
    """
    デバッグ画像(抽出結果の重ね描き)を出力する。
    """
    img, scale = render_page_image(page, cfg.render_zoom)
    draw = ImageDraw.Draw(img)

    # ノードbbox(赤)
    for n in nodes:
        x0,y0,x1,y1 = pdf_bbox_to_img(n.bbox, scale)
        draw.rectangle([x0,y0,x1,y1], outline="red", width=3)
        # idを左上に(見やすさ優先)
        draw.text((x0+2, y0+2), n.id, fill="red")

    # エッジ(青)
    for e in edges:
        pts = [(int(x*scale), int(y*scale)) for (x,y) in e.points]
        if len(pts) >= 2:
            draw.line(pts, fill="blue", width=3)
            # エッジIDも近くに
            mx = int((pts[0][0] + pts[-1][0]) / 2)
            my = int((pts[0][1] + pts[-1][1]) / 2)
            draw.text((mx, my), e.id, fill="blue")

    if title:
        draw.text((10, 10), title, fill="black")

    img.save(out_path)

def export_page_result(outdir: str, page_no: int, nodes: List[Node], edges: List[Edge], debug_img_path: str) -> None:
    page_dir = os.path.join(outdir, f"page_{page_no:03d}")
    ensure_dir(page_dir)

    dump_json(os.path.join(page_dir, "nodes.json"), [asdict(n) for n in nodes])
    dump_json(os.path.join(page_dir, "edges.json"), [asdict(e) for e in edges])

    # debug画像は既に保存済みの想定(ここではパスだけ出しても良い)
    dump_json(os.path.join(page_dir, "artifacts.json"), {
        "nodes": "nodes.json",
        "edges": "edges.json",
        "debug_image": os.path.basename(debug_img_path),
    })


def extract_route_A_or_C(pdf_path: str, outdir: str, cfg: Config, route: str) -> None:
    """
    Route A/C 共通実装。
    違いは drawing_is_usable_vector() のフィルタ強度だけ。
    """
    ensure_dir(outdir)
    doc = fitz.open(pdf_path)

    for page_no in range(doc.page_count):
        page = doc[page_no]

        # 1) ノード候補bboxを収集
        node_rects = node_candidates_from_drawings(page, cfg, route)

        # 2) テキスト抽出(PDFにテキストが入っている場合は高精度)
        text_blocks = extract_text_blocks(page)

        # 3) ノードへテキスト割当
        nodes = assign_text_to_nodes(node_rects, text_blocks, cfg)

        # 4) エッジ候補(線分)→ノードへスナップしてエッジ化
        segments = extract_line_segments_from_drawings(page, cfg, route)
        edges = build_edges_from_segments(segments, nodes, cfg)

        # 5) デバッグ画像
        page_dir = os.path.join(outdir, f"page_{page_no:03d}")
        ensure_dir(page_dir)
        debug_path = os.path.join(page_dir, f"debug.{cfg.out_image_format}")
        draw_debug_overlay(page, nodes, edges, debug_path, cfg, title=f"Route {route} page {page_no}")

        # 6) JSON出力
        export_page_result(outdir, page_no, nodes, edges, debug_path)

        print(f"[Route {route}] page {page_no}: nodes={len(nodes)} edges={len(edges)} debug={debug_path}")

    doc.close()


# ============================================================
# 6) Route B:画像/注釈主体 → 画像化してCV/OCRで抽出
# ============================================================

def try_import_cv():
    """
    Route BはOpenCV / pytesseractが無いと機能が落ちるので遅延importにする。
    """
    try:
        import cv2  # type: ignore
    except Exception as e:
        raise RuntimeError("Route B requires opencv-python. Install: pip install opencv-python") from e

    try:
        import pytesseract  # type: ignore
    except Exception:
        pytesseract = None  # OCR無しでも枠だけは取れるようにする

    return cv2, pytesseract

def cv_detect_node_contours(img: Image.Image, cfg: Config) -> List[Tuple[int,int,int,int]]:
    """
    画像からノード候補(矩形/菱形っぽい外接矩形)を検出する(簡易)。
    - 輪郭抽出 → 面積フィルタ → 外接矩形として返す
    改修ポイント:
      - ここに「4頂点近似」「角度判定」「菱形判定」などを追加すると精度が上がる
    """
    cv2, _ = try_import_cv()
    np_img = np.array(img)

    gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY)
    blur = cv2.GaussianBlur(gray, (5,5), 0)

    # 線画は2値化で輪郭が取りやすい
    th = cv2.adaptiveThreshold(blur, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                               cv2.THRESH_BINARY_INV, 31, 7)

    # 線の途切れを少しつなぐ
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3,3))
    th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, kernel, iterations=2)

    contours, _hier = cv2.findContours(th, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    h, w = gray.shape[:2]
    img_area = float(h * w)
    out = []

    for c in contours:
        area = float(cv2.contourArea(c))
        if area < cfg.cv_min_contour_area:
            continue
        if area > img_area * cfg.cv_max_contour_area_ratio:
            continue
        x,y,bw,bh = cv2.boundingRect(c)
        # 極端に細いもの(線)を避ける
        if bw < 10 or bh < 10:
            continue
        out.append((x, y, x+bw, y+bh))

    # 重なりbboxを簡易マージ(NMS的に)
    out = merge_bboxes_iou(out, iou_thresh=0.4)
    return out

def merge_bboxes_iou(bboxes: List[Tuple[int,int,int,int]], iou_thresh: float) -> List[Tuple[int,int,int,int]]:
    """
    画像bbox版の簡易マージ
    """
    def iou(a,b) -> float:
        ax0,ay0,ax1,ay1 = a
        bx0,by0,bx1,by1 = b
        ix0,iy0 = max(ax0,bx0), max(ay0,by0)
        ix1,iy1 = min(ax1,bx1), min(ay1,by1)
        iw, ih = max(0, ix1-ix0), max(0, iy1-iy0)
        inter = iw*ih
        if inter <= 0:
            return 0.0
        ua = (ax1-ax0)*(ay1-ay0)
        ub = (bx1-bx0)*(by1-by0)
        return inter / float(ua + ub - inter)

    out = []
    used = [False]*len(bboxes)
    for i,a in enumerate(bboxes):
        if used[i]:
            continue
        x0,y0,x1,y1 = a
        used[i] = True
        changed = True
        while changed:
            changed = False
            for j,b in enumerate(bboxes):
                if used[j]:
                    continue
                if iou((x0,y0,x1,y1), b) >= iou_thresh:
                    bx0,by0,bx1,by1 = b
                    x0,y0 = min(x0,bx0), min(y0,by0)
                    x1,y1 = max(x1,bx1), max(y1,by1)
                    used[j] = True
                    changed = True
        out.append((x0,y0,x1,y1))
    return out

def ocr_text_in_bbox(img: Image.Image, bbox: Tuple[int,int,int,int], cfg: Config) -> str:
    """
    bbox領域のOCR(pytesseractが無ければ空文字)
    改修ポイント:
      - 前処理(拡大、2値化、ノイズ除去)を追加すると精度が上がる
      - OCRのconfig(psm等)も調整余地あり
    """
    _cv2, pytesseract = try_import_cv()
    if pytesseract is None:
        return ""

    x0,y0,x1,y1 = bbox
    crop = img.crop((x0, y0, x1, y1))

    # OCR向けに少し拡大
    scale = 2
    crop = crop.resize((crop.size[0]*scale, crop.size[1]*scale), resample=Image.BICUBIC)

    text = pytesseract.image_to_string(crop, lang=cfg.ocr_lang)
    return (text or "").strip()

def cv_detect_edges(img: Image.Image, cfg: Config) -> List[Tuple[Tuple[int,int], Tuple[int,int]]]:
    """
    画像から線分(エッジ候補)を検出(HoughLinesP)。
    ループや曲線は弱いので、改修ポイントとして「骨格化 + 経路追跡」などを追加可能。
    """
    cv2, _ = try_import_cv()
    np_img = np.array(img)

    gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY)
    blur = cv2.GaussianBlur(gray, (5,5), 0)
    edges = cv2.Canny(blur, 50, 150)

    # 線分検出
    lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100,
                            minLineLength=40, maxLineGap=10)
    out = []
    if lines is None:
        return out

    for l in lines:
        x1,y1,x2,y2 = l[0]
        # あまりに短い線はノイズ
        if ((x1-x2)**2 + (y1-y2)**2) ** 0.5 < 30:
            continue
        out.append(((x1,y1), (x2,y2)))
    return out

def img_point_to_nearest_node(point: Tuple[int,int], node_bboxes: List[Tuple[int,int,int,int]], snap_px: int) -> Optional[int]:
    """
    画像座標で、点に最も近いノードbboxを返す(index)。
    """
    px,py = point
    best = None
    best_d = 1e9
    for i,(x0,y0,x1,y1) in enumerate(node_bboxes):
        # 点→矩形距離
        dx = max(x0 - px, 0, px - x1)
        dy = max(y0 - py, 0, py - y1)
        d = (dx*dx + dy*dy) ** 0.5
        if d < best_d:
            best_d = d
            best = i
    if best is not None and best_d <= snap_px:
        return best
    return None

def extract_route_B(pdf_path: str, outdir: str, cfg: Config) -> None:
    """
    Route B:
      - ページを画像化
      - CVでノード候補(bbox)検出
      - OCRでノード内テキスト抽出(可能なら)
      - CVで線分検出 → ノードへスナップしてエッジ化
      - デバッグ画像で検算
    """
    ensure_dir(outdir)
    doc = fitz.open(pdf_path)

    for page_no in range(doc.page_count):
        page = doc[page_no]

        # 1) ページ全体を画像化(ここがRoute Bの基礎)
        img, scale = render_page_image(page, cfg.render_zoom)

        # 2) ノードbbox検出(画像座標)
        node_bboxes = cv_detect_node_contours(img, cfg)

        # 3) OCRでノード内テキスト
        nodes: List[Node] = []
        for i, bb in enumerate(node_bboxes):
            text = ocr_text_in_bbox(img, bb, cfg)
            # 画像bbox → PDF座標bboxへ戻す(scaleで割る)
            x0,y0,x1,y1 = bb
            pdf_bb = (x0/scale, y0/scale, x1/scale, y1/scale)
            nodes.append(Node(
                id=f"N{ i:04d }",
                type="unknown",
                bbox=pdf_bb,
                text=text
            ))

        # 4) 線分(画像座標)検出 → ノードへスナップしてエッジ化
        lines = cv_detect_edges(img, cfg)
        edges: List[Edge] = []
        eidx = 0

        snap_px = int(cfg.snap_dist * scale)  # PDF閾値を画像閾値へ
        for (p1, p2) in lines:
            i_src = img_point_to_nearest_node(p1, node_bboxes, snap_px)
            i_dst = img_point_to_nearest_node(p2, node_bboxes, snap_px)
            if i_src is None or i_dst is None:
                continue

            # 画像点 → PDF点
            a = (p1[0]/scale, p1[1]/scale)
            b = (p2[0]/scale, p2[1]/scale)

            edges.append(Edge(
                id=f"E{eidx:05d}",
                src=f"N{i_src:04d}",
                dst=f"N{i_dst:04d}",
                points=[a, b]
            ))
            eidx += 1

        # 5) デバッグ画像
        page_dir = os.path.join(outdir, f"page_{page_no:03d}")
        ensure_dir(page_dir)
        debug_path = os.path.join(page_dir, f"debug.{cfg.out_image_format}")
        draw_debug_overlay(page, nodes, edges, debug_path, cfg, title=f"Route B page {page_no}")

        # 6) JSON出力
        export_page_result(outdir, page_no, nodes, edges, debug_path)

        print(f"[Route B] page {page_no}: nodes={len(nodes)} edges={len(edges)} debug={debug_path}")

    doc.close()


# ============================================================
# 7) CLI
# ============================================================

def main():
    parser = argparse.ArgumentParser()
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_diag = sub.add_parser("diagnose", help="diagnose PDF composition")
    p_diag.add_argument("pdf", help="input pdf path")
    p_diag.add_argument("--outdir", default="out_diag", help="output dir")
    p_diag.add_argument("--overlay", action="store_true", help="export overlay.pdf for visual inspection")

    p_ext = sub.add_parser("extract", help="extract nodes/edges by route")
    p_ext.add_argument("pdf", help="input pdf path")
    p_ext.add_argument("--outdir", default="out_extract", help="output dir")
    p_ext.add_argument("--route", choices=["auto","A","B","C"], default="auto", help="route selection")

    args = parser.parse_args()
    cfg = Config()

    if args.cmd == "diagnose":
        diagnose_pdf(args.pdf, args.outdir, cfg, overlay=args.overlay)
        return

    if args.cmd == "extract":
        # autoの場合は1ページ目診断でルートを決める(必要ならページごとに変えてもOK)
        route = args.route
        if route == "auto":
            doc = fitz.open(args.pdf)
            page = doc[0]
            pr = page.rect

            text_blocks, _ = compute_text_stats(page)
            images_count, placements, image_area_ratio = compute_image_stats(page)
            annots_count, links_count = compute_annot_stats(page)

            drawings = page.get_drawings()
            cls_counter = Counter()
            for d in drawings:
                cls_counter[classify_drawing(d, pr, cfg)] += 1

            route, why = recommend_route(
                drawings_total=len(drawings),
                visible_candidate=cls_counter["visible_candidate"],
                non_painted_or_clip=cls_counter["non_painted_or_clip"],
                likely_invisible_white=cls_counter["likely_invisible_white"],
                hairline_stroke=cls_counter["hairline_stroke"],
                text_blocks=text_blocks,
                images_count=images_count,
                image_area_ratio=image_area_ratio,
                annots_count=annots_count,
                links_count=links_count,
            )
            doc.close()
            print("[auto route]", route)
            for w in why:
                print(" -", w)

        if route == "A":
            extract_route_A_or_C(args.pdf, args.outdir, cfg, route="A")
        elif route == "B":
            extract_route_B(args.pdf, args.outdir, cfg)
        elif route == "C":
            extract_route_A_or_C(args.pdf, args.outdir, cfg, route="C")
        return


if __name__ == "__main__":
    main()

Routeごとの改修ポイント

  • Route A 改修ポイント

    • ノード種別推定(矩形/菱形/楕円)を node_candidates_from_drawings に追加
    • エッジの結合(複数線分をポリライン化)
    • 矢尻検出(方向推定)やYes/Noラベル抽出
  • Route B 改修ポイント

    • cv_detect_node_contours に4頂点近似→角度判定を追加して「菱形=decision」を判別
    • 線検出を Hough から「骨格化+経路追跡」に変えるとループに強くなる
    • OCR前処理(2値化/拡大/ノイズ除去)で認識率UP
  • Route C 改修ポイント

    • drawing_is_usable_vector の除外条件をPDF群に合わせて調整
    • 背景グリッド(等間隔の長線群)除外などを追加するとノード誤検出が減る

補足(大事)

  • 上のRoute A/Cは「まずは動く」ことを優先して 線分をそのままエッジ化しています。複雑なループや途中折れは、線分の結合/経路追跡で精度が上がります。
  • Route Bは CV/OCRの下駄が要るので、環境差が出ます

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?