0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【RAG実装入門】Word/Excel/PowerPointを検索可能に!

Last updated at Posted at 2025-09-18

この記事は全面的にLLMを用いて作成しています。
動作確認した上での記録です。

FAISS+BM25+ローカルLLMで自前のナレッジベースを作る

はじめに

社内の共有フォルダやNASには、膨大なWord/Excel/PowerPointファイルが眠っています。
「必要な情報がどこにあるかわからない」「検索しても目的の資料がヒットしない」…
そんな悩みを解消するために、RAG(Retrieval Augmented Generation) を自前で構築してみました。

この記事では、以下の構成で「ローカルに完結する文書検索システム」を実装する手順を紹介します。

  • LibreOffice を利用したOffice文書のテキスト抽出
  • チャンク化+ノイズ除去で検索精度を高める前処理
  • 埋め込み生成FAISS でベクトル検索
  • BM25 を併用したハイブリッド検索
  • LLMに渡すプロンプト生成まで

最終的には、ローカルLLM(例: gpt-oss-20b)に繋げれば、
自分だけのChatGPTライクな検索エンジンが完成します。

注意
本記事のコードは、社内・個人のローカル環境での利用を想定しています。
実際の社内資料や個人情報は含まないダミーデータで試してください。


全体像


0. 環境など(ここだけ人間が記述)

この記事はchatGPT-5でコード作成、自分の環境で動作確認した後にchatGPT-5にまとめさせたものです。
データベースソフトは自分が慣れていないので、使わずRAGやLLMの使い方、出来そうなことを知るのが目的です。
ワードのdocファイルは、ライブラリの都合上docxに変換する必要があります。

オフィス文書の内、doc,docxだけを処理するプロトタイプです。

windowsPC
venv仮想環境でpython3.12
VScode利用(エディター内ターミナルはパワーシェルのvenv環境)
利用ライブラリ

requirements.txt
python-docx==1.1.2
tqdm==4.66.4
pandas==2.2.2
pywin32==306
# 埋め込みベクトル作成用
sentence-transformers
faiss-cpu
numpy
tqdm
# 訳語・略語に強化
rank-bm25

1. Office文書 → テキスト抽出

まずはWord/Excel/PowerPointをテキスト化します。
LibreOfficeをヘッドレスモードで呼び出すと、Officeが無くても安定して変換可能です。

# LibreOfficeの場所確認(Windows)
"C:\Program Files\LibreOffice\program\soffice.exe" --version

Pythonコード:chunk_office_safe.py

以下の処理を行います。

  • LibreOfficeでTXT変換
  • 不要な空白や全角スペースを削除(位置合わせスペースも除去)
  • チャンク化(重複オーバーラップ付き)
  • chunks.jsonl 形式で保存
# 主要ポイントのみ抜粋
def normalize_text_jp(s: str) -> str:
    """ 
    日本語文書向けの軽量クリーニング:
      - CRLF→LF
      - 全角スペース→半角
      - 行頭/行末の空白除去
      - 連続スペース/タブの圧縮
      - 連続空行を1つに
    """
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = s.replace("\u3000", " ")
    lines = []
    for line in s.split("\n"):
        line = line.strip()
        line = re.sub(r"[ \t]+", " ", line)
        lines.append(line)
    return "\n".join(lines).strip()
▶ コード全文を見る
chunk_office_safe.py
# chunk_office_safe.py
import argparse, os, subprocess, json, time, re
from pathlib import Path
from datetime import datetime
from typing import Dict, Iterator, Optional

# ===== 固定パス(LibreOffice) =====
SOFFICE = r"C:\Program Files\LibreOffice\program\soffice.exe"

# ===== 動作パラメータ =====
MAX_SRC_MB = 100                 # これ以上の .doc/.docx は変換スキップ
SLEEP_BETWEEN = 0.2              # 各ファイル処理間の小休止(I/Oスパイク抑制)
TXT_FILTER = "txt:Text"          # LibreOfficeのTXT出力フィルタ
READ_CHARS_PER_STEP = 1_000_000  # テキスト読み込み単位(文字)
MAX_CHUNKS_PER_FILE = 100_000    # 異常系の暴走防止
OUTPUT_JSONL_ENCODING = "utf-8-sig"  # JSONLはBOM付きUTF-8

