0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OCI Document Understanding を非同期ジョブで呼んで、日本語帳票を自前の構造化 JSON にする

0
Posted at

はじめに

PDF や画像の帳票(請求書・申請書など)から、テキストと表をまとめて取り出したい場面はよくあります。

このとき OCI には Document Understandingoci.ai_document)というサービスがあり、日本語の OCR と表抽出を高い精度で行えます。

ただ、いざ実装しようとすると、API が同期ではなく 非同期の processor job であること、入出力が Object Storage 経由であること、結果 JSON が camelCase の独自スキーマであることなど、地味にハマるポイントがいくつかあります。

この記事では、OCI Document Understanding を 1 つの文書解析関数として組み込むための実装パターンを、実務目線で整理します。フレームワークには依存しない、そのまま参考にできる形でコードも置きます。

対象は「OCI で帳票 OCR をやりたいが、processor job の扱い方が分からない」「OCR 結果を自前のスキーマに正規化して後段に渡したい」という方です。

結論

先に結論をまとめます。実装で押さえるべきは次の 6 点です。

  • 同期 API はないcreate_processor_job で投入 → get_processor_job を poll → 完了後に Object Storage から結果 JSON を読む、という非同期フローで組む。
  • 入出力は Object Storage 経由。入力ファイルを put し、出力は output_prefix/<job_id>/...json に書き出される。
  • language は BCP-47 に正規化するJPN のような ISO 3 文字ではなく ja を渡す。
  • 結果 JSON は camelCase の独自スキーマpages[].lines[].textpages[].tables[].headerRows/bodyRows を自前のスキーマへ 決定論的に remap する。
  • 座標は boundingPolygon.normalizedVertices(0〜1 の比率)で返ってくる。そのまま xyxy に正規化して持つ。
  • OCI SDK の呼び出しは 遅延 import + 別スレッドで隔離し、暗号化 PEM の 対話プロンプトを事前に止める。未設定・失敗時は安全に縮退させる。

以降で、それぞれを順番に実装していきます。

全体の流れ

処理の流れは次の通りです。

入力ファイル (PDF / 画像)
   |
   | 1. put_object
   v
Object Storage  (input bucket / input_prefix)
   |
   | 2. create_processor_job (features, language)
   v
OCI Document Understanding (非同期 processor job)
   |
   | 3. poll: get_processor_job -> lifecycle_state
   v
SUCCEEDED
   |
   | 4. list_objects + get_object
   v
Object Storage  (output bucket / output_prefix / <job_id> / *.json)
   |
   | 5. 結果 JSON を自前スキーマへ remap
   v
構造化 JSON (raw_text / elements / tables / pages)

ポイントは、OCI 側は「投入」と「結果取得」が完全に分離していることです。job を投入したら ID を受け取り、その ID を使って状態を poll し、完了したら output prefix 配下の JSON を拾いに行く、という 3 段構えになります。

前提として、SDK は pip install oci、認証は ~/.oci/config(API キー方式)が用意済みとします。

なぜ非同期 processor job なのか

OCI Document Understanding には、1 リクエストで即レスポンスを返す同期 API(インラインの analyze-document)もありますが、運用で使うなら processor job 一択だと考えています。

理由は次の通りです。

  • 複数ページ・複数ファイルをまとめて投げられる。input_location に複数 object を指定できる。
  • 入出力が Object Storage で完結するので、大きい PDF でも HTTP ペイロードに乗せ替えなくてよい。
  • 結果が JSON として永続化されるので、後から再取得・再処理がしやすい。

その代わり、「投入して、待って、取りに行く」というライフサイクル管理を自分で書く必要があります。ここが実装の本体です。

実装

ここからコードです。長いので折りたたみにしています。順番に置いていけば、そのまま 1 つのサービスクラスになります。

1. 設定(Config)

