複雑なフロー図(ループ、開始点が複数、3分岐、注釈/装飾が混在)を PDFから機械的に構造化したい。
でも「PDFの見た目」と「内部表現」はズレがちなので、いきなりノード/エッジ抽出に突入すると失敗します。
この記事では、まず ページが何で構成されているかを診断し、その結果に応じて処理ルートを切り替える構成にします。
-
Route A: ベクター図形(drawings)が主 →
drawingsからノード/エッジ抽出 - Route B: 画像 or 注釈が主 → ページを画像化してCV/OCRで抽出(必要なら注釈も併用)
- Route C: clip/不可視ノイズが主 → 先にノイズ除去してからRoute Aへ
0. 前提と注意
- 100%自動復元は現実的に難しいです(PDFは表現が自由すぎる)
- なので 自動抽出 + デバッグオーバレイ画像で検算 + 最小限の手補正が実務で強いです
- ここでは「構造化の基礎」を作ります
1. 環境
必須
- Python 3.10+
- PyMuPDF
pip install pymupdf pillow numpy
Route B で推奨(画像ベース)
pip install opencv-python pytesseract
-
OCRを使うなら Tesseract本体が必要です(日本語モデルも)
- macOS:
brew install tesseract+ 日本語言語データ - Linux:
apt install tesseract-ocr tesseract-ocr-jpn
- macOS:
※ OCRが要らない(PDFテキストが取れる)なら pytesseract は無くても動きます。
2. 診断:このページは何で構成されてる?
**「見えるのにdrawingsが取れない」「見えないのにノードっぽい矩形が取れる」**の正体はだいたい以下です:
- clip用パス(線も塗りも無しなのに矩形パスだけ存在)
- 透明(opacity=0)や背景同色(白)で見えない
- 画像として貼られている(フロー図が丸ごとラスター化)
- 注釈(annotation)やリンク領域に情報が寄っている
- 線幅0(hairline):見た目は線があるのに抽出で落とすと消える
まずは 統計を出して切り分けします。
2.1 診断コード(JSONレポート + オーバレイPDFも任意で出力)
pdf_flow_toolkit.pyとして保存して使います(この記事後半のRouteコードも同ファイルに入っています)
(このファイルの全文は記事末尾に載せています)
使い方:
python pdf_flow_toolkit.py diagnose input.pdf --outdir out --overlay
出力:
-
out/report.json(ページごとの統計) -
out/overlay.pdf(デバッグ用:bbox重ね描き)
診断の見方(超ざっくり):
- imagesの占有率が高い → Route B(画像ベース)
- annots/linksが多い → Route B(注釈/リンクも確認)
- non_painted_or_clipが多い → Route C(ノイズ除去してからA)
- visible_candidateが多い & text_blocksあり → Route A(ベクターで攻める)
3. Route分岐:どのルートで抽出するか?
ここでは機械的に3ルートに分けます。
-
Route A(ベクター復元)
-
visible_candidateが十分 -
text_blocksが取れる - 画像占有が低い
-
-
Route B(画像/注釈復元)
-
image_area_ratioが高い、またはdrawings_totalが少ないのに見た目はある -
annots_countやlinks_countが多い
-
-
Route C(ノイズ除去→A)
-
non_painted_or_clipやlikely_invisible_*が多く、ノード検出がノイズまみれになる
-
4. 抽出:各Routeで「ノードJSON」「エッジJSON」「デバッグ画像」を出す
出力は3点セットで統一します。
-
nodes.json-
id,type,bbox,text
-
-
edges.json-
id,src,dst,points(折れ線)
-
-
debug.png- 抽出できたノードbboxとエッジを重ね描き(検算用)
Route A:drawings(ベクター)からノード/エッジ抽出
実行:
python pdf_flow_toolkit.py extract --route A input.pdf --outdir outA
Route B:画像/注釈が主なPDFを画像化して抽出(CV + OCR)
実行:
python pdf_flow_toolkit.py extract --route B input.pdf --outdir outB
Route C:clip/不可視ノイズを除去してからRoute A相当で抽出
実行:
python pdf_flow_toolkit.py extract --route C input.pdf --outdir outC
5. ソースコード全文(診断 + Route A/B/C)
1ファイル完結です。あとから関数単位で分割しやすいように構成しています。
# ここに pdf_flow_toolkit.py の全文を貼る(次のチャンク参照)
6. まとめ
- PDFは「見た目」と「内部」がズレるので 診断→ルート分岐が効く
- Route A/B/C のどれで攻めるべきかが分かれば、抽出精度と工数が跳ね上がる
- 実務では デバッグオーバレイ画像が最強(誤抽出の原因が一瞬で分かる)
pdf_flow_toolkit.py 全文(診断 + Route A/B/C)
ここから先を、そのまま
pdf_flow_toolkit.pyとして保存できます。
JSONとデバッグ画像が必ず出るようにしています。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
pdf_flow_toolkit.py
目的:
- PDFページの「構成要素(text/drawings/images/annots)」を診断する
- 診断結果に応じたRoute(A/B/C)でノード/エッジを抽出する
- どのノード/エッジを取れたか分かるデバッグ画像を出す
Route:
A: ベクター(drawings)主体 → drawings + PDF text を優先して抽出
B: 画像/注釈主体 → ページレンダリング画像 + CV/OCR(必要なら注釈も併用)
C: clip/不可視ノイズ主体 → まずノイズ除去してからA相当
依存:
- 必須: pymupdf, pillow, numpy
- RouteB推奨: opencv-python, pytesseract, (tesseract本体)
"""
from __future__ import annotations
import argparse
import json
import os
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple
from collections import Counter
import fitz # PyMuPDF
import numpy as np
from PIL import Image, ImageDraw, ImageFont
# ============================================================
# 1) 設定(閾値・挙動をまとめておくと改修が楽)
# ============================================================
@dataclass
class Config:
# --- 共通 ---
render_zoom: float = 2.0 # デバッグ画像用のレンダリング倍率(大きいほど見やすいが重い)
out_image_format: str = "png"
# --- ノード候補のフィルタ ---
node_min_area: float = 800.0 # 小さすぎる候補はノード扱いしない
node_max_area_ratio: float = 0.6 # ページ面積の大半を占める矩形は背景/枠の可能性が高い
# --- テキスト割当 ---
text_inset: float = 2.0 # bbox内判定に少し余裕を持たせる(PDFのbboxズレ対策)
# --- エッジ推定 ---
snap_dist: float = 10.0 # 線端点をノードにスナップする距離閾値(PDF座標系)
edge_min_len: float = 15.0 # 短すぎる線はノイズとして除外
# --- Route C(ノイズ除去強化) ---
ignore_non_painted: bool = True
ignore_likely_invisible_white: bool = True
ignore_tiny_drawings: bool = True
# --- Route B(画像ベース) ---
cv_min_contour_area: float = 900.0
cv_max_contour_area_ratio: float = 0.6
ocr_lang: str = "jpn" # Tesseractの言語。環境により "jpn+eng" なども可
# ============================================================
# 2) データ構造(JSON出力の形を固定すると後工程が楽)
# ============================================================
@dataclass
class Node:
id: str
type: str # "process" / "decision" / "terminator" / "unknown" など
bbox: Tuple[float, float, float, float] # (x0, y0, x1, y1) PDF座標
text: str
@dataclass
class Edge:
id: str
src: str
dst: str
points: List[Tuple[float, float]] # 折れ線(PDF座標)
# ============================================================
# 3) ユーティリティ
# ============================================================
def rect_area(r: fitz.Rect) -> float:
return max(0.0, r.width) * max(0.0, r.height)
def safe_rect(x: Any) -> fitz.Rect:
if isinstance(x, fitz.Rect):
return x
if isinstance(x, (list, tuple)) and len(x) == 4:
return fitz.Rect(x)
return fitz.Rect(0, 0, 0, 0)
def clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def is_near_white(color: Any, tol: float = 0.02) -> bool:
if color is None:
return False
if isinstance(color, (int, float)):
return abs(1.0 - float(color)) < tol
if isinstance(color, (list, tuple)) and len(color) >= 3:
try:
return all(abs(1.0 - float(c)) < tol for c in color[:3])
except Exception:
return False
return False
def ensure_dir(path: str) -> None:
os.makedirs(path, exist_ok=True)
def dump_json(path: str, obj: Any) -> None:
with open(path, "w", encoding="utf-8") as f:
json.dump(obj, f, ensure_ascii=False, indent=2)
def render_page_image(page: fitz.Page, zoom: float) -> Tuple[Image.Image, float]:
"""
PyMuPDFのページを画像化して返す。
返すscaleは「PDF座標 → 画像座標」への倍率(=zoom)。
"""
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat, alpha=False)
img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
return img, zoom
def pdf_bbox_to_img(bbox: Tuple[float, float, float, float], scale: float) -> Tuple[int, int, int, int]:
x0, y0, x1, y1 = bbox
return int(x0 * scale), int(y0 * scale), int(x1 * scale), int(y1 * scale)
def dist_point_to_rect(p: Tuple[float, float], r: fitz.Rect) -> float:
"""
点pから矩形rへの距離(外側なら最近点距離、内側なら0)
"""
x, y = p
dx = max(r.x0 - x, 0, x - r.x1)
dy = max(r.y0 - y, 0, y - r.y1)
return (dx*dx + dy*dy) ** 0.5
def segment_length(p1: Tuple[float,float], p2: Tuple[float,float]) -> float:
return ((p1[0]-p2[0])**2 + (p1[1]-p2[1])**2) ** 0.5
# ============================================================
# 4) 診断(ページが何で構成されているか)
# ============================================================
def drawing_op_signature(d: Dict[str, Any]) -> Tuple[str, ...]:
items = d.get("items", [])
ops: List[str] = []
for it in items:
if isinstance(it, (list, tuple)) and len(it) >= 1 and isinstance(it[0], str):
ops.append(it[0])
return tuple(ops)
def classify_drawing(d: Dict[str, Any], page_rect: fitz.Rect, cfg: Config) -> str:
r = safe_rect(d.get("rect"))
stroke = d.get("color")
fill = d.get("fill")
width = d.get("width", None)
stroke_op = d.get("stroke_opacity", d.get("opacity", 1.0))
fill_op = d.get("fill_opacity", d.get("opacity", 1.0))
has_stroke = (stroke is not None) and (stroke_op is None or stroke_op > 0)
has_fill = (fill is not None) and (fill_op is None or fill_op > 0)
if not page_rect.intersects(r):
return "off_page"
if rect_area(r) < 20:
return "tiny_noise"
if not has_stroke and not has_fill:
return "non_painted_or_clip"
if has_stroke and is_near_white(stroke) and (not has_fill or is_near_white(fill)):
return "likely_invisible_white"
if has_stroke and (width == 0 or (isinstance(width, (int, float)) and abs(width) < 1e-9)):
return "hairline_stroke"
return "visible_candidate"
def compute_text_stats(page: fitz.Page) -> Tuple[int, float]:
d = page.get_text("dict")
blocks = d.get("blocks", [])
pr = page.rect
page_area = rect_area(pr)
area_sum = 0.0
text_blocks = 0
for b in blocks:
if b.get("type") == 0:
text_blocks += 1
r = safe_rect(b.get("bbox"))
area_sum += rect_area(r)
ratio = (area_sum / page_area) if page_area > 0 else 0.0
return text_blocks, clamp(ratio, 0.0, 1.0)
def compute_image_stats(page: fitz.Page) -> Tuple[int, int, float]:
pr = page.rect
page_area = rect_area(pr)
imgs = page.get_images(full=True) or []
images_count = len(imgs)
placements = 0
area_sum = 0.0
for img in imgs:
xref = img[0]
rects = page.get_image_rects(xref) or []
placements += len(rects)
for r in rects:
rr = pr & r
area_sum += rect_area(rr)
ratio = (area_sum / page_area) if page_area > 0 else 0.0
return images_count, placements, clamp(ratio, 0.0, 1.0)
def compute_annot_stats(page: fitz.Page) -> Tuple[int, int]:
ann_count = 0
try:
annots = page.annots()
if annots:
for _ in annots:
ann_count += 1
except Exception:
ann_count = -1
links = page.get_links() or []
return ann_count, len(links)
def recommend_route(
*,
drawings_total: int,
visible_candidate: int,
non_painted_or_clip: int,
likely_invisible_white: int,
hairline_stroke: int,
text_blocks: int,
images_count: int,
image_area_ratio: float,
annots_count: int,
links_count: int,
) -> Tuple[str, List[str]]:
why: List[str] = []
# --- Route B の兆候 ---
strong_B = False
if image_area_ratio >= 0.35:
strong_B = True
why.append(f"画像配置の占有率が高い(image_area_ratio={image_area_ratio:.2f})→ 図が画像化されている可能性")
if images_count > 0 and visible_candidate < 10 and drawings_total < 30:
strong_B = True
why.append("imagesはあるがdraw(可視候補)が少ない → 画像ベース or 注釈appearanceの疑い")
if annots_count not in (0, -1) and annots_count > 0:
strong_B = True
why.append(f"注釈が存在(annots_count={annots_count})→ 注釈側に情報がいる可能性")
if links_count > 0:
why.append(f"リンク領域が存在(links_count={links_count})→ 注釈/リンクに構造が偏っている可能性")
# --- Route C の兆候 ---
clip_ratio = (non_painted_or_clip / drawings_total) if drawings_total > 0 else 0.0
inv_ratio = ((likely_invisible_white + non_painted_or_clip) / drawings_total) if drawings_total > 0 else 0.0
strong_C = False
if drawings_total >= 50 and (clip_ratio >= 0.30 or inv_ratio >= 0.45):
strong_C = True
why.append(f"不可視/clip疑いが多い(clip_ratio={clip_ratio:.2f}, inv_ratio={inv_ratio:.2f})→ 先にノイズ除去が必要")
# --- Route A の兆候 ---
strong_A = (visible_candidate >= 30) and (text_blocks > 0) and (image_area_ratio < 0.2)
if strong_B and not strong_A:
if text_blocks == 0:
why.append("テキストが抽出できない → OCRの必要性が高い")
return "B", why
if strong_C and not strong_A:
return "C", why
# どちらでもない → Aから試す
if strong_A:
why.append("可視drawが十分 & テキストあり & 画像占有が低い → ベクター復元(Route A)が刺さる")
else:
why.append("明確にB/Cではない → まずRoute Aで試し、足りなければB/Cを併用")
if hairline_stroke > 0:
why.append(f"hairline(width==0)が検出({hairline_stroke}件)→ 抽出時に捨てない実装が必要")
return "A", why
def diagnose_pdf(pdf_path: str, outdir: str, cfg: Config, overlay: bool) -> None:
ensure_dir(outdir)
doc = fitz.open(pdf_path)
report = []
overlay_pages = []
for page_no in range(doc.page_count):
page = doc[page_no]
pr = page.rect
text_blocks, text_area_ratio = compute_text_stats(page)
images_count, placements, image_area_ratio = compute_image_stats(page)
annots_count, links_count = compute_annot_stats(page)
drawings = page.get_drawings()
drawings_total = len(drawings)
cls_counter = Counter()
op_hist = Counter()
for d in drawings:
cls = classify_drawing(d, pr, cfg)
cls_counter[cls] += 1
sig = drawing_op_signature(d)
if sig:
op_hist[str(sig)] += 1
route, why = recommend_route(
drawings_total=drawings_total,
visible_candidate=cls_counter["visible_candidate"],
non_painted_or_clip=cls_counter["non_painted_or_clip"],
likely_invisible_white=cls_counter["likely_invisible_white"],
hairline_stroke=cls_counter["hairline_stroke"],
text_blocks=text_blocks,
images_count=images_count,
image_area_ratio=image_area_ratio,
annots_count=annots_count,
links_count=links_count,
)
page_info = {
"page_no": page_no,
"page_size": [pr.width, pr.height],
"text_blocks": text_blocks,
"text_area_ratio": text_area_ratio,
"drawings_total": drawings_total,
"drawings_class": dict(cls_counter),
"drawings_op_hist_top20": dict(op_hist.most_common(20)),
"images_count": images_count,
"image_placements": placements,
"image_area_ratio": image_area_ratio,
"annots_count": annots_count,
"links_count": links_count,
"recommended_route": route,
"why": why,
}
report.append(page_info)
dump_json(os.path.join(outdir, f"page_{page_no:03d}.json"), page_info)
print(f"[diagnose page {page_no}] Route={route} drawings={drawings_total} "
f"visible={cls_counter['visible_candidate']} clip={cls_counter['non_painted_or_clip']} "
f"images_area={image_area_ratio:.2f} text_blocks={text_blocks} annots={annots_count} links={links_count}")
for w in why[:3]:
print(" -", w)
if overlay:
overlay_pages.append(page_no)
dump_json(os.path.join(outdir, "report.json"), report)
if overlay:
# overlay.pdf:bboxを重ねて「どこに何があるか」検算する
out_pdf = fitz.open()
for page_no in overlay_pages:
p = doc[page_no]
img, scale = render_page_image(p, cfg.render_zoom)
draw = ImageDraw.Draw(img)
# text bbox
try:
tb = p.get_text("dict").get("blocks", [])
for b in tb:
if b.get("type") == 0:
r = safe_rect(b.get("bbox"))
bb = (r.x0, r.y0, r.x1, r.y1)
x0,y0,x1,y1 = pdf_bbox_to_img(bb, scale)
draw.rectangle([x0,y0,x1,y1], outline="blue", width=2)
except Exception:
pass
# image bbox
try:
imgs = p.get_images(full=True) or []
for im in imgs:
xref = im[0]
for r in p.get_image_rects(xref) or []:
bb = (r.x0, r.y0, r.x1, r.y1)
x0,y0,x1,y1 = pdf_bbox_to_img(bb, scale)
draw.rectangle([x0,y0,x1,y1], outline="green", width=2)
except Exception:
pass
# drawings bbox(重いので少し抑制したい場合はここを調整)
try:
drs = p.get_drawings()
for d in drs[:800]:
r = safe_rect(d.get("rect"))
if rect_area(r) < 50:
continue
bb = (r.x0, r.y0, r.x1, r.y1)
x0,y0,x1,y1 = pdf_bbox_to_img(bb, scale)
draw.rectangle([x0,y0,x1,y1], outline="red", width=1)
except Exception:
pass
# PIL画像 → PDFページとして貼り付け
pix = fitz.Pixmap(fitz.csRGB, img.size[0], img.size[1], img.tobytes(), False)
newp = out_pdf.new_page(width=p.rect.width, height=p.rect.height)
# 画像の貼付(scaleを戻してページ全体に合わせる)
rect = fitz.Rect(0, 0, p.rect.width, p.rect.height)
newp.insert_image(rect, pixmap=pix)
overlay_path = os.path.join(outdir, "overlay.pdf")
out_pdf.save(overlay_path)
out_pdf.close()
print("overlay saved:", overlay_path)
doc.close()
# ============================================================
# 5) Route A/C:ベクター(drawings)からノード/エッジ抽出
# ============================================================
def extract_text_blocks(page: fitz.Page) -> List[Tuple[fitz.Rect, str]]:
"""
PDF内テキスト(OCR不要)のブロックを抽出する。
Route A/Cでノード内ラベル割当の基礎になる。
"""
blocks = page.get_text("blocks") # (x0,y0,x1,y1, "text", block_no, block_type, ...)
out: List[Tuple[fitz.Rect, str]] = []
for b in blocks:
if len(b) >= 5:
r = fitz.Rect(b[0], b[1], b[2], b[3])
text = (b[4] or "").strip()
if text:
out.append((r, text))
return out
def drawing_is_usable_vector(d: Dict[str, Any], page_rect: fitz.Rect, cfg: Config, route: str) -> bool:
"""
Route A: なるべく広く拾う
Route C: ノイズになりやすいものを除外してから拾う
"""
cls = classify_drawing(d, page_rect, cfg)
if route == "A":
# Aは基本的に拾ってから後段でフィルタ
return cls not in ("off_page",)
# Route C: ノイズ除去を強める
if cls == "off_page":
return False
if cfg.ignore_tiny_drawings and cls == "tiny_noise":
return False
if cfg.ignore_non_painted and cls == "non_painted_or_clip":
return False
if cfg.ignore_likely_invisible_white and cls == "likely_invisible_white":
return False
return True
def node_candidates_from_drawings(page: fitz.Page, cfg: Config, route: str) -> List[fitz.Rect]:
"""
ノード候補bboxをdrawingsから集める。
- まずdrawingsの外接矩形(rect)を候補として集める
- 大きすぎ/小さすぎを除外
- 近い/重なるbboxはマージ(ノードの二重線などを吸収)
"""
pr = page.rect
page_area = rect_area(pr)
drawings = page.get_drawings()
rects: List[fitz.Rect] = []
for d in drawings:
if not drawing_is_usable_vector(d, pr, cfg, route):
continue
r = safe_rect(d.get("rect"))
a = rect_area(r)
if a < cfg.node_min_area:
continue
if a > page_area * cfg.node_max_area_ratio:
continue
rects.append(r)
# 重なりが多いので「ほぼ同じbbox」をまとめる
merged = merge_overlapping_rects(rects, iou_thresh=0.6, inflate=1.5)
return merged
def merge_overlapping_rects(rects: List[fitz.Rect], iou_thresh: float, inflate: float) -> List[fitz.Rect]:
"""
簡易なbboxマージ。
- inflate: 近接しているものもマージしたいので少し膨らませてIOU判定する
"""
def iou(a: fitz.Rect, b: fitz.Rect) -> float:
inter = a & b
if inter.is_empty:
return 0.0
union_area = rect_area(a) + rect_area(b) - rect_area(inter)
return rect_area(inter) / union_area if union_area > 0 else 0.0
out: List[fitz.Rect] = []
rects_sorted = sorted(rects, key=lambda r: (r.y0, r.x0))
used = [False] * len(rects_sorted)
for i, r in enumerate(rects_sorted):
if used[i]:
continue
cur = fitz.Rect(r)
used[i] = True
changed = True
while changed:
changed = False
for j, s in enumerate(rects_sorted):
if used[j]:
continue
# 少し膨らませて近接を拾う
cur_infl = fitz.Rect(cur.x0 - inflate, cur.y0 - inflate, cur.x1 + inflate, cur.y1 + inflate)
s_infl = fitz.Rect(s.x0 - inflate, s.y0 - inflate, s.x1 + inflate, s.y1 + inflate)
if iou(cur_infl, s_infl) >= iou_thresh:
# unionへ拡張
cur |= s
used[j] = True
changed = True
out.append(cur)
return out
def assign_text_to_nodes(nodes: List[fitz.Rect], text_blocks: List[Tuple[fitz.Rect, str]], cfg: Config) -> List[Node]:
"""
ノードbboxの内部にあるテキストを割り当て、Node構造にする。
(ノード種別推定は簡易。必要なら形状判定を後から追加しやすい)
"""
out: List[Node] = []
for idx, nr in enumerate(nodes):
# insetを持たせた内側判定(bboxズレ対策)
inner = fitz.Rect(nr.x0 + cfg.text_inset, nr.y0 + cfg.text_inset, nr.x1 - cfg.text_inset, nr.y1 - cfg.text_inset)
texts = []
for tr, t in text_blocks:
# テキストbbox中心がノード内に入っていれば採用
cx = (tr.x0 + tr.x1) / 2
cy = (tr.y0 + tr.y1) / 2
if inner.contains(fitz.Point(cx, cy)):
texts.append(t)
text = "\n".join(texts).strip()
# 種別推定(超簡易)
# ここは後から「菱形=decision」などに差し替えやすい場所
node_type = "unknown"
out.append(Node(
id=f"N{idx:04d}",
type=node_type,
bbox=(nr.x0, nr.y0, nr.x1, nr.y1),
text=text
))
return out
def extract_line_segments_from_drawings(page: fitz.Page, cfg: Config, route: str) -> List[Tuple[Tuple[float,float], Tuple[float,float]]]:
"""
drawingsのitemsから線分を拾う。
- 'l' (line) の連続を線分にする
- 短すぎる線は捨てる
"""
pr = page.rect
segments = []
for d in page.get_drawings():
if not drawing_is_usable_vector(d, pr, cfg, route):
continue
items = d.get("items", [])
last_pt = None
for it in items:
if not (isinstance(it, (list, tuple)) and len(it) >= 1):
continue
op = it[0]
if op == "l" and len(it) >= 3:
p1 = it[1]
p2 = it[2]
# p1/p2がPoint互換のタプルと仮定
a = (float(p1[0]), float(p1[1]))
b = (float(p2[0]), float(p2[1]))
if segment_length(a, b) >= cfg.edge_min_len:
segments.append((a, b))
last_pt = b
else:
last_pt = None
return segments
def nearest_node_id(point: Tuple[float,float], nodes: List[Node], cfg: Config) -> Optional[str]:
"""
点が近いノードを探す(距離がcfg.snap_dist以内)。
"""
best = None
best_d = 1e9
for n in nodes:
r = fitz.Rect(*n.bbox)
d = dist_point_to_rect(point, r)
if d < best_d:
best_d = d
best = n.id
if best is not None and best_d <= cfg.snap_dist:
return best
return None
def build_edges_from_segments(segments, nodes: List[Node], cfg: Config) -> List[Edge]:
"""
線分の端点をノードへスナップしてエッジ化する。
- 端点→ノードが両方見つかればエッジ候補
- 同じノード内ならループ(src==dst)として残す(後で捨ててもOK)
"""
edges: List[Edge] = []
eidx = 0
for a, b in segments:
src = nearest_node_id(a, nodes, cfg)
dst = nearest_node_id(b, nodes, cfg)
if src is None or dst is None:
continue
edges.append(Edge(
id=f"E{eidx:05d}",
src=src,
dst=dst,
points=[a, b]
))
eidx += 1
return edges
def draw_debug_overlay(
page: fitz.Page,
nodes: List[Node],
edges: List[Edge],
out_path: str,
cfg: Config,
title: str = ""
) -> None:
"""
デバッグ画像(抽出結果の重ね描き)を出力する。
"""
img, scale = render_page_image(page, cfg.render_zoom)
draw = ImageDraw.Draw(img)
# ノードbbox(赤)
for n in nodes:
x0,y0,x1,y1 = pdf_bbox_to_img(n.bbox, scale)
draw.rectangle([x0,y0,x1,y1], outline="red", width=3)
# idを左上に(見やすさ優先)
draw.text((x0+2, y0+2), n.id, fill="red")
# エッジ(青)
for e in edges:
pts = [(int(x*scale), int(y*scale)) for (x,y) in e.points]
if len(pts) >= 2:
draw.line(pts, fill="blue", width=3)
# エッジIDも近くに
mx = int((pts[0][0] + pts[-1][0]) / 2)
my = int((pts[0][1] + pts[-1][1]) / 2)
draw.text((mx, my), e.id, fill="blue")
if title:
draw.text((10, 10), title, fill="black")
img.save(out_path)
def export_page_result(outdir: str, page_no: int, nodes: List[Node], edges: List[Edge], debug_img_path: str) -> None:
page_dir = os.path.join(outdir, f"page_{page_no:03d}")
ensure_dir(page_dir)
dump_json(os.path.join(page_dir, "nodes.json"), [asdict(n) for n in nodes])
dump_json(os.path.join(page_dir, "edges.json"), [asdict(e) for e in edges])
# debug画像は既に保存済みの想定(ここではパスだけ出しても良い)
dump_json(os.path.join(page_dir, "artifacts.json"), {
"nodes": "nodes.json",
"edges": "edges.json",
"debug_image": os.path.basename(debug_img_path),
})
def extract_route_A_or_C(pdf_path: str, outdir: str, cfg: Config, route: str) -> None:
"""
Route A/C 共通実装。
違いは drawing_is_usable_vector() のフィルタ強度だけ。
"""
ensure_dir(outdir)
doc = fitz.open(pdf_path)
for page_no in range(doc.page_count):
page = doc[page_no]
# 1) ノード候補bboxを収集
node_rects = node_candidates_from_drawings(page, cfg, route)
# 2) テキスト抽出(PDFにテキストが入っている場合は高精度)
text_blocks = extract_text_blocks(page)
# 3) ノードへテキスト割当
nodes = assign_text_to_nodes(node_rects, text_blocks, cfg)
# 4) エッジ候補(線分)→ノードへスナップしてエッジ化
segments = extract_line_segments_from_drawings(page, cfg, route)
edges = build_edges_from_segments(segments, nodes, cfg)
# 5) デバッグ画像
page_dir = os.path.join(outdir, f"page_{page_no:03d}")
ensure_dir(page_dir)
debug_path = os.path.join(page_dir, f"debug.{cfg.out_image_format}")
draw_debug_overlay(page, nodes, edges, debug_path, cfg, title=f"Route {route} page {page_no}")
# 6) JSON出力
export_page_result(outdir, page_no, nodes, edges, debug_path)
print(f"[Route {route}] page {page_no}: nodes={len(nodes)} edges={len(edges)} debug={debug_path}")
doc.close()
# ============================================================
# 6) Route B:画像/注釈主体 → 画像化してCV/OCRで抽出
# ============================================================
def try_import_cv():
"""
Route BはOpenCV / pytesseractが無いと機能が落ちるので遅延importにする。
"""
try:
import cv2 # type: ignore
except Exception as e:
raise RuntimeError("Route B requires opencv-python. Install: pip install opencv-python") from e
try:
import pytesseract # type: ignore
except Exception:
pytesseract = None # OCR無しでも枠だけは取れるようにする
return cv2, pytesseract
def cv_detect_node_contours(img: Image.Image, cfg: Config) -> List[Tuple[int,int,int,int]]:
"""
画像からノード候補(矩形/菱形っぽい外接矩形)を検出する(簡易)。
- 輪郭抽出 → 面積フィルタ → 外接矩形として返す
改修ポイント:
- ここに「4頂点近似」「角度判定」「菱形判定」などを追加すると精度が上がる
"""
cv2, _ = try_import_cv()
np_img = np.array(img)
gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY)
blur = cv2.GaussianBlur(gray, (5,5), 0)
# 線画は2値化で輪郭が取りやすい
th = cv2.adaptiveThreshold(blur, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY_INV, 31, 7)
# 線の途切れを少しつなぐ
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3,3))
th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, kernel, iterations=2)
contours, _hier = cv2.findContours(th, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
h, w = gray.shape[:2]
img_area = float(h * w)
out = []
for c in contours:
area = float(cv2.contourArea(c))
if area < cfg.cv_min_contour_area:
continue
if area > img_area * cfg.cv_max_contour_area_ratio:
continue
x,y,bw,bh = cv2.boundingRect(c)
# 極端に細いもの(線)を避ける
if bw < 10 or bh < 10:
continue
out.append((x, y, x+bw, y+bh))
# 重なりbboxを簡易マージ(NMS的に)
out = merge_bboxes_iou(out, iou_thresh=0.4)
return out
def merge_bboxes_iou(bboxes: List[Tuple[int,int,int,int]], iou_thresh: float) -> List[Tuple[int,int,int,int]]:
"""
画像bbox版の簡易マージ
"""
def iou(a,b) -> float:
ax0,ay0,ax1,ay1 = a
bx0,by0,bx1,by1 = b
ix0,iy0 = max(ax0,bx0), max(ay0,by0)
ix1,iy1 = min(ax1,bx1), min(ay1,by1)
iw, ih = max(0, ix1-ix0), max(0, iy1-iy0)
inter = iw*ih
if inter <= 0:
return 0.0
ua = (ax1-ax0)*(ay1-ay0)
ub = (bx1-bx0)*(by1-by0)
return inter / float(ua + ub - inter)
out = []
used = [False]*len(bboxes)
for i,a in enumerate(bboxes):
if used[i]:
continue
x0,y0,x1,y1 = a
used[i] = True
changed = True
while changed:
changed = False
for j,b in enumerate(bboxes):
if used[j]:
continue
if iou((x0,y0,x1,y1), b) >= iou_thresh:
bx0,by0,bx1,by1 = b
x0,y0 = min(x0,bx0), min(y0,by0)
x1,y1 = max(x1,bx1), max(y1,by1)
used[j] = True
changed = True
out.append((x0,y0,x1,y1))
return out
def ocr_text_in_bbox(img: Image.Image, bbox: Tuple[int,int,int,int], cfg: Config) -> str:
"""
bbox領域のOCR(pytesseractが無ければ空文字)
改修ポイント:
- 前処理(拡大、2値化、ノイズ除去)を追加すると精度が上がる
- OCRのconfig(psm等)も調整余地あり
"""
_cv2, pytesseract = try_import_cv()
if pytesseract is None:
return ""
x0,y0,x1,y1 = bbox
crop = img.crop((x0, y0, x1, y1))
# OCR向けに少し拡大
scale = 2
crop = crop.resize((crop.size[0]*scale, crop.size[1]*scale), resample=Image.BICUBIC)
text = pytesseract.image_to_string(crop, lang=cfg.ocr_lang)
return (text or "").strip()
def cv_detect_edges(img: Image.Image, cfg: Config) -> List[Tuple[Tuple[int,int], Tuple[int,int]]]:
"""
画像から線分(エッジ候補)を検出(HoughLinesP)。
ループや曲線は弱いので、改修ポイントとして「骨格化 + 経路追跡」などを追加可能。
"""
cv2, _ = try_import_cv()
np_img = np.array(img)
gray = cv2.cvtColor(np_img, cv2.COLOR_RGB2GRAY)
blur = cv2.GaussianBlur(gray, (5,5), 0)
edges = cv2.Canny(blur, 50, 150)
# 線分検出
lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=100,
minLineLength=40, maxLineGap=10)
out = []
if lines is None:
return out
for l in lines:
x1,y1,x2,y2 = l[0]
# あまりに短い線はノイズ
if ((x1-x2)**2 + (y1-y2)**2) ** 0.5 < 30:
continue
out.append(((x1,y1), (x2,y2)))
return out
def img_point_to_nearest_node(point: Tuple[int,int], node_bboxes: List[Tuple[int,int,int,int]], snap_px: int) -> Optional[int]:
"""
画像座標で、点に最も近いノードbboxを返す(index)。
"""
px,py = point
best = None
best_d = 1e9
for i,(x0,y0,x1,y1) in enumerate(node_bboxes):
# 点→矩形距離
dx = max(x0 - px, 0, px - x1)
dy = max(y0 - py, 0, py - y1)
d = (dx*dx + dy*dy) ** 0.5
if d < best_d:
best_d = d
best = i
if best is not None and best_d <= snap_px:
return best
return None
def extract_route_B(pdf_path: str, outdir: str, cfg: Config) -> None:
"""
Route B:
- ページを画像化
- CVでノード候補(bbox)検出
- OCRでノード内テキスト抽出(可能なら)
- CVで線分検出 → ノードへスナップしてエッジ化
- デバッグ画像で検算
"""
ensure_dir(outdir)
doc = fitz.open(pdf_path)
for page_no in range(doc.page_count):
page = doc[page_no]
# 1) ページ全体を画像化(ここがRoute Bの基礎)
img, scale = render_page_image(page, cfg.render_zoom)
# 2) ノードbbox検出(画像座標)
node_bboxes = cv_detect_node_contours(img, cfg)
# 3) OCRでノード内テキスト
nodes: List[Node] = []
for i, bb in enumerate(node_bboxes):
text = ocr_text_in_bbox(img, bb, cfg)
# 画像bbox → PDF座標bboxへ戻す(scaleで割る)
x0,y0,x1,y1 = bb
pdf_bb = (x0/scale, y0/scale, x1/scale, y1/scale)
nodes.append(Node(
id=f"N{ i:04d }",
type="unknown",
bbox=pdf_bb,
text=text
))
# 4) 線分(画像座標)検出 → ノードへスナップしてエッジ化
lines = cv_detect_edges(img, cfg)
edges: List[Edge] = []
eidx = 0
snap_px = int(cfg.snap_dist * scale) # PDF閾値を画像閾値へ
for (p1, p2) in lines:
i_src = img_point_to_nearest_node(p1, node_bboxes, snap_px)
i_dst = img_point_to_nearest_node(p2, node_bboxes, snap_px)
if i_src is None or i_dst is None:
continue
# 画像点 → PDF点
a = (p1[0]/scale, p1[1]/scale)
b = (p2[0]/scale, p2[1]/scale)
edges.append(Edge(
id=f"E{eidx:05d}",
src=f"N{i_src:04d}",
dst=f"N{i_dst:04d}",
points=[a, b]
))
eidx += 1
# 5) デバッグ画像
page_dir = os.path.join(outdir, f"page_{page_no:03d}")
ensure_dir(page_dir)
debug_path = os.path.join(page_dir, f"debug.{cfg.out_image_format}")
draw_debug_overlay(page, nodes, edges, debug_path, cfg, title=f"Route B page {page_no}")
# 6) JSON出力
export_page_result(outdir, page_no, nodes, edges, debug_path)
print(f"[Route B] page {page_no}: nodes={len(nodes)} edges={len(edges)} debug={debug_path}")
doc.close()
# ============================================================
# 7) CLI
# ============================================================
def main():
parser = argparse.ArgumentParser()
sub = parser.add_subparsers(dest="cmd", required=True)
p_diag = sub.add_parser("diagnose", help="diagnose PDF composition")
p_diag.add_argument("pdf", help="input pdf path")
p_diag.add_argument("--outdir", default="out_diag", help="output dir")
p_diag.add_argument("--overlay", action="store_true", help="export overlay.pdf for visual inspection")
p_ext = sub.add_parser("extract", help="extract nodes/edges by route")
p_ext.add_argument("pdf", help="input pdf path")
p_ext.add_argument("--outdir", default="out_extract", help="output dir")
p_ext.add_argument("--route", choices=["auto","A","B","C"], default="auto", help="route selection")
args = parser.parse_args()
cfg = Config()
if args.cmd == "diagnose":
diagnose_pdf(args.pdf, args.outdir, cfg, overlay=args.overlay)
return
if args.cmd == "extract":
# autoの場合は1ページ目診断でルートを決める(必要ならページごとに変えてもOK)
route = args.route
if route == "auto":
doc = fitz.open(args.pdf)
page = doc[0]
pr = page.rect
text_blocks, _ = compute_text_stats(page)
images_count, placements, image_area_ratio = compute_image_stats(page)
annots_count, links_count = compute_annot_stats(page)
drawings = page.get_drawings()
cls_counter = Counter()
for d in drawings:
cls_counter[classify_drawing(d, pr, cfg)] += 1
route, why = recommend_route(
drawings_total=len(drawings),
visible_candidate=cls_counter["visible_candidate"],
non_painted_or_clip=cls_counter["non_painted_or_clip"],
likely_invisible_white=cls_counter["likely_invisible_white"],
hairline_stroke=cls_counter["hairline_stroke"],
text_blocks=text_blocks,
images_count=images_count,
image_area_ratio=image_area_ratio,
annots_count=annots_count,
links_count=links_count,
)
doc.close()
print("[auto route]", route)
for w in why:
print(" -", w)
if route == "A":
extract_route_A_or_C(args.pdf, args.outdir, cfg, route="A")
elif route == "B":
extract_route_B(args.pdf, args.outdir, cfg)
elif route == "C":
extract_route_A_or_C(args.pdf, args.outdir, cfg, route="C")
return
if __name__ == "__main__":
main()
Routeごとの改修ポイント
-
Route A 改修ポイント
- ノード種別推定(矩形/菱形/楕円)を
node_candidates_from_drawingsに追加 - エッジの結合(複数線分をポリライン化)
- 矢尻検出(方向推定)やYes/Noラベル抽出
- ノード種別推定(矩形/菱形/楕円)を
-
Route B 改修ポイント
-
cv_detect_node_contoursに4頂点近似→角度判定を追加して「菱形=decision」を判別 - 線検出を Hough から「骨格化+経路追跡」に変えるとループに強くなる
- OCR前処理(2値化/拡大/ノイズ除去)で認識率UP
-
-
Route C 改修ポイント
-
drawing_is_usable_vectorの除外条件をPDF群に合わせて調整 - 背景グリッド(等間隔の長線群)除外などを追加するとノード誤検出が減る
-
補足(大事)
- 上のRoute A/Cは「まずは動く」ことを優先して 線分をそのままエッジ化しています。複雑なループや途中折れは、線分の結合/経路追跡で精度が上がります。
- Route Bは CV/OCRの下駄が要るので、環境差が出ます