# ===== 空白・改行 正規化 =====
_WS_MULTI_RE = re.compile(r"[ \t\u3000]{2,}")  # 半角/全角スペース/タブの連続
_EOL_SPACE_RE = re.compile(r"[ \t]+(?=\r?\n)")  # 行末の空白
_EMPTYLINES_RE = re.compile(r"(?:\r?\n){3,}")   # 3行以上の連続空行

def normalize_text_jp(s: str) -> str:
    """
    日本語文書向けの軽量クリーニング:
      - CRLF→LF
      - 全角スペース→半角
      - 行頭/行末の空白除去
      - 連続スペース/タブ/全角スペースの圧縮(1つに)
      - 3連以上の空行を1つに圧縮(段落は保つ)
    """
    if not s:
        return s
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = s.replace("\u3000", " ")
    # 行単位で前後空白を削りつつ内部の長連続空白を圧縮
    lines = []
    for line in s.split("\n"):
        line = line.strip()                        # 行頭/末の空白を排除(位置合わせスペースを除去)
        line = _WS_MULTI_RE.sub(" ", line)         # 連続空白を1つに
        lines.append(line)
    s = "\n".join(lines)
    s = _EOL_SPACE_RE.sub("", s)                   # 行末の空白を削除
    s = _EMPTYLINES_RE.sub("\n\n", s)              # 空行は最大1つに
    return s.strip()

def ensure_soffice() -> bool:
    if not os.path.isfile(SOFFICE):
        print(f"[ERROR] soffice not found: {SOFFICE}")
        return False
    return True

def soffice_convert(src: Path, out_dir: Path, filter_str: str) -> Optional[Path]:
    out_dir.mkdir(parents=True, exist_ok=True)
    dst = out_dir / (src.stem + ".txt")
    cmd = [SOFFICE, "--headless", "--convert-to", filter_str, "--outdir", str(out_dir), str(src)]
    res = subprocess.run(cmd, capture_output=True, text=True)
    if res.returncode == 0 and dst.exists() and dst.stat().st_size > 0:
        return dst
    print(f"[WARN] convert fail: {src} code={res.returncode} stderr={res.stderr.strip()[:200]}")
    return None

def human_mb(n: int) -> float:
    try:
        return n / (1024 * 1024)
    except Exception:
        return 0.0

def stream_chunks_from_file(txt_path: Path, encoding: str, max_chars: int, overlap: int) -> Iterator[str]:
    """
    テキストをencodingで開き、max_chars上限でオーバーラップ付きに分割して逐次yield。
    分割の直前に normalize_text_jp を適用し、不要空白と余計な改行を除去。
    """
    buf = ""
    produced = 0
    errors_mode = "strict"

    def yield_piece(piece: str):
        nonlocal produced
        piece = normalize_text_jp(piece)
        if piece:
            yield piece
            produced += 1

    try:
        with txt_path.open("r", encoding=encoding, errors=errors_mode, newline="") as f:
            while True:
                chunk = f.read(READ_CHARS_PER_STEP)
                if not chunk:
                    break
                buf += chunk
                while len(buf) >= max_chars:
                    cut = buf.rfind("\n", 0, max_chars)
                    if cut == -1 or cut < int(max_chars * 0.6):
                        cut = max_chars
                    piece = buf[:cut]
                    for y in yield_piece(piece):
                        yield y
                    if produced > MAX_CHUNKS_PER_FILE:
                        return
                    start_next = max(0, cut - overlap)
                    buf = buf[start_next:]
    except UnicodeDecodeError:
        errors_mode = "replace"
        with txt_path.open("r", encoding=encoding, errors=errors_mode, newline="") as f:
            while True:
                chunk = f.read(READ_CHARS_PER_STEP)
                if not chunk:
                    break
                buf += chunk
                while len(buf) >= max_chars:
                    cut = buf.rfind("\n", 0, max_chars)
                    if cut == -1 or cut < int(max_chars * 0.6):
                        cut = max_chars
                    piece = buf[:cut]
                    for y in yield_piece(piece):
                        yield y
                    if produced > MAX_CHUNKS_PER_FILE:
                        return
                    start_next = max(0, cut - overlap)
                    buf = buf[start_next:]

    last = buf
    piece = normalize_text_jp(last)
    if piece:
        yield piece