まず、job 投入に必要な非機密設定を 1 つの dataclass にまとめます。compartment / namespace / バケットといった「環境ごとに変わるが秘密ではない」値だけを持たせ、認証情報そのものは持たせないのがコツです。

入力バケットが未指定なら Object Storage の汎用バケットへ、出力バケットが未指定なら入力バケットへ fallback する、という解決ロジックも入れておくと運用が楽になります。

config.py — 設定 dataclass と env からの構築
from __future__ import annotations

import json
import os
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field

_DEFAULT_FEATURES = ("DOCUMENT_TEXT_EXTRACTION", "TABLE_EXTRACTION")


def _float(value: str, default: float) -> float:
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


@dataclass(frozen=True)
class DocumentUnderstandingConfig:
    """DU 実行に必要な非機密設定(認証情報は持たない)。"""

    compartment_id: str = ""
    fallback_compartment_id: str = ""
    namespace: str = ""
    fallback_namespace: str = ""
    input_bucket: str = ""
    fallback_input_bucket: str = ""
    output_bucket: str = ""
    input_prefix: str = "document-understanding/input"
    output_prefix: str = "document-understanding/output"
    language: str = "ja"
    features: Sequence[str] = field(default_factory=lambda: list(_DEFAULT_FEATURES))
    poll_interval_seconds: float = 5.0
    timeout_seconds: float = 600.0
    oci_config_file: str = "~/.oci/config"
    oci_config_profile: str = "DEFAULT"
    oci_region: str = ""
    object_storage_region: str = ""

    # --- 設定解決(空欄は fallback へ寄せる)---
    def resolve_compartment_id(self) -> str:
        return self.compartment_id.strip() or self.fallback_compartment_id.strip()

    def resolve_namespace(self) -> str:
        return self.namespace.strip() or self.fallback_namespace.strip()

    def resolve_input_bucket(self) -> str:
        return self.input_bucket.strip() or self.fallback_input_bucket.strip()

    def resolve_output_bucket(self) -> str:
        return self.output_bucket.strip() or self.resolve_input_bucket()

    def is_configured(self) -> bool:
        """compartment / namespace / 入力バケットが揃って初めて job を投入できる。"""
        return bool(
            self.resolve_compartment_id()
            and self.resolve_namespace()
            and self.resolve_input_bucket()
        )

    @classmethod
    def from_env(cls, env: Mapping[str, str] | None = None) -> "DocumentUnderstandingConfig":
        src = os.environ if env is None else env

        def _get(name: str, default: str = "") -> str:
            return str(src.get(name, default) or default)

        # features は JSON 配列 or カンマ区切りの両方を許容しておくと便利
        raw = _get("DU_FEATURES").strip()
        if raw:
            try:
                parsed = json.loads(raw)
                features = [str(x) for x in parsed] if isinstance(parsed, list) else []
            except (json.JSONDecodeError, ValueError):
                features = [p.strip() for p in raw.split(",") if p.strip()]
            features = features or list(_DEFAULT_FEATURES)
        else:
            features = list(_DEFAULT_FEATURES)

        return cls(
            compartment_id=_get("DU_COMPARTMENT_ID"),
            fallback_compartment_id=_get("OCI_COMPARTMENT_ID"),
            namespace=_get("DU_NAMESPACE"),
            fallback_namespace=_get("OBJECT_STORAGE_NAMESPACE"),
            input_bucket=_get("DU_INPUT_BUCKET"),
            fallback_input_bucket=_get("OBJECT_STORAGE_BUCKET"),
            output_bucket=_get("DU_OUTPUT_BUCKET"),
            input_prefix=_get("DU_INPUT_PREFIX", "document-understanding/input"),
            output_prefix=_get("DU_OUTPUT_PREFIX", "document-understanding/output"),
            language=_get("DU_LANGUAGE", "ja"),
            features=features,
            poll_interval_seconds=_float(_get("DU_POLL_INTERVAL_SECONDS"), 5.0),
            timeout_seconds=_float(_get("DU_TIMEOUT_SECONDS"), 600.0),
            oci_config_file=_get("OCI_CONFIG_FILE", "~/.oci/config"),
            oci_config_profile=_get("OCI_CONFIG_PROFILE", "DEFAULT"),
            oci_region=_get("OCI_REGION"),
            object_storage_region=_get("OCI_REGION") or _get("OBJECT_STORAGE_REGION"),
        )

