はじめに
PDF や画像の帳票(請求書・申請書など)から、テキストと表をまとめて取り出したい場面はよくあります。
このとき OCI には Document Understanding(oci.ai_document)というサービスがあり、日本語の OCR と表抽出を高い精度で行えます。
ただ、いざ実装しようとすると、API が同期ではなく 非同期の processor job であること、入出力が Object Storage 経由であること、結果 JSON が camelCase の独自スキーマであることなど、地味にハマるポイントがいくつかあります。
この記事では、OCI Document Understanding を 1 つの文書解析関数として組み込むための実装パターンを、実務目線で整理します。フレームワークには依存しない、そのまま参考にできる形でコードも置きます。
対象は「OCI で帳票 OCR をやりたいが、processor job の扱い方が分からない」「OCR 結果を自前のスキーマに正規化して後段に渡したい」という方です。
結論
先に結論をまとめます。実装で押さえるべきは次の 6 点です。
-
同期 API はない。
create_processor_jobで投入 →get_processor_jobを poll → 完了後に Object Storage から結果 JSON を読む、という非同期フローで組む。 -
入出力は Object Storage 経由。入力ファイルを put し、出力は
output_prefix/<job_id>/...jsonに書き出される。 -
language は BCP-47 に正規化する。
JPNのような ISO 3 文字ではなくjaを渡す。 - 結果 JSON は camelCase の独自スキーマ。
pages[].lines[].text、pages[].tables[].headerRows/bodyRowsを自前のスキーマへ 決定論的に remap する。 - 座標は
boundingPolygon.normalizedVertices(0〜1 の比率)で返ってくる。そのまま xyxy に正規化して持つ。 - OCI SDK の呼び出しは 遅延 import + 別スレッドで隔離し、暗号化 PEM の 対話プロンプトを事前に止める。未設定・失敗時は安全に縮退させる。
以降で、それぞれを順番に実装していきます。
全体の流れ
処理の流れは次の通りです。
入力ファイル (PDF / 画像)
|
| 1. put_object
v
Object Storage (input bucket / input_prefix)
|
| 2. create_processor_job (features, language)
v
OCI Document Understanding (非同期 processor job)
|
| 3. poll: get_processor_job -> lifecycle_state
v
SUCCEEDED
|
| 4. list_objects + get_object
v
Object Storage (output bucket / output_prefix / <job_id> / *.json)
|
| 5. 結果 JSON を自前スキーマへ remap
v
構造化 JSON (raw_text / elements / tables / pages)
ポイントは、OCI 側は「投入」と「結果取得」が完全に分離していることです。job を投入したら ID を受け取り、その ID を使って状態を poll し、完了したら output prefix 配下の JSON を拾いに行く、という 3 段構えになります。
前提として、SDK は pip install oci、認証は ~/.oci/config(API キー方式)が用意済みとします。
なぜ非同期 processor job なのか
OCI Document Understanding には、1 リクエストで即レスポンスを返す同期 API(インラインの analyze-document)もありますが、運用で使うなら processor job 一択だと考えています。
理由は次の通りです。
- 複数ページ・複数ファイルをまとめて投げられる。input_location に複数 object を指定できる。
- 入出力が Object Storage で完結するので、大きい PDF でも HTTP ペイロードに乗せ替えなくてよい。
- 結果が JSON として永続化されるので、後から再取得・再処理がしやすい。
その代わり、「投入して、待って、取りに行く」というライフサイクル管理を自分で書く必要があります。ここが実装の本体です。
実装
ここからコードです。長いので折りたたみにしています。順番に置いていけば、そのまま 1 つのサービスクラスになります。
1. 設定(Config)
まず、job 投入に必要な非機密設定を 1 つの dataclass にまとめます。compartment / namespace / バケットといった「環境ごとに変わるが秘密ではない」値だけを持たせ、認証情報そのものは持たせないのがコツです。
入力バケットが未指定なら Object Storage の汎用バケットへ、出力バケットが未指定なら入力バケットへ fallback する、という解決ロジックも入れておくと運用が楽になります。
config.py — 設定 dataclass と env からの構築
from __future__ import annotations
import json
import os
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
_DEFAULT_FEATURES = ("DOCUMENT_TEXT_EXTRACTION", "TABLE_EXTRACTION")
def _float(value: str, default: float) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
@dataclass(frozen=True)
class DocumentUnderstandingConfig:
"""DU 実行に必要な非機密設定(認証情報は持たない)。"""
compartment_id: str = ""
fallback_compartment_id: str = ""
namespace: str = ""
fallback_namespace: str = ""
input_bucket: str = ""
fallback_input_bucket: str = ""
output_bucket: str = ""
input_prefix: str = "document-understanding/input"
output_prefix: str = "document-understanding/output"
language: str = "ja"
features: Sequence[str] = field(default_factory=lambda: list(_DEFAULT_FEATURES))
poll_interval_seconds: float = 5.0
timeout_seconds: float = 600.0
oci_config_file: str = "~/.oci/config"
oci_config_profile: str = "DEFAULT"
oci_region: str = ""
object_storage_region: str = ""
# --- 設定解決(空欄は fallback へ寄せる)---
def resolve_compartment_id(self) -> str:
return self.compartment_id.strip() or self.fallback_compartment_id.strip()
def resolve_namespace(self) -> str:
return self.namespace.strip() or self.fallback_namespace.strip()
def resolve_input_bucket(self) -> str:
return self.input_bucket.strip() or self.fallback_input_bucket.strip()
def resolve_output_bucket(self) -> str:
return self.output_bucket.strip() or self.resolve_input_bucket()
def is_configured(self) -> bool:
"""compartment / namespace / 入力バケットが揃って初めて job を投入できる。"""
return bool(
self.resolve_compartment_id()
and self.resolve_namespace()
and self.resolve_input_bucket()
)
@classmethod
def from_env(cls, env: Mapping[str, str] | None = None) -> "DocumentUnderstandingConfig":
src = os.environ if env is None else env
def _get(name: str, default: str = "") -> str:
return str(src.get(name, default) or default)
# features は JSON 配列 or カンマ区切りの両方を許容しておくと便利
raw = _get("DU_FEATURES").strip()
if raw:
try:
parsed = json.loads(raw)
features = [str(x) for x in parsed] if isinstance(parsed, list) else []
except (json.JSONDecodeError, ValueError):
features = [p.strip() for p in raw.split(",") if p.strip()]
features = features or list(_DEFAULT_FEATURES)
else:
features = list(_DEFAULT_FEATURES)
return cls(
compartment_id=_get("DU_COMPARTMENT_ID"),
fallback_compartment_id=_get("OCI_COMPARTMENT_ID"),
namespace=_get("DU_NAMESPACE"),
fallback_namespace=_get("OBJECT_STORAGE_NAMESPACE"),
input_bucket=_get("DU_INPUT_BUCKET"),
fallback_input_bucket=_get("OBJECT_STORAGE_BUCKET"),
output_bucket=_get("DU_OUTPUT_BUCKET"),
input_prefix=_get("DU_INPUT_PREFIX", "document-understanding/input"),
output_prefix=_get("DU_OUTPUT_PREFIX", "document-understanding/output"),
language=_get("DU_LANGUAGE", "ja"),
features=features,
poll_interval_seconds=_float(_get("DU_POLL_INTERVAL_SECONDS"), 5.0),
timeout_seconds=_float(_get("DU_TIMEOUT_SECONDS"), 600.0),
oci_config_file=_get("OCI_CONFIG_FILE", "~/.oci/config"),
oci_config_profile=_get("OCI_CONFIG_PROFILE", "DEFAULT"),
oci_region=_get("OCI_REGION"),
object_storage_region=_get("OCI_REGION") or _get("OBJECT_STORAGE_REGION"),
)
2. OCI 認証を非対話でロードする(地味な必須対策)
ここは見落としがちですが、運用で確実に効いてくる対策です。
OCI SDK の from_file() で読んだ config をクライアントに渡したとき、API 秘密鍵の PEM が暗号化されていてパスフレーズが未設定だと、SDK が標準入力からパスフレーズを聞きに来ます。サーバープロセスやコンテナの中でこれをやられると、入力する相手がいないままハングします。
なので「クライアントを作る前に、暗号化 PEM かどうかを自前で覗いて、パスフレーズ未設定なら明示的なエラーで止める」というガードを入れておきます。
oci_auth.py — 暗号化 PEM の対話プロンプトを事前に止める
from collections.abc import Mapping
from pathlib import Path
from typing import Any
PASSPHRASE_KEYS = frozenset({"pass_phrase", "passphrase", "key_password"})
_PASSPHRASE_REQUIRED = (
"OCI API 秘密鍵 PEM が暗号化されています。"
" pass_phrase を OCI config に設定するか、パスフレーズなしの PEM を使ってください。"
)
class OciPrivateKeyPassPhraseRequiredError(RuntimeError):
"""SDK が pass phrase を対話入力する前に止めるための例外。"""
def load_oci_config_without_prompt(
oci_config_module: Any,
config_file: str,
profile: str,
*,
region: str | None = None,
) -> dict[str, Any]:
"""OCI config を読み、暗号化 PEM の対話プロンプトを事前に防ぐ。"""
config_path = Path(config_file).expanduser()
config = dict(oci_config_module.from_file(str(config_path), profile))
if region:
config["region"] = region
_assert_key_loadable_without_prompt(config, config_path)
return config
def _assert_key_loadable_without_prompt(config: Mapping[str, object], config_file: Path) -> None:
key_file = str(config.get("key_file", "") or "").strip()
if not key_file or _has_passphrase(config):
return
path = Path(key_file).expanduser()
if not path.is_absolute():
path = Path(config_file).expanduser().parent / path
if _pem_is_encrypted(path):
raise OciPrivateKeyPassPhraseRequiredError(_PASSPHRASE_REQUIRED)
def _pem_is_encrypted(path: Path) -> bool:
"""PEM の暗号化マーカーを先頭だけ読んで検出する。"""
try:
head = path.read_bytes()[:4096]
except OSError:
return False
text = head.decode("utf-8", errors="ignore").upper()
return "BEGIN ENCRYPTED PRIVATE KEY" in text or "PROC-TYPE: 4,ENCRYPTED" in text
def _has_passphrase(config: Mapping[str, object]) -> bool:
return any(str(config.get(k, "") or "").strip() for k in PASSPHRASE_KEYS)
3. processor job を投入する
入力ファイルを Object Storage に put してから、create_processor_job を呼びます。
ここで重要なのが processor_config の組み立てです。features には抽出したい機能(テキスト抽出・表抽出・key-value 抽出)のクラスインスタンスを並べ、language には BCP-47 形式を渡します。JPN のような ISO 639-2 の 3 文字コードを渡すと弾かれることがあるので、ja に正規化しておきます。
job 投入 — put_object + create_processor_job + language 正規化
import importlib
import mimetypes
# 設定 feature 名 -> SDK feature クラス名。未知の値は無視する。
_FEATURE_CLASS_BY_NAME = {
"DOCUMENT_TEXT_EXTRACTION": "DocumentTextExtractionFeature",
"TEXT_EXTRACTION": "DocumentTextExtractionFeature",
"TABLE_EXTRACTION": "DocumentTableExtractionFeature",
"DOCUMENT_TABLE_EXTRACTION": "DocumentTableExtractionFeature",
"KEY_VALUE_EXTRACTION": "DocumentKeyValueExtractionFeature",
}
_LANGUAGE_ALIASES = {"JPN": "ja", "JP": "ja", "JA_JP": "ja-JP"}
def _bcp47_language(value: str) -> str:
raw = value.strip()
if not raw:
return "ja"
key = raw.replace("-", "_").upper()
if key in _LANGUAGE_ALIASES:
return _LANGUAGE_ALIASES[key]
parts = [p for p in raw.replace("_", "-").split("-") if p]
if not parts:
return "ja"
language = parts[0].lower()
subtags = [p.upper() if len(p) == 2 else p for p in parts[1:]]
return "-".join([language, *subtags])
def _input_object_name(input_prefix: str, document_id: str, content_type: str) -> str:
prefix = input_prefix.strip().strip("/")
mime = content_type.split(";")[0].strip() if content_type else ""
ext = mimetypes.guess_extension(mime) if mime else None
safe_id = "".join(c for c in document_id if c.isalnum() or c in "-_") or "document"
name = f"{safe_id}{ext or ''}"
return f"{prefix}/{name}" if prefix else name
def _build_features(models, feature_names) -> list:
features = []
for name in feature_names:
class_name = _FEATURE_CLASS_BY_NAME.get(str(name).strip().upper())
cls = getattr(models, class_name, None) if class_name else None
if cls is not None:
features.append(cls())
if not features:
features.append(models.DocumentTextExtractionFeature())
return features
def submit_job(config, document_client, storage_client,
source_bytes: bytes, content_type: str, document_id: str) -> str:
"""入力を put し、processor job を作成して job id を返す(同期)。"""
models = importlib.import_module("oci.ai_document.models")
namespace = config.resolve_namespace()
input_bucket = config.resolve_input_bucket()
object_name = _input_object_name(config.input_prefix, document_id, content_type)
storage_client.put_object(
namespace, input_bucket, object_name, source_bytes,
content_type=content_type or "application/octet-stream",
)
details = models.CreateProcessorJobDetails(
compartment_id=config.resolve_compartment_id(),
input_location=models.ObjectStorageLocations(
object_locations=[
models.ObjectLocation(
namespace_name=namespace,
bucket_name=input_bucket,
object_name=object_name,
)
]
),
output_location=models.OutputLocation(
namespace_name=namespace,
bucket_name=config.resolve_output_bucket(),
prefix=config.output_prefix.strip(),
),
processor_config=models.GeneralProcessorConfig(
features=_build_features(models, config.features),
language=_bcp47_language(config.language),
),
)
response = document_client.create_processor_job(create_processor_job_details=details)
job = getattr(response, "data", response)
job_id = getattr(job, "id", None)
if not job_id:
raise RuntimeError("create_processor_job が job id を返しませんでした。")
return str(job_id)
4. 完了を待って、結果 JSON を読む
job ID が取れたら、get_processor_job の lifecycle_state を poll します。SUCCEEDED で抜け、FAILED / CANCELED または timeout なら例外にして縮退させます。
完了後の結果は、output_prefix/<job_id>/ 配下に .json として書き出されるので、list_objects で列挙して最初の JSON を get_object で読みます。SDK 版・テスト用 fake 版の両方で本文を取り出せるよう、content / raw のどちらにも対応しておきます。
完了待ち(poll)と結果 JSON の読み込み
import json
import time
_SUCCEEDED = {"SUCCEEDED"}
_TERMINAL_FAILURE = {"FAILED", "CANCELED", "CANCELLED"}
def await_job(config, document_client, job_id: str) -> None:
"""SUCCEEDED まで poll する。失敗 / timeout は例外で縮退させる。"""
interval = float(config.poll_interval_seconds)
deadline = time.monotonic() + float(config.timeout_seconds)
while True:
response = document_client.get_processor_job(job_id)
job = getattr(response, "data", response)
state = str(getattr(job, "lifecycle_state", "") or "").upper()
if state in _SUCCEEDED:
return
if state in _TERMINAL_FAILURE:
detail = str(getattr(job, "lifecycle_details", "") or "")
raise RuntimeError(f"DU job が失敗しました (state={state} {detail}).")
if time.monotonic() >= deadline:
raise TimeoutError("DU job が制限時間内に完了しませんでした。")
time.sleep(interval)
def _response_bytes(response) -> bytes | None:
data = getattr(response, "data", response)
raw = getattr(data, "content", None)
if raw is None:
raw = getattr(data, "raw", None)
if raw is None:
raw = data
if isinstance(raw, str):
return raw.encode("utf-8")
if isinstance(raw, (bytes, bytearray)):
return bytes(raw)
return None
def read_first_result(config, storage_client, job_id: str) -> dict | None:
"""output_prefix/<job_id>/ 配下の最初の結果 JSON を読む(同期)。"""
namespace = config.resolve_namespace()
output_bucket = config.resolve_output_bucket()
prefix = "/".join(p for p in (config.output_prefix.strip().strip("/"), job_id) if p)
listed = storage_client.list_objects(namespace, output_bucket, prefix=prefix)
objects = getattr(getattr(listed, "data", listed), "objects", []) or []
for entry in objects:
name = str(getattr(entry, "name", "") or "")
if not name.endswith(".json"):
continue
raw = _response_bytes(storage_client.get_object(namespace, output_bucket, name))
if raw is None:
continue
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, ValueError):
continue
if isinstance(parsed, dict):
return parsed
return None
5. 結果 JSON を自前スキーマへ remap する
ここがこの記事で一番伝えたいところです。
DU の結果 JSON は camelCase の独自スキーマで、pages[].lines[].text(行テキスト)、pages[].tables[].headerRows / bodyRows / footerRows(表)、pages[].words[].confidence(信頼度)、detectedDocumentTypes[].documentType(文書種別)といった構造になっています。
これを そのまま後段で使うのはつらいので、raw_text / elements / tables / pages という扱いやすい自前スキーマへ 決定論的に変換します。LLM などを挟まず、純粋なマッピングだけで写すのがポイントです(再現性が出る・テストしやすい・速い)。
座標は boundingPolygon.normalizedVertices という 0〜1 の比率の頂点列で返ってきます。頂点列を xyxy(左上・右下)に正規化して持っておくと、後で「引用箇所をプレビューでハイライトする」といった用途に使えます。
remap — 結果 JSON を raw_text / elements / tables / pages へ写す
import math
from collections.abc import Mapping, Sequence
def _as_seq(value) -> Sequence:
return value if isinstance(value, (list, tuple)) else ()
def _as_int(value, *, default: int) -> int:
if isinstance(value, bool):
return default
if isinstance(value, int):
return value
if isinstance(value, float):
return int(value)
return default
def normalize_bbox(vertices) -> list[float] | None:
"""normalizedVertices ([{x, y}, ...]) を [x1, y1, x2, y2] へ正規化する。"""
if not isinstance(vertices, list) or not vertices:
return None
xs, ys = [], []
for v in vertices:
if not isinstance(v, Mapping):
return None
x, y = v.get("x"), v.get("y")
if not isinstance(x, (int, float)) or not isinstance(y, (int, float)):
return None
xs.append(float(x))
ys.append(float(y))
coords = [min(xs), min(ys), max(xs), max(ys)]
return coords if all(math.isfinite(c) for c in coords) else None
def _bounding_polygon(obj: Mapping) -> list | None:
polygon = obj.get("boundingPolygon")
if isinstance(polygon, Mapping):
nv = polygon.get("normalizedVertices")
if isinstance(nv, list) and nv:
return nv
return None
def _remap_table_cells(table: Mapping) -> list[dict]:
cells = []
for row_key in ("headerRows", "bodyRows", "footerRows"):
for row in _as_seq(table.get(row_key)):
if not isinstance(row, Mapping):
continue
for cell in _as_seq(row.get("cells")):
if not isinstance(cell, Mapping):
continue
payload = {
"row": max(_as_int(cell.get("rowIndex"), default=0), 0),
"col": max(_as_int(cell.get("columnIndex"), default=0), 0),
"text": str(cell.get("text", "")).strip(),
"row_span": max(_as_int(cell.get("rowSpan"), default=1), 1),
"col_span": max(_as_int(cell.get("columnSpan"), default=1), 1),
}
bbox = normalize_bbox(_bounding_polygon(cell))
if bbox is not None:
payload["bbox"] = bbox
cells.append(payload)
return cells
def _detected_document_type(result: Mapping) -> str:
for entry in _as_seq(result.get("detectedDocumentTypes")):
if isinstance(entry, Mapping):
doc_type = str(entry.get("documentType", "")).strip()
if doc_type:
return doc_type
return "ドキュメント"
def result_to_payload(result: Mapping) -> dict:
"""DU の結果 JSON を扱いやすい自前スキーマへ決定論で remap する。"""
page_texts, pages, elements, tables, confidences = [], [], [], [], []
table_counter = 0
order = 0
for index, page in enumerate(_as_seq(result.get("pages"))):
if not isinstance(page, Mapping):
continue
page_number = _as_int(page.get("pageNumber"), default=index + 1)
pages.append({"page_number": page_number})
line_texts = []
for line in _as_seq(page.get("lines")):
if not isinstance(line, Mapping):
continue
text = str(line.get("text", "")).strip()
if not text:
continue
line_texts.append(text)
element = {"kind": "text", "text": text, "order": order, "page_number": page_number}
bbox = normalize_bbox(_bounding_polygon(line))
if bbox is not None:
element["bbox"] = bbox
elements.append(element)
order += 1
if line_texts:
page_texts.append("\n".join(line_texts))
for word in _as_seq(page.get("words")):
if isinstance(word, Mapping) and isinstance(word.get("confidence"), (int, float)):
confidences.append(float(word["confidence"]))
for table in _as_seq(page.get("tables")):
if not isinstance(table, Mapping):
continue
table_counter += 1
tables.append({
"table_id": f"du-table-{table_counter}",
"page_number": page_number,
"cells": _remap_table_cells(table),
})
confidence = round(sum(confidences) / len(confidences), 4) if confidences else 0.0
return {
"raw_text": "\n\n".join(t for t in page_texts if t).strip(),
"document_type": _detected_document_type(result),
"confidence": confidence,
"elements": elements,
"pages": pages,
"tables": tables,
}
6. 全部つなぐ — 非同期 analyze() と遅延 import
最後に、ここまでを 1 つの analyze() にまとめます。
注意したいのは、OCI SDK の呼び出しは ブロッキングだということです。FastAPI などの async アプリにそのまま置くとイベントループを止めてしまうので、asyncio.to_thread() で別スレッドに逃がします。
また、SDK クライアントは 遅延 import + 遅延生成にしておき、コンストラクタで document_client / storage_client を差し込めるようにしておきます。こうするとテストで fake を注入でき、実 OCI なしで決定論的に検証できます。
service.py — analyze() 本体と遅延クライアント生成
import asyncio
import importlib
import logging
logger = logging.getLogger(__name__)
class DocumentUnderstandingService:
def __init__(self, config, *, document_client=None, storage_client=None):
self._config = config
self._document_client = document_client
self._storage_client = storage_client
def is_configured(self) -> bool:
return self._config.is_configured()
async def analyze(self, source_bytes: bytes, *, content_type: str, document_id: str) -> dict | None:
"""1 文書を DU で解析し、自前スキーマの payload を返す。失敗時は None。"""
if not self.is_configured():
logger.info("DU 未設定のため縮退します。")
return None
try:
job_id = await asyncio.to_thread(
submit_job, self._config, self._documents(), self._storage(),
source_bytes, content_type, document_id,
)
except Exception as exc: # 失敗時は安全に縮退
logger.warning("DU job 投入に失敗しました。", extra={"error": str(exc)})
return None
try:
await asyncio.to_thread(await_job, self._config, self._documents(), job_id)
result = await asyncio.to_thread(read_first_result, self._config, self._storage(), job_id)
except Exception as exc:
logger.warning("DU 完了待ち/結果取得に失敗しました。",
extra={"error": str(exc), "job_id": job_id})
return None
return result_to_payload(result) if result is not None else None
# --- 遅延 SDK 構築(テストでは注入済みクライアントを使う)---
def _documents(self):
if self._document_client is None:
oci_config = importlib.import_module("oci.config")
ai_document = importlib.import_module("oci.ai_document")
config = load_oci_config_without_prompt(
oci_config, self._config.oci_config_file, self._config.oci_config_profile,
region=self._config.oci_region.strip() or None,
)
self._document_client = ai_document.AIServiceDocumentClient(config)
return self._document_client
def _storage(self):
if self._storage_client is None:
oci_config = importlib.import_module("oci.config")
object_storage = importlib.import_module("oci.object_storage")
config = load_oci_config_without_prompt(
oci_config, self._config.oci_config_file, self._config.oci_config_profile,
region=self._config.object_storage_region.strip() or None,
)
self._storage_client = object_storage.ObjectStorageClient(config)
return self._storage_client
使い方はシンプルです。
import asyncio
config = DocumentUnderstandingConfig.from_env()
service = DocumentUnderstandingService(config)
with open("invoice.pdf", "rb") as f:
source = f.read()
payload = asyncio.run(
service.analyze(source, content_type="application/pdf", document_id="invoice-001")
)
print(payload["document_type"]) # 例: "INVOICE"
print(payload["raw_text"][:200])
print(len(payload["tables"]), "tables")
7. テスト — fake SDK を注入して決定論検証
実 OCI を叩かずにテストできるのが、クライアント注入方式の効きどころです。create_processor_job / get_processor_job / list_objects / get_object を持つ fake を用意し、get_processor_job がすぐ SUCCEEDED を返すようにすれば、remap までの全パスを高速かつ決定論的に検証できます。
test_service.py — fake SDK での検証
import asyncio
import json
import types
def _polygon(x1, y1, x2, y2):
return {"normalizedVertices": [
{"x": x1, "y": y1}, {"x": x2, "y": y1}, {"x": x2, "y": y2}, {"x": x1, "y": y2},
]}
DU_RESULT = {
"pages": [{
"pageNumber": 1,
"lines": [
{"text": "請求書", "boundingPolygon": _polygon(0.1, 0.05, 0.4, 0.10)},
{"text": "合計 1,200 円", "boundingPolygon": _polygon(0.1, 0.20, 0.6, 0.25)},
],
"words": [{"text": "請求書", "confidence": 0.99}],
"tables": [{
"headerRows": [{"cells": [
{"text": "品名", "rowIndex": 0, "columnIndex": 0,
"boundingPolygon": _polygon(0.1, 0.3, 0.3, 0.35)},
]}],
"bodyRows": [{"cells": [
{"text": "りんご", "rowIndex": 1, "columnIndex": 0},
]}],
}],
}],
"detectedDocumentTypes": [{"documentType": "INVOICE"}],
}
class FakeDocClient:
def create_processor_job(self, create_processor_job_details):
return types.SimpleNamespace(data=types.SimpleNamespace(id="job-1"))
def get_processor_job(self, job_id):
return types.SimpleNamespace(data=types.SimpleNamespace(lifecycle_state="SUCCEEDED"))
class FakeStorage:
def put_object(self, *a, **k):
return None
def list_objects(self, *a, **k):
obj = types.SimpleNamespace(name="document-understanding/output/job-1/result.json")
return types.SimpleNamespace(data=types.SimpleNamespace(objects=[obj]))
def get_object(self, *a, **k):
return types.SimpleNamespace(
data=types.SimpleNamespace(content=json.dumps(DU_RESULT).encode("utf-8"))
)
def test_analyze_remaps_lines_and_tables():
config = DocumentUnderstandingConfig(
compartment_id="ocid1.compartment.oc1..x",
namespace="ns", input_bucket="bucket", poll_interval_seconds=0,
)
service = DocumentUnderstandingService(
config, document_client=FakeDocClient(), storage_client=FakeStorage()
)
payload = asyncio.run(
service.analyze(b"%PDF-1.4", content_type="application/pdf", document_id="doc-1")
)
assert payload["document_type"] == "INVOICE"
assert "請求書" in payload["raw_text"]
assert payload["tables"] and payload["tables"][0]["cells"][0]["text"] == "品名"
assert payload["elements"][0]["bbox"] == [0.1, 0.05, 0.4, 0.10]
ハマりどころまとめ
実装中に踏みやすいポイントを最後にまとめます。
-
language は BCP-47。
JPNではなくja。ここは alias テーブルで吸収しておくと事故らない。 -
出力先は
output_prefix/<job_id>/配下。output_location に渡した prefix の直下ではなく、job ID のサブディレクトリに JSON が入る点に注意。 - 座標は normalizedVertices(0〜1 比率)。絶対座標は返ってこないので、表示側で必ずページ寸法と掛け合わせる前提で持つ。
- 暗号化 PEM の対話プロンプト。サーバー/コンテナでハングする原因になるので、クライアント生成前にガードする。
-
SDK 呼び出しはブロッキング。async アプリでは
asyncio.to_thread()でイベントループを止めない。 -
未設定・失敗時は縮退。
Noneを返して後段で別手段にフォールバックできるようにしておくと、運用が安定する。
まとめ
OCI Document Understanding を実務に組み込むときの要点は、次の通りでした。
- 同期 API ではなく 非同期 processor job(投入 → poll → Object Storage から結果取得)で組む。
- 結果 JSON は camelCase の独自スキーマなので、決定論的な remap で扱いやすい自前スキーマへ写す。
- 認証の非対話化・ブロッキング呼び出しの隔離・クライアント注入によるテスト容易性を最初から入れておくと、後がとても楽になる。
特に、remap を LLM などに任せず純粋なマッピングにしておくと、再現性とテスト性が一気に上がります。帳票 OCR を OCI で組む方の参考になれば幸いです。