def detect_encoding(txt_path: Path) -> str:
    with txt_path.open("rb") as fb:
        head = fb.read(128 * 1024)
    if head.startswith(b"\xef\xbb\xbf"):
        return "utf-8-sig"
    if head.startswith(b"\xff\xfe"):
        return "utf-16-le"
    if head.startswith(b"\xfe\xff"):
        return "utf-16-be"
    try:
        head.decode("utf-8", errors="strict")
        return "utf-8"
    except UnicodeDecodeError:
        pass
    try:
        head.decode("cp932", errors="strict")
        return "cp932"
    except UnicodeDecodeError:
        pass
    return "cp932"

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True)
    ap.add_argument("--out", required=True)
    ap.add_argument("--max-chars", type=int, default=1400)
    ap.add_argument("--overlap", type=int, default=200)
    args = ap.parse_args()

    in_dir = Path(args.input)
    out_dir = Path(args.out); out_dir.mkdir(parents=True, exist_ok=True)
    conv_dir = out_dir / "_txt"; conv_dir.mkdir(parents=True, exist_ok=True)

    if not ensure_soffice():
        return

    targets = [p for p in in_dir.rglob("*")
               if p.is_file() and p.suffix.lower() in (".doc", ".docx")]
    filtered = []
    for p in targets:
        try:
            size_mb = human_mb(p.stat().st_size)
        except Exception:
            size_mb = 0
        if size_mb > MAX_SRC_MB:
            print(f"[SKIP] large file > {MAX_SRC_MB}MB: {p} ({size_mb:.1f}MB)")
            continue
        filtered.append(p)

    print(f"[INFO] files: {len(targets)}  filtered: {len(filtered)}  out={conv_dir}")

    # 1) 変換
    txt_files = []
    for i, src in enumerate(filtered, 1):
        print(f"[CONVERT] {i}/{len(filtered)} {src}")
        dst = soffice_convert(src, conv_dir, TXT_FILTER)
        if dst:
            txt_files.append(dst)
        time.sleep(SLEEP_BETWEEN)

    # 2) JSONL 逐次書き出し
    jsonl_path = out_dir / "chunks.jsonl"
    summary_counts: Dict[str, int] = {}

    with jsonl_path.open("w", encoding=OUTPUT_JSONL_ENCODING, newline="\n") as jf:
        for i, txt in enumerate(txt_files, 1):
            enc = detect_encoding(txt)
            print(f"[CHUNK] {i}/{len(txt_files)} {txt.name}  (encoding={enc})")
            chunk_idx = 0
            try:
                for piece in stream_chunks_from_file(txt, enc, args.max_chars, args.overlap):
                    record = {
                        "file_path": str(txt),
                        "file_name": txt.name,
                        "ext": ".txt",
                        "heading": None,
                        "chunk_index": chunk_idx,
                        "text": piece,
                        "n_chars": len(piece),
                        "approx_tokens": max(1, len(piece)//2),
                    }
                    jf.write(json.dumps(record, ensure_ascii=False) + "\n")
                    chunk_idx += 1
            except MemoryError:
                print(f"[WARN] MemoryError while chunking: {txt}. Skipped remainder.")
                continue
            except Exception as e:
                print(f"[WARN] chunk error ({txt}): {e}. Skipped remainder.")
                continue

            summary_counts[txt.name] = chunk_idx

    # 3) summary.csv(件数のみ)
    try:
        import pandas as pd
        rows = [{"file_name": k, "chunks": v} for k, v in summary_counts.items()]
        from datetime import datetime as _dt
        df = pd.DataFrame(rows)
        df["generated_at"] = _dt.now().isoformat(timespec="seconds")
        df.to_csv(out_dir / "summary.csv", index=False, encoding="utf-8-sig")
    except Exception as e:
        print(f"[WARN] failed to write summary.csv: {e}")

    print(f"[DONE] JSONL: {jsonl_path}")
    print(f"[DONE] CSV:   {out_dir/'summary.csv'}")

if __name__ == "__main__":
    main()

# 使い方 python .\chunk_office_safe.py --input .\tests --out .\output

実行例:

python .\chunk_office_safe.py --input .\tests --out .\output

結果フォルダ output/ には次が生成されます。

  • chunks.jsonl …… チャンク化された本文データ
  • _txt/ …… LibreOfficeが生成した中間TXT
  • summary.csv …… ファイルごとのチャンク数サマリ

2. ベクトル検索(FAISS)

次に、テキストチャンクを埋め込みベクトル化してFAISSに登録します。
モデルは日本語にも強い BAAI/bge-m3 を使用します。

pip install sentence-transformers faiss-cpu numpy tqdm
python .\build_faiss_index.py --input .\output\chunks.jsonl --outdir .\index --model BAAI/bge-m3
▶ コード全文を見る
build_faiss_index.py
import argparse, json, os, faiss, numpy as np
from pathlib import Path
from typing import Dict, Iterable, List
from tqdm import tqdm

def batched(it: Iterable[dict], n: int):
    buf = []
    for r in it:
        buf.append(r)
        if len(buf) >= n:
            yield buf
            buf = []
    if buf:
        yield buf

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True, help="chunks.jsonl(UTF-8/BOM可)")
    ap.add_argument("--outdir", required=True, help="出力フォルダ")
    ap.add_argument("--model", default="BAAI/bge-m3")
    ap.add_argument("--batch", type=int, default=256)
    ap.add_argument("--normalize", action="store_true", default=True)
    args = ap.parse_args()

    in_path = Path(args.input)
    out_dir = Path(args.outdir); out_dir.mkdir(parents=True, exist_ok=True)

    # 出力ファイル(すべて BOMなし UTF-8)
    faiss_path = out_dir / "vectors.faiss"
    meta_path  = out_dir / "index_meta.jsonl"          # 行ごとのメタ(本文プレビューのみ)
    offs_path  = out_dir / "index_meta.offsets.npy"    # 各行の「開始位置」配列(int64, 0起点)
    info_path  = out_dir / "index_info.json"

    # モデル
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer(args.model)
    dim = model.get_sentence_embedding_dimension()

    # FAISS(内積。正規化すればコサイン)
    index = faiss.IndexFlatIP(dim)

    # メタは「バイナリ」で開き、start = f.tell() をオフセットとして保存
    meta_f = meta_path.open("wb")
    offsets: List[int] = []

    def gen_records():
        # 入力は UTF-8(BOMあり/なし) どちらでもOK
        with in_path.open("r", encoding="utf-8-sig", errors="strict") as f:
            for line in f:
                rec = json.loads(line)
                txt = rec.get("text", "")
                if not txt:
                    continue
                yield rec

    total = 0
    for batch in batched(gen_records(), args.batch):
        texts = [r["text"] for r in batch]
        embs = model.encode(texts, batch_size=args.batch,
                            normalize_embeddings=True if args.normalize else False)
        if isinstance(embs, list):
            embs = np.array(embs, dtype="float32")
        else:
            embs = embs.astype("float32", copy=False)
        index.add(embs)

        for r in batch:
            preview = r.get("text", "").replace("\r", " ").replace("\n", " ")
            if len(preview) > 200:
                preview = preview[:200] + " ..."
            meta = {
                "file_path": r.get("file_path"),
                "file_name": r.get("file_name"),
                "ext": r.get("ext"),
                "heading": r.get("heading"),
                "chunk_index": r.get("chunk_index"),
                "n_chars": r.get("n_chars"),
                "approx_tokens": r.get("approx_tokens"),
                "preview": preview
            }
            # 書く前に開始位置を採取(BOMなしUTF-8)
            start = meta_f.tell()
            s = (json.dumps(meta, ensure_ascii=False) + "\n").encode("utf-8")
            meta_f.write(s)
            offsets.append(start)
        total += len(batch)

    meta_f.flush(); meta_f.close()
    np.save(offs_path, np.array(offsets, dtype=np.int64))
    faiss.write_index(index, str(faiss_path))

    info: Dict = {
        "model": args.model,
        "dim": int(dim),
        "normalize": bool(args.normalize),
        "count": int(total),
        "faiss_index": faiss_path.name,
        "meta_file": meta_path.name,
        "offsets_file": offs_path.name,
        "encoding": "utf-8"  # BOMなし
    }
    info_path.write_text(json.dumps(info, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"[DONE] vectors={total} dim={dim}")
    print(f"[OUT] {faiss_path}")
    print(f"[OUT] {meta_path}")
    print(f"[OUT] {offs_path}")
    print(f"[OUT] {info_path}")

if __name__ == "__main__":
    main()


3. 文章検索(BM25)

ベクトル検索は意味的な近さを見つけますが、
日本語の固有名詞などは単語一致が有効な場合もあります。

そこで、従来型のBM25も構築してハイブリッド検索を行います。

pip install rank-bm25
python .\build_bm25_index.py --input .\output\chunks.jsonl --out .\index\bm25.pkl
▶ コード全文を見る
build_bm25_index.py
import argparse, json, pickle
from pathlib import Path
from typing import List
from tqdm import tqdm

# 日本語でも効く簡易トークナイザ(文字bi-gram+英数語)
import re
def tokenize(text: str) -> List[str]:
    text = text.strip()
    if not text:
        return []
    # 英数語(ASCIIの連続)はそのまま単語化
    words = re.findall(r"[A-Za-z0-9_]+", text)
    # それ以外は文字bi-gram(日本語向け、句読点は除外)
    s = re.sub(r"[\s\r\n\t]", "", text)
    s = re.sub(r"[、。,.,.・//\-–—::;;\(\)\[\]{}<>「」『』“”\"\']", "", s)
    bigrams = [s[i:i+2] for i in range(len(s)-1)] if len(s) >= 2 else ([s] if s else [])
    return words + bigrams

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", required=True, help="chunks.jsonl(UTF-8/BOM可)")
    ap.add_argument("--out", required=True, help="出力 pickle(例 .\\index\\bm25.pkl)")
    ap.add_argument("--max-docs", type=int, default=0, help="先頭N件だけ学習(0=全件)")
    args = ap.parse_args()

    inp = Path(args.input)
    outp = Path(args.out)
    outp.parent.mkdir(parents=True, exist_ok=True)

    docs_tokens: List[List[str]] = []
    meta_min: List[dict] = []

    with inp.open("r", encoding="utf-8-sig", errors="strict") as f:
        for i, line in enumerate(tqdm(f, desc="Reading chunks.jsonl")):
            rec = json.loads(line)
            text = rec.get("text", "")
            if not text:
                continue
            docs_tokens.append(tokenize(text))
            # 参照に必要な最小メタだけ保持(ファイル名・行番号)
            meta_min.append({
                "line_no": i,  # chunks.jsonl 上の行番号(0起点)
                "file_name": rec.get("file_name"),
                "chunk_index": rec.get("chunk_index"),
            })
            if args.max_docs and len(docs_tokens) >= args.max_docs:
                break

    # Pickle 化(DB不要)
    payload = {
        "docs_tokens": docs_tokens,   # List[List[str]]
        "meta_min": meta_min,         # List[dict]
        "source_jsonl": str(inp.resolve())
    }
    with outp.open("wb") as pf:
        pickle.dump(payload, pf)
    print(f"[DONE] BM25 corpus saved: {outp}  docs={len(docs_tokens)}")

if __name__ == "__main__":
    main()


4. ハイブリッド検索&RRF融合

  • FAISS で「意味検索」上位を取得
  • BM25 で「単語一致」上位を取得
  • RRF(Reciprocal Rank Fusion)」で順位を融合

5. LLM用プロンプト生成

最後に、Top-kチャンクをまとめてRAGプロンプトを作成します。
このプロンプトをローカルLLMに入力すれば、参照元付きで回答が得られます。

python .\prepare_prompt.py `
  --chunks .\output\chunks.jsonl `
  --indexdir .\index `
  --bm25 .\index\bm25.pkl `
  --query "偏光フィルタの特性と測定手順" `
  --topk 10 `
  --context-chars 6000 `
  --per-chunk-max 1200
▶ コード全文を見る
prepare_prompt.py
import argparse
import json
import pickle
import faiss
import numpy as np
import re
from pathlib import Path
from typing import Dict, List, Tuple
from rank_bm25 import BM25Okapi

# ===== プロンプト用 クリーニング =====
_WS_MULTI_RE = re.compile(r"[ \t\u3000]{2,}")
_EMPTYLINES_RE = re.compile(r"(?:\r?\n){3,}")
_EOL_SPACE_RE = re.compile(r"[ \t]+(?=\r?\n)")

def clean_for_prompt(s: str) -> str:
    if not s:
        return s
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = s.replace("\u3000", " ")
    # 行単位で前後空白を削り、連続空白を圧縮
    lines = []
    for line in s.split("\n"):
        line = line.strip()
        line = _WS_MULTI_RE.sub(" ", line)
        lines.append(line)
    s = "\n".join(lines)
    s = _EOL_SPACE_RE.sub("", s)
    # 3つ以上の空行は1つに(段落は保つ)
    s = _EMPTYLINES_RE.sub("\n\n", s)
    # 先頭・末尾の空行を除去
    return s.strip()

# ===== 日本語向けの軽量トークナイザ(BM25用)=====
import re as _re
def tokenize(text: str) -> List[str]:
    text = (text or "").strip()
    if not text:
        return []
    words = _re.findall(r"[A-Za-z0-9_]+", text)
    s = _re.sub(r"[\s\r\n\t]", "", text)
    s = _re.sub(r"[、。,.,.・//\\\-–—::;;\(\)\[\]{}<>「」『』“”\"\']", "", s)
    bigrams = [s[i:i+2] for i in range(len(s)-1)] if len(s) >= 2 else ([s] if s else [])
    return words + bigrams

# ===== 付帯ユーティリティ =====
def load_info(indexdir: Path) -> Dict:
    return json.loads((indexdir / "index_info.json").read_text(encoding="utf-8"))

def open_meta(indexdir: Path):
    f = (indexdir / "index_meta.jsonl").open("rb")
    offs = np.load(indexdir / "index_meta.offsets.npy")
    return f, offs

def read_meta_line(f, offsets: np.ndarray, i: int) -> Dict:
    if i < 0 or i >= len(offsets):
        return {}
    start = int(offsets[i])
    f.seek(start)
    line = f.readline()
    if not line:
        return {}
    return json.loads(line.decode("utf-8"))

def read_chunk_text_by_lineno(chunks_jsonl: Path, line_no: int) -> Dict:
    with chunks_jsonl.open("r", encoding="utf-8-sig") as f:
        for i, line in enumerate(f):
            if i == line_no:
                return json.loads(line)
    return {}

def rrf_fusion(faiss_list: List[int], bm25_list: List[int], k: int = 60, topn: int = 20) -> List[int]:
    from collections import defaultdict
    score = defaultdict(float)
    for rank, idx in enumerate(faiss_list):
        score[int(idx)] += 1.0 / (k + rank + 1)
    for rank, idx in enumerate(bm25_list):
        score[int(idx)] += 1.0 / (k + rank + 1)
    fused = sorted(score.items(), key=lambda x: x[1], reverse=True)
    return [idx for idx, _ in fused[:topn]]

def trim_text(s: str, limit: int) -> str:
    if len(s) <= limit:
        return s
    return s[:limit] + ""

def build_prompt(query: str, contexts: List[Tuple[Dict, Dict]], max_chars: int) -> str:
    header = (
        "あなたは技術文書を要約・参照するアシスタントです。以下の「コンテキスト」だけを根拠に、"
        "質問に日本語で簡潔に回答し、最後に必ず参照元一覧(ファイル名とチャンク番号)を示してください。\n"
        "推測は避け、根拠が足りない場合は『不明』と答えてください。\n"
    )
    qline = f"\n【質問】\n{clean_for_prompt(query)}\n"
    # コンテキストは文書ごとに1行空ける。文書内の連続空行は圧縮済み。
    ctx_lines = ["\n【コンテキスト】"]
    for i, (meta, rec) in enumerate(contexts, start=1):
        fn = rec.get("file_name")
        ci = rec.get("chunk_index")
        ctx = clean_for_prompt(rec.get("text", ""))
        ctx_lines.append(f"--- 文書{i}{fn}(chunk#{ci}) ---\n{ctx}")
    ctx_block = "\n\n".join(ctx_lines) + "\n"
    instruct = (
        "\n【出力フォーマット】\n"
        "回答本文。\n"
        "\n"
        "【参照元】\n"
        "- <ファイル名> chunk#<番号>\n"
        "- ...\n"
    )
    draft = clean_for_prompt(header + qline + ctx_block + instruct)
    if len(draft) <= max_chars:
        return draft
    # 超過時はそのまま(選抜側で制御している想定)
    return draft

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--chunks", required=True)
    ap.add_argument("--indexdir", required=True)
    ap.add_argument("--bm25", required=True)
    ap.add_argument("--query", required=True)
    ap.add_argument("--topk", type=int, default=10)
    ap.add_argument("--context-chars", type=int, default=6000)
    ap.add_argument("--per-chunk-max", type=int, default=1200)
    args = ap.parse_args()

    chunks_jsonl = Path(args.chunks)
    indexdir = Path(args.indexdir)
    bm25_pkl = Path(args.bm25)

    # === FAISS ===
    info = load_info(indexdir)
    index = faiss.read_index(str(indexdir / info["faiss_index"]))
    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer(info["model"])
    meta_f, offsets = open_meta(indexdir)

    # === BM25 ===
    payload = pickle.loads(bm25_pkl.read_bytes())
    docs_tokens: List[List[str]] = payload["docs_tokens"]
    meta_min: List[dict] = payload["meta_min"]
    bm25 = BM25Okapi(docs_tokens)

    # === 検索 ===
    q_emb = model.encode([args.query], normalize_embeddings=bool(info.get("normalize", True))).astype("float32")
    D, I = index.search(q_emb, max(args.topk * 5, 20))
    faiss_list = [int(x) for x in I[0] if x >= 0]

    qtok = tokenize(args.query)
    scores = bm25.get_scores(qtok)
    bm25_rank = np.argsort(scores)[::-1]
    bm25_list = list(bm25_rank[:max(len(faiss_list), args.topk * 5)])

    fused_idx = rrf_fusion(faiss_list, bm25_list, k=60, topn=max(args.topk * 3, 30))

    # === 選抜(文字数予算&1チャンクあたりの上限)===
    picked: List[Tuple[Dict, Dict]] = []
    budget = args.context_chars
    for idx in fused_idx:
        if idx < 0 or idx >= len(offsets):
            continue
        meta = read_meta_line(meta_f, offsets, idx)

        line_no = meta_min[idx]["line_no"] if idx < len(meta_min) else idx
        rec = read_chunk_text_by_lineno(chunks_jsonl, line_no)
        if not rec:
            continue

        text = clean_for_prompt(rec.get("text", ""))
        if len(text) > args.per_chunk_max:
            text = trim_text(text, args.per_chunk_max)
        rec["text"] = text

        need = len(text) + 200
        if need > budget and picked:
            break

        picked.append((meta, rec))
        budget -= need
        if len(picked) >= args.topk:
            break

    prompt = build_prompt(args.query, picked, args.context_chars)

    # === 保存 ===
    outdir = Path("prompts"); outdir.mkdir(exist_ok=True)
    (outdir / "latest_prompt.txt").write_text(prompt, encoding="utf-8")
    bundle = {
        "query": args.query,
        "prompt": prompt,
        "citations": [
            {
                "file_name": r.get("file_name"),
                "chunk_index": r.get("chunk_index"),
                "file_path": r.get("file_path"),
            }
            for (_, r) in picked
        ],
        "model_hint": info["model"],
    }
    (outdir / "latest_prompt.json").write_text(json.dumps(bundle, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"[DONE] prompts/latest_prompt.txt")
    print(f"[DONE] prompts/latest_prompt.json")

if __name__ == "__main__":
    main()

生成物:

  • prompts/latest_prompt.txt …… LLMにそのまま入力
  • prompts/latest_prompt.json …… 構造化データ(出典メタ付き)

6. ローカルLLMに接続する

ここまでの流れで準備した latest_prompt.txt
ローカルLLM(例: gpt-oss-20b)に入力するだけです。


まとめ

ステップ ツール 出力
Office→TXT LibreOffice _txt/*.txt
正規化&チャンク化 Python chunks.jsonl
埋め込み生成 SentenceTransformers + FAISS index/vectors.faiss
キーワード検索 BM25 index/bm25.pkl
プロンプト生成 Python prompts/latest_prompt.txt

おわりに

RAGは「LLMの頭脳」と「検索エンジンの記憶」を組み合わせる強力なアプローチです。
完全ローカルでナレッジ活用ができるため、安心して社内で運用できます。

終わりに(人間が記述)

LLMで初めてのことにも挑戦しやすくなってきました。
今後は、
・ワード以外の文書も処理する。
・データベースを利用する。
・図や写真も検索対象とする。CLIP等を使う?
などの課題があると思います。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?