2. OCI 認証を非対話でロードする(地味な必須対策)

ここは見落としがちですが、運用で確実に効いてくる対策です。

OCI SDK の from_file() で読んだ config をクライアントに渡したとき、API 秘密鍵の PEM が暗号化されていてパスフレーズが未設定だと、SDK が標準入力からパスフレーズを聞きに来ます。サーバープロセスやコンテナの中でこれをやられると、入力する相手がいないままハングします。

なので「クライアントを作る前に、暗号化 PEM かどうかを自前で覗いて、パスフレーズ未設定なら明示的なエラーで止める」というガードを入れておきます。

oci_auth.py — 暗号化 PEM の対話プロンプトを事前に止める
from collections.abc import Mapping
from pathlib import Path
from typing import Any

PASSPHRASE_KEYS = frozenset({"pass_phrase", "passphrase", "key_password"})

_PASSPHRASE_REQUIRED = (
    "OCI API 秘密鍵 PEM が暗号化されています。"
    " pass_phrase を OCI config に設定するか、パスフレーズなしの PEM を使ってください。"
)


class OciPrivateKeyPassPhraseRequiredError(RuntimeError):
    """SDK が pass phrase を対話入力する前に止めるための例外。"""


def load_oci_config_without_prompt(
    oci_config_module: Any,
    config_file: str,
    profile: str,
    *,
    region: str | None = None,
) -> dict[str, Any]:
    """OCI config を読み、暗号化 PEM の対話プロンプトを事前に防ぐ。"""
    config_path = Path(config_file).expanduser()
    config = dict(oci_config_module.from_file(str(config_path), profile))
    if region:
        config["region"] = region
    _assert_key_loadable_without_prompt(config, config_path)
    return config


def _assert_key_loadable_without_prompt(config: Mapping[str, object], config_file: Path) -> None:
    key_file = str(config.get("key_file", "") or "").strip()
    if not key_file or _has_passphrase(config):
        return
    path = Path(key_file).expanduser()
    if not path.is_absolute():
        path = Path(config_file).expanduser().parent / path
    if _pem_is_encrypted(path):
        raise OciPrivateKeyPassPhraseRequiredError(_PASSPHRASE_REQUIRED)


def _pem_is_encrypted(path: Path) -> bool:
    """PEM の暗号化マーカーを先頭だけ読んで検出する。"""
    try:
        head = path.read_bytes()[:4096]
    except OSError:
        return False
    text = head.decode("utf-8", errors="ignore").upper()
    return "BEGIN ENCRYPTED PRIVATE KEY" in text or "PROC-TYPE: 4,ENCRYPTED" in text


def _has_passphrase(config: Mapping[str, object]) -> bool:
    return any(str(config.get(k, "") or "").strip() for k in PASSPHRASE_KEYS)

3. processor job を投入する

入力ファイルを Object Storage に put してから、create_processor_job を呼びます。

ここで重要なのが processor_config の組み立てです。features には抽出したい機能(テキスト抽出・表抽出・key-value 抽出)のクラスインスタンスを並べ、language には BCP-47 形式を渡します。JPN のような ISO 639-2 の 3 文字コードを渡すと弾かれることがあるので、ja に正規化しておきます。

job 投入 — put_object + create_processor_job + language 正規化
import importlib
import mimetypes

# 設定 feature 名 -> SDK feature クラス名。未知の値は無視する。
_FEATURE_CLASS_BY_NAME = {
    "DOCUMENT_TEXT_EXTRACTION": "DocumentTextExtractionFeature",
    "TEXT_EXTRACTION": "DocumentTextExtractionFeature",
    "TABLE_EXTRACTION": "DocumentTableExtractionFeature",
    "DOCUMENT_TABLE_EXTRACTION": "DocumentTableExtractionFeature",
    "KEY_VALUE_EXTRACTION": "DocumentKeyValueExtractionFeature",
}
_LANGUAGE_ALIASES = {"JPN": "ja", "JP": "ja", "JA_JP": "ja-JP"}


def _bcp47_language(value: str) -> str:
    raw = value.strip()
    if not raw:
        return "ja"
    key = raw.replace("-", "_").upper()
    if key in _LANGUAGE_ALIASES:
        return _LANGUAGE_ALIASES[key]
    parts = [p for p in raw.replace("_", "-").split("-") if p]
    if not parts:
        return "ja"
    language = parts[0].lower()
    subtags = [p.upper() if len(p) == 2 else p for p in parts[1:]]
    return "-".join([language, *subtags])


def _input_object_name(input_prefix: str, document_id: str, content_type: str) -> str:
    prefix = input_prefix.strip().strip("/")
    mime = content_type.split(";")[0].strip() if content_type else ""
    ext = mimetypes.guess_extension(mime) if mime else None
    safe_id = "".join(c for c in document_id if c.isalnum() or c in "-_") or "document"
    name = f"{safe_id}{ext or ''}"
    return f"{prefix}/{name}" if prefix else name


def _build_features(models, feature_names) -> list:
    features = []
    for name in feature_names:
        class_name = _FEATURE_CLASS_BY_NAME.get(str(name).strip().upper())
        cls = getattr(models, class_name, None) if class_name else None
        if cls is not None:
            features.append(cls())
    if not features:
        features.append(models.DocumentTextExtractionFeature())
    return features


def submit_job(config, document_client, storage_client,
               source_bytes: bytes, content_type: str, document_id: str) -> str:
    """入力を put し、processor job を作成して job id を返す(同期)。"""
    models = importlib.import_module("oci.ai_document.models")
    namespace = config.resolve_namespace()
    input_bucket = config.resolve_input_bucket()
    object_name = _input_object_name(config.input_prefix, document_id, content_type)

    storage_client.put_object(
        namespace, input_bucket, object_name, source_bytes,
        content_type=content_type or "application/octet-stream",
    )

    details = models.CreateProcessorJobDetails(
        compartment_id=config.resolve_compartment_id(),
        input_location=models.ObjectStorageLocations(
            object_locations=[
                models.ObjectLocation(
                    namespace_name=namespace,
                    bucket_name=input_bucket,
                    object_name=object_name,
                )
            ]
        ),
        output_location=models.OutputLocation(
            namespace_name=namespace,
            bucket_name=config.resolve_output_bucket(),
            prefix=config.output_prefix.strip(),
        ),
        processor_config=models.GeneralProcessorConfig(
            features=_build_features(models, config.features),
            language=_bcp47_language(config.language),
        ),
    )
    response = document_client.create_processor_job(create_processor_job_details=details)
    job = getattr(response, "data", response)
    job_id = getattr(job, "id", None)
    if not job_id:
        raise RuntimeError("create_processor_job が job id を返しませんでした。")
    return str(job_id)

4. 完了を待って、結果 JSON を読む

job ID が取れたら、get_processor_joblifecycle_state を poll します。SUCCEEDED で抜け、FAILED / CANCELED または timeout なら例外にして縮退させます。

完了後の結果は、output_prefix/<job_id>/ 配下に .json として書き出されるので、list_objects で列挙して最初の JSON を get_object で読みます。SDK 版・テスト用 fake 版の両方で本文を取り出せるよう、content / raw のどちらにも対応しておきます。

完了待ち(poll)と結果 JSON の読み込み
import json
import time

_SUCCEEDED = {"SUCCEEDED"}
_TERMINAL_FAILURE = {"FAILED", "CANCELED", "CANCELLED"}


def await_job(config, document_client, job_id: str) -> None:
    """SUCCEEDED まで poll する。失敗 / timeout は例外で縮退させる。"""
    interval = float(config.poll_interval_seconds)
    deadline = time.monotonic() + float(config.timeout_seconds)
    while True:
        response = document_client.get_processor_job(job_id)
        job = getattr(response, "data", response)
        state = str(getattr(job, "lifecycle_state", "") or "").upper()
        if state in _SUCCEEDED:
            return
        if state in _TERMINAL_FAILURE:
            detail = str(getattr(job, "lifecycle_details", "") or "")
            raise RuntimeError(f"DU job が失敗しました (state={state} {detail}).")
        if time.monotonic() >= deadline:
            raise TimeoutError("DU job が制限時間内に完了しませんでした。")
        time.sleep(interval)


def _response_bytes(response) -> bytes | None:
    data = getattr(response, "data", response)
    raw = getattr(data, "content", None)
    if raw is None:
        raw = getattr(data, "raw", None)
    if raw is None:
        raw = data
    if isinstance(raw, str):
        return raw.encode("utf-8")
    if isinstance(raw, (bytes, bytearray)):
        return bytes(raw)
    return None


def read_first_result(config, storage_client, job_id: str) -> dict | None:
    """output_prefix/<job_id>/ 配下の最初の結果 JSON を読む(同期)。"""
    namespace = config.resolve_namespace()
    output_bucket = config.resolve_output_bucket()
    prefix = "/".join(p for p in (config.output_prefix.strip().strip("/"), job_id) if p)

    listed = storage_client.list_objects(namespace, output_bucket, prefix=prefix)
    objects = getattr(getattr(listed, "data", listed), "objects", []) or []
    for entry in objects:
        name = str(getattr(entry, "name", "") or "")
        if not name.endswith(".json"):
            continue
        raw = _response_bytes(storage_client.get_object(namespace, output_bucket, name))
        if raw is None:
            continue
        try:
            parsed = json.loads(raw)
        except (json.JSONDecodeError, ValueError):
            continue
        if isinstance(parsed, dict):
            return parsed
    return None

5. 結果 JSON を自前スキーマへ remap する

ここがこの記事で一番伝えたいところです。

DU の結果 JSON は camelCase の独自スキーマで、pages[].lines[].text(行テキスト)、pages[].tables[].headerRows / bodyRows / footerRows(表)、pages[].words[].confidence(信頼度)、detectedDocumentTypes[].documentType(文書種別)といった構造になっています。

これを そのまま後段で使うのはつらいので、raw_text / elements / tables / pages という扱いやすい自前スキーマへ 決定論的に変換します。LLM などを挟まず、純粋なマッピングだけで写すのがポイントです(再現性が出る・テストしやすい・速い)。

座標は boundingPolygon.normalizedVertices という 0〜1 の比率の頂点列で返ってきます。頂点列を xyxy(左上・右下)に正規化して持っておくと、後で「引用箇所をプレビューでハイライトする」といった用途に使えます。

remap — 結果 JSON を raw_text / elements / tables / pages へ写す
import math
from collections.abc import Mapping, Sequence


def _as_seq(value) -> Sequence:
    return value if isinstance(value, (list, tuple)) else ()


def _as_int(value, *, default: int) -> int:
    if isinstance(value, bool):
        return default
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value)
    return default


def normalize_bbox(vertices) -> list[float] | None:
    """normalizedVertices ([{x, y}, ...]) を [x1, y1, x2, y2] へ正規化する。"""
    if not isinstance(vertices, list) or not vertices:
        return None
    xs, ys = [], []
    for v in vertices:
        if not isinstance(v, Mapping):
            return None
        x, y = v.get("x"), v.get("y")
        if not isinstance(x, (int, float)) or not isinstance(y, (int, float)):
            return None
        xs.append(float(x))
        ys.append(float(y))
    coords = [min(xs), min(ys), max(xs), max(ys)]
    return coords if all(math.isfinite(c) for c in coords) else None


def _bounding_polygon(obj: Mapping) -> list | None:
    polygon = obj.get("boundingPolygon")
    if isinstance(polygon, Mapping):
        nv = polygon.get("normalizedVertices")
        if isinstance(nv, list) and nv:
            return nv
    return None


def _remap_table_cells(table: Mapping) -> list[dict]:
    cells = []
    for row_key in ("headerRows", "bodyRows", "footerRows"):
        for row in _as_seq(table.get(row_key)):
            if not isinstance(row, Mapping):
                continue
            for cell in _as_seq(row.get("cells")):
                if not isinstance(cell, Mapping):
                    continue
                payload = {
                    "row": max(_as_int(cell.get("rowIndex"), default=0), 0),
                    "col": max(_as_int(cell.get("columnIndex"), default=0), 0),
                    "text": str(cell.get("text", "")).strip(),
                    "row_span": max(_as_int(cell.get("rowSpan"), default=1), 1),
                    "col_span": max(_as_int(cell.get("columnSpan"), default=1), 1),
                }
                bbox = normalize_bbox(_bounding_polygon(cell))
                if bbox is not None:
                    payload["bbox"] = bbox
                cells.append(payload)
    return cells


def _detected_document_type(result: Mapping) -> str:
    for entry in _as_seq(result.get("detectedDocumentTypes")):
        if isinstance(entry, Mapping):
            doc_type = str(entry.get("documentType", "")).strip()
            if doc_type:
                return doc_type
    return "ドキュメント"


def result_to_payload(result: Mapping) -> dict:
    """DU の結果 JSON を扱いやすい自前スキーマへ決定論で remap する。"""
    page_texts, pages, elements, tables, confidences = [], [], [], [], []
    table_counter = 0
    order = 0

    for index, page in enumerate(_as_seq(result.get("pages"))):
        if not isinstance(page, Mapping):
            continue
        page_number = _as_int(page.get("pageNumber"), default=index + 1)
        pages.append({"page_number": page_number})

        line_texts = []
        for line in _as_seq(page.get("lines")):
            if not isinstance(line, Mapping):
                continue
            text = str(line.get("text", "")).strip()
            if not text:
                continue
            line_texts.append(text)
            element = {"kind": "text", "text": text, "order": order, "page_number": page_number}
            bbox = normalize_bbox(_bounding_polygon(line))
            if bbox is not None:
                element["bbox"] = bbox
            elements.append(element)
            order += 1
        if line_texts:
            page_texts.append("\n".join(line_texts))

        for word in _as_seq(page.get("words")):
            if isinstance(word, Mapping) and isinstance(word.get("confidence"), (int, float)):
                confidences.append(float(word["confidence"]))

        for table in _as_seq(page.get("tables")):
            if not isinstance(table, Mapping):
                continue
            table_counter += 1
            tables.append({
                "table_id": f"du-table-{table_counter}",
                "page_number": page_number,
                "cells": _remap_table_cells(table),
            })

    confidence = round(sum(confidences) / len(confidences), 4) if confidences else 0.0
    return {
        "raw_text": "\n\n".join(t for t in page_texts if t).strip(),
        "document_type": _detected_document_type(result),
        "confidence": confidence,
        "elements": elements,
        "pages": pages,
        "tables": tables,
    }

6. 全部つなぐ — 非同期 analyze() と遅延 import

最後に、ここまでを 1 つの analyze() にまとめます。

注意したいのは、OCI SDK の呼び出しは ブロッキングだということです。FastAPI などの async アプリにそのまま置くとイベントループを止めてしまうので、asyncio.to_thread() で別スレッドに逃がします。

また、SDK クライアントは 遅延 import + 遅延生成にしておき、コンストラクタで document_client / storage_client を差し込めるようにしておきます。こうするとテストで fake を注入でき、実 OCI なしで決定論的に検証できます。

service.py — analyze() 本体と遅延クライアント生成
import asyncio
import importlib
import logging

logger = logging.getLogger(__name__)


class DocumentUnderstandingService:
    def __init__(self, config, *, document_client=None, storage_client=None):
        self._config = config
        self._document_client = document_client
        self._storage_client = storage_client

    def is_configured(self) -> bool:
        return self._config.is_configured()

    async def analyze(self, source_bytes: bytes, *, content_type: str, document_id: str) -> dict | None:
        """1 文書を DU で解析し、自前スキーマの payload を返す。失敗時は None。"""
        if not self.is_configured():
            logger.info("DU 未設定のため縮退します。")
            return None
        try:
            job_id = await asyncio.to_thread(
                submit_job, self._config, self._documents(), self._storage(),
                source_bytes, content_type, document_id,
            )
        except Exception as exc:  # 失敗時は安全に縮退
            logger.warning("DU job 投入に失敗しました。", extra={"error": str(exc)})
            return None
        try:
            await asyncio.to_thread(await_job, self._config, self._documents(), job_id)
            result = await asyncio.to_thread(read_first_result, self._config, self._storage(), job_id)
        except Exception as exc:
            logger.warning("DU 完了待ち/結果取得に失敗しました。",
                           extra={"error": str(exc), "job_id": job_id})
            return None
        return result_to_payload(result) if result is not None else None

    # --- 遅延 SDK 構築(テストでは注入済みクライアントを使う)---
    def _documents(self):
        if self._document_client is None:
            oci_config = importlib.import_module("oci.config")
            ai_document = importlib.import_module("oci.ai_document")
            config = load_oci_config_without_prompt(
                oci_config, self._config.oci_config_file, self._config.oci_config_profile,
                region=self._config.oci_region.strip() or None,
            )
            self._document_client = ai_document.AIServiceDocumentClient(config)
        return self._document_client

    def _storage(self):
        if self._storage_client is None:
            oci_config = importlib.import_module("oci.config")
            object_storage = importlib.import_module("oci.object_storage")
            config = load_oci_config_without_prompt(
                oci_config, self._config.oci_config_file, self._config.oci_config_profile,
                region=self._config.object_storage_region.strip() or None,
            )
            self._storage_client = object_storage.ObjectStorageClient(config)
        return self._storage_client

使い方はシンプルです。

import asyncio

config = DocumentUnderstandingConfig.from_env()
service = DocumentUnderstandingService(config)

with open("invoice.pdf", "rb") as f:
    source = f.read()

payload = asyncio.run(
    service.analyze(source, content_type="application/pdf", document_id="invoice-001")
)
print(payload["document_type"])   # 例: "INVOICE"
print(payload["raw_text"][:200])
print(len(payload["tables"]), "tables")

7. テスト — fake SDK を注入して決定論検証

実 OCI を叩かずにテストできるのが、クライアント注入方式の効きどころです。create_processor_job / get_processor_job / list_objects / get_object を持つ fake を用意し、get_processor_job がすぐ SUCCEEDED を返すようにすれば、remap までの全パスを高速かつ決定論的に検証できます。

test_service.py — fake SDK での検証
import asyncio
import json
import types


def _polygon(x1, y1, x2, y2):
    return {"normalizedVertices": [
        {"x": x1, "y": y1}, {"x": x2, "y": y1}, {"x": x2, "y": y2}, {"x": x1, "y": y2},
    ]}


DU_RESULT = {
    "pages": [{
        "pageNumber": 1,
        "lines": [
            {"text": "請求書", "boundingPolygon": _polygon(0.1, 0.05, 0.4, 0.10)},
            {"text": "合計 1,200 円", "boundingPolygon": _polygon(0.1, 0.20, 0.6, 0.25)},
        ],
        "words": [{"text": "請求書", "confidence": 0.99}],
        "tables": [{
            "headerRows": [{"cells": [
                {"text": "品名", "rowIndex": 0, "columnIndex": 0,
                 "boundingPolygon": _polygon(0.1, 0.3, 0.3, 0.35)},
            ]}],
            "bodyRows": [{"cells": [
                {"text": "りんご", "rowIndex": 1, "columnIndex": 0},
            ]}],
        }],
    }],
    "detectedDocumentTypes": [{"documentType": "INVOICE"}],
}


class FakeDocClient:
    def create_processor_job(self, create_processor_job_details):
        return types.SimpleNamespace(data=types.SimpleNamespace(id="job-1"))

    def get_processor_job(self, job_id):
        return types.SimpleNamespace(data=types.SimpleNamespace(lifecycle_state="SUCCEEDED"))


class FakeStorage:
    def put_object(self, *a, **k):
        return None

    def list_objects(self, *a, **k):
        obj = types.SimpleNamespace(name="document-understanding/output/job-1/result.json")
        return types.SimpleNamespace(data=types.SimpleNamespace(objects=[obj]))

    def get_object(self, *a, **k):
        return types.SimpleNamespace(
            data=types.SimpleNamespace(content=json.dumps(DU_RESULT).encode("utf-8"))
        )


def test_analyze_remaps_lines_and_tables():
    config = DocumentUnderstandingConfig(
        compartment_id="ocid1.compartment.oc1..x",
        namespace="ns", input_bucket="bucket", poll_interval_seconds=0,
    )
    service = DocumentUnderstandingService(
        config, document_client=FakeDocClient(), storage_client=FakeStorage()
    )
    payload = asyncio.run(
        service.analyze(b"%PDF-1.4", content_type="application/pdf", document_id="doc-1")
    )
    assert payload["document_type"] == "INVOICE"
    assert "請求書" in payload["raw_text"]
    assert payload["tables"] and payload["tables"][0]["cells"][0]["text"] == "品名"
    assert payload["elements"][0]["bbox"] == [0.1, 0.05, 0.4, 0.10]

ハマりどころまとめ

実装中に踏みやすいポイントを最後にまとめます。

  • language は BCP-47JPN ではなく ja。ここは alias テーブルで吸収しておくと事故らない。
  • 出力先は output_prefix/<job_id>/ 配下。output_location に渡した prefix の直下ではなく、job ID のサブディレクトリに JSON が入る点に注意。
  • 座標は normalizedVertices(0〜1 比率)。絶対座標は返ってこないので、表示側で必ずページ寸法と掛け合わせる前提で持つ。
  • 暗号化 PEM の対話プロンプト。サーバー/コンテナでハングする原因になるので、クライアント生成前にガードする。
  • SDK 呼び出しはブロッキング。async アプリでは asyncio.to_thread() でイベントループを止めない。
  • 未設定・失敗時は縮退None を返して後段で別手段にフォールバックできるようにしておくと、運用が安定する。

まとめ

OCI Document Understanding を実務に組み込むときの要点は、次の通りでした。

  • 同期 API ではなく 非同期 processor job(投入 → poll → Object Storage から結果取得)で組む。
  • 結果 JSON は camelCase の独自スキーマなので、決定論的な remap で扱いやすい自前スキーマへ写す。
  • 認証の非対話化・ブロッキング呼び出しの隔離・クライアント注入によるテスト容易性を最初から入れておくと、後がとても楽になる。

特に、remap を LLM などに任せず純粋なマッピングにしておくと、再現性とテスト性が一気に上がります。帳票 OCR を OCI で組む方の参考になれば幸いです。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?