この記事は全面的にLLMを用いて作成しています。
動作確認した上での記録です。
FAISS+BM25+ローカルLLMで自前のナレッジベースを作る
はじめに
社内の共有フォルダやNASには、膨大なWord/Excel/PowerPointファイルが眠っています。
「必要な情報がどこにあるかわからない」「検索しても目的の資料がヒットしない」…
そんな悩みを解消するために、RAG(Retrieval Augmented Generation) を自前で構築してみました。
この記事では、以下の構成で「ローカルに完結する文書検索システム」を実装する手順を紹介します。
- LibreOffice を利用したOffice文書のテキスト抽出
- チャンク化+ノイズ除去で検索精度を高める前処理
- 埋め込み生成+FAISS でベクトル検索
- BM25 を併用したハイブリッド検索
- LLMに渡すプロンプト生成まで
最終的には、ローカルLLM(例: gpt-oss-20b
)に繋げれば、
自分だけのChatGPTライクな検索エンジンが完成します。
⚠ 注意
本記事のコードは、社内・個人のローカル環境での利用を想定しています。
実際の社内資料や個人情報は含まないダミーデータで試してください。
全体像
0. 環境など(ここだけ人間が記述)
この記事はchatGPT-5でコード作成、自分の環境で動作確認した後にchatGPT-5にまとめさせたものです。
データベースソフトは自分が慣れていないので、使わずRAGやLLMの使い方、出来そうなことを知るのが目的です。
ワードのdocファイルは、ライブラリの都合上docxに変換する必要があります。
オフィス文書の内、doc,docxだけを処理するプロトタイプです。
windowsPC
venv仮想環境でpython3.12
VScode利用(エディター内ターミナルはパワーシェルのvenv環境)
利用ライブラリ
python-docx==1.1.2
tqdm==4.66.4
pandas==2.2.2
pywin32==306
# 埋め込みベクトル作成用
sentence-transformers
faiss-cpu
numpy
tqdm
# 訳語・略語に強化
rank-bm25
1. Office文書 → テキスト抽出
まずはWord/Excel/PowerPointをテキスト化します。
LibreOfficeをヘッドレスモードで呼び出すと、Officeが無くても安定して変換可能です。
# LibreOfficeの場所確認(Windows)
"C:\Program Files\LibreOffice\program\soffice.exe" --version
Pythonコード:chunk_office_safe.py
以下の処理を行います。
- LibreOfficeでTXT変換
- 不要な空白や全角スペースを削除(位置合わせスペースも除去)
- チャンク化(重複オーバーラップ付き)
-
chunks.jsonl
形式で保存
# 主要ポイントのみ抜粋
def normalize_text_jp(s: str) -> str:
"""
日本語文書向けの軽量クリーニング:
- CRLF→LF
- 全角スペース→半角
- 行頭/行末の空白除去
- 連続スペース/タブの圧縮
- 連続空行を1つに
"""
s = s.replace("\r\n", "\n").replace("\r", "\n")
s = s.replace("\u3000", " ")
lines = []
for line in s.split("\n"):
line = line.strip()
line = re.sub(r"[ \t]+", " ", line)
lines.append(line)
return "\n".join(lines).strip()
▶ コード全文を見る
# chunk_office_safe.py
import argparse, os, subprocess, json, time, re
from pathlib import Path
from datetime import datetime
from typing import Dict, Iterator, Optional
# ===== 固定パス(LibreOffice) =====
SOFFICE = r"C:\Program Files\LibreOffice\program\soffice.exe"
# ===== 動作パラメータ =====
MAX_SRC_MB = 100 # これ以上の .doc/.docx は変換スキップ
SLEEP_BETWEEN = 0.2 # 各ファイル処理間の小休止(I/Oスパイク抑制)
TXT_FILTER = "txt:Text" # LibreOfficeのTXT出力フィルタ
READ_CHARS_PER_STEP = 1_000_000 # テキスト読み込み単位(文字)
MAX_CHUNKS_PER_FILE = 100_000 # 異常系の暴走防止
OUTPUT_JSONL_ENCODING = "utf-8-sig" # JSONLはBOM付きUTF-8
# ===== 空白・改行 正規化 =====
_WS_MULTI_RE = re.compile(r"[ \t\u3000]{2,}") # 半角/全角スペース/タブの連続
_EOL_SPACE_RE = re.compile(r"[ \t]+(?=\r?\n)") # 行末の空白
_EMPTYLINES_RE = re.compile(r"(?:\r?\n){3,}") # 3行以上の連続空行
def normalize_text_jp(s: str) -> str:
"""
日本語文書向けの軽量クリーニング:
- CRLF→LF
- 全角スペース→半角
- 行頭/行末の空白除去
- 連続スペース/タブ/全角スペースの圧縮(1つに)
- 3連以上の空行を1つに圧縮(段落は保つ)
"""
if not s:
return s
s = s.replace("\r\n", "\n").replace("\r", "\n")
s = s.replace("\u3000", " ")
# 行単位で前後空白を削りつつ内部の長連続空白を圧縮
lines = []
for line in s.split("\n"):
line = line.strip() # 行頭/末の空白を排除(位置合わせスペースを除去)
line = _WS_MULTI_RE.sub(" ", line) # 連続空白を1つに
lines.append(line)
s = "\n".join(lines)
s = _EOL_SPACE_RE.sub("", s) # 行末の空白を削除
s = _EMPTYLINES_RE.sub("\n\n", s) # 空行は最大1つに
return s.strip()
def ensure_soffice() -> bool:
if not os.path.isfile(SOFFICE):
print(f"[ERROR] soffice not found: {SOFFICE}")
return False
return True
def soffice_convert(src: Path, out_dir: Path, filter_str: str) -> Optional[Path]:
out_dir.mkdir(parents=True, exist_ok=True)
dst = out_dir / (src.stem + ".txt")
cmd = [SOFFICE, "--headless", "--convert-to", filter_str, "--outdir", str(out_dir), str(src)]
res = subprocess.run(cmd, capture_output=True, text=True)
if res.returncode == 0 and dst.exists() and dst.stat().st_size > 0:
return dst
print(f"[WARN] convert fail: {src} code={res.returncode} stderr={res.stderr.strip()[:200]}")
return None
def human_mb(n: int) -> float:
try:
return n / (1024 * 1024)
except Exception:
return 0.0
def stream_chunks_from_file(txt_path: Path, encoding: str, max_chars: int, overlap: int) -> Iterator[str]:
"""
テキストをencodingで開き、max_chars上限でオーバーラップ付きに分割して逐次yield。
分割の直前に normalize_text_jp を適用し、不要空白と余計な改行を除去。
"""
buf = ""
produced = 0
errors_mode = "strict"
def yield_piece(piece: str):
nonlocal produced
piece = normalize_text_jp(piece)
if piece:
yield piece
produced += 1
try:
with txt_path.open("r", encoding=encoding, errors=errors_mode, newline="") as f:
while True:
chunk = f.read(READ_CHARS_PER_STEP)
if not chunk:
break
buf += chunk
while len(buf) >= max_chars:
cut = buf.rfind("\n", 0, max_chars)
if cut == -1 or cut < int(max_chars * 0.6):
cut = max_chars
piece = buf[:cut]
for y in yield_piece(piece):
yield y
if produced > MAX_CHUNKS_PER_FILE:
return
start_next = max(0, cut - overlap)
buf = buf[start_next:]
except UnicodeDecodeError:
errors_mode = "replace"
with txt_path.open("r", encoding=encoding, errors=errors_mode, newline="") as f:
while True:
chunk = f.read(READ_CHARS_PER_STEP)
if not chunk:
break
buf += chunk
while len(buf) >= max_chars:
cut = buf.rfind("\n", 0, max_chars)
if cut == -1 or cut < int(max_chars * 0.6):
cut = max_chars
piece = buf[:cut]
for y in yield_piece(piece):
yield y
if produced > MAX_CHUNKS_PER_FILE:
return
start_next = max(0, cut - overlap)
buf = buf[start_next:]
last = buf
piece = normalize_text_jp(last)
if piece:
yield piece
def detect_encoding(txt_path: Path) -> str:
with txt_path.open("rb") as fb:
head = fb.read(128 * 1024)
if head.startswith(b"\xef\xbb\xbf"):
return "utf-8-sig"
if head.startswith(b"\xff\xfe"):
return "utf-16-le"
if head.startswith(b"\xfe\xff"):
return "utf-16-be"
try:
head.decode("utf-8", errors="strict")
return "utf-8"
except UnicodeDecodeError:
pass
try:
head.decode("cp932", errors="strict")
return "cp932"
except UnicodeDecodeError:
pass
return "cp932"
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--input", required=True)
ap.add_argument("--out", required=True)
ap.add_argument("--max-chars", type=int, default=1400)
ap.add_argument("--overlap", type=int, default=200)
args = ap.parse_args()
in_dir = Path(args.input)
out_dir = Path(args.out); out_dir.mkdir(parents=True, exist_ok=True)
conv_dir = out_dir / "_txt"; conv_dir.mkdir(parents=True, exist_ok=True)
if not ensure_soffice():
return
targets = [p for p in in_dir.rglob("*")
if p.is_file() and p.suffix.lower() in (".doc", ".docx")]
filtered = []
for p in targets:
try:
size_mb = human_mb(p.stat().st_size)
except Exception:
size_mb = 0
if size_mb > MAX_SRC_MB:
print(f"[SKIP] large file > {MAX_SRC_MB}MB: {p} ({size_mb:.1f}MB)")
continue
filtered.append(p)
print(f"[INFO] files: {len(targets)} filtered: {len(filtered)} out={conv_dir}")
# 1) 変換
txt_files = []
for i, src in enumerate(filtered, 1):
print(f"[CONVERT] {i}/{len(filtered)} {src}")
dst = soffice_convert(src, conv_dir, TXT_FILTER)
if dst:
txt_files.append(dst)
time.sleep(SLEEP_BETWEEN)
# 2) JSONL 逐次書き出し
jsonl_path = out_dir / "chunks.jsonl"
summary_counts: Dict[str, int] = {}
with jsonl_path.open("w", encoding=OUTPUT_JSONL_ENCODING, newline="\n") as jf:
for i, txt in enumerate(txt_files, 1):
enc = detect_encoding(txt)
print(f"[CHUNK] {i}/{len(txt_files)} {txt.name} (encoding={enc})")
chunk_idx = 0
try:
for piece in stream_chunks_from_file(txt, enc, args.max_chars, args.overlap):
record = {
"file_path": str(txt),
"file_name": txt.name,
"ext": ".txt",
"heading": None,
"chunk_index": chunk_idx,
"text": piece,
"n_chars": len(piece),
"approx_tokens": max(1, len(piece)//2),
}
jf.write(json.dumps(record, ensure_ascii=False) + "\n")
chunk_idx += 1
except MemoryError:
print(f"[WARN] MemoryError while chunking: {txt}. Skipped remainder.")
continue
except Exception as e:
print(f"[WARN] chunk error ({txt}): {e}. Skipped remainder.")
continue
summary_counts[txt.name] = chunk_idx
# 3) summary.csv(件数のみ)
try:
import pandas as pd
rows = [{"file_name": k, "chunks": v} for k, v in summary_counts.items()]
from datetime import datetime as _dt
df = pd.DataFrame(rows)
df["generated_at"] = _dt.now().isoformat(timespec="seconds")
df.to_csv(out_dir / "summary.csv", index=False, encoding="utf-8-sig")
except Exception as e:
print(f"[WARN] failed to write summary.csv: {e}")
print(f"[DONE] JSONL: {jsonl_path}")
print(f"[DONE] CSV: {out_dir/'summary.csv'}")
if __name__ == "__main__":
main()
# 使い方 python .\chunk_office_safe.py --input .\tests --out .\output
実行例:
python .\chunk_office_safe.py --input .\tests --out .\output
結果フォルダ output/
には次が生成されます。
-
chunks.jsonl
…… チャンク化された本文データ -
_txt/
…… LibreOfficeが生成した中間TXT -
summary.csv
…… ファイルごとのチャンク数サマリ
2. ベクトル検索(FAISS)
次に、テキストチャンクを埋め込みベクトル化してFAISSに登録します。
モデルは日本語にも強い BAAI/bge-m3 を使用します。
pip install sentence-transformers faiss-cpu numpy tqdm
python .\build_faiss_index.py --input .\output\chunks.jsonl --outdir .\index --model BAAI/bge-m3
▶ コード全文を見る
import argparse, json, os, faiss, numpy as np
from pathlib import Path
from typing import Dict, Iterable, List
from tqdm import tqdm
def batched(it: Iterable[dict], n: int):
buf = []
for r in it:
buf.append(r)
if len(buf) >= n:
yield buf
buf = []
if buf:
yield buf
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--input", required=True, help="chunks.jsonl(UTF-8/BOM可)")
ap.add_argument("--outdir", required=True, help="出力フォルダ")
ap.add_argument("--model", default="BAAI/bge-m3")
ap.add_argument("--batch", type=int, default=256)
ap.add_argument("--normalize", action="store_true", default=True)
args = ap.parse_args()
in_path = Path(args.input)
out_dir = Path(args.outdir); out_dir.mkdir(parents=True, exist_ok=True)
# 出力ファイル(すべて BOMなし UTF-8)
faiss_path = out_dir / "vectors.faiss"
meta_path = out_dir / "index_meta.jsonl" # 行ごとのメタ(本文プレビューのみ)
offs_path = out_dir / "index_meta.offsets.npy" # 各行の「開始位置」配列(int64, 0起点)
info_path = out_dir / "index_info.json"
# モデル
from sentence_transformers import SentenceTransformer
model = SentenceTransformer(args.model)
dim = model.get_sentence_embedding_dimension()
# FAISS(内積。正規化すればコサイン)
index = faiss.IndexFlatIP(dim)
# メタは「バイナリ」で開き、start = f.tell() をオフセットとして保存
meta_f = meta_path.open("wb")
offsets: List[int] = []
def gen_records():
# 入力は UTF-8(BOMあり/なし) どちらでもOK
with in_path.open("r", encoding="utf-8-sig", errors="strict") as f:
for line in f:
rec = json.loads(line)
txt = rec.get("text", "")
if not txt:
continue
yield rec
total = 0
for batch in batched(gen_records(), args.batch):
texts = [r["text"] for r in batch]
embs = model.encode(texts, batch_size=args.batch,
normalize_embeddings=True if args.normalize else False)
if isinstance(embs, list):
embs = np.array(embs, dtype="float32")
else:
embs = embs.astype("float32", copy=False)
index.add(embs)
for r in batch:
preview = r.get("text", "").replace("\r", " ").replace("\n", " ")
if len(preview) > 200:
preview = preview[:200] + " ..."
meta = {
"file_path": r.get("file_path"),
"file_name": r.get("file_name"),
"ext": r.get("ext"),
"heading": r.get("heading"),
"chunk_index": r.get("chunk_index"),
"n_chars": r.get("n_chars"),
"approx_tokens": r.get("approx_tokens"),
"preview": preview
}
# 書く前に開始位置を採取(BOMなしUTF-8)
start = meta_f.tell()
s = (json.dumps(meta, ensure_ascii=False) + "\n").encode("utf-8")
meta_f.write(s)
offsets.append(start)
total += len(batch)
meta_f.flush(); meta_f.close()
np.save(offs_path, np.array(offsets, dtype=np.int64))
faiss.write_index(index, str(faiss_path))
info: Dict = {
"model": args.model,
"dim": int(dim),
"normalize": bool(args.normalize),
"count": int(total),
"faiss_index": faiss_path.name,
"meta_file": meta_path.name,
"offsets_file": offs_path.name,
"encoding": "utf-8" # BOMなし
}
info_path.write_text(json.dumps(info, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"[DONE] vectors={total} dim={dim}")
print(f"[OUT] {faiss_path}")
print(f"[OUT] {meta_path}")
print(f"[OUT] {offs_path}")
print(f"[OUT] {info_path}")
if __name__ == "__main__":
main()
3. 文章検索(BM25)
ベクトル検索は意味的な近さを見つけますが、
日本語の固有名詞などは単語一致が有効な場合もあります。
そこで、従来型のBM25も構築してハイブリッド検索を行います。
pip install rank-bm25
python .\build_bm25_index.py --input .\output\chunks.jsonl --out .\index\bm25.pkl
▶ コード全文を見る
import argparse, json, pickle
from pathlib import Path
from typing import List
from tqdm import tqdm
# 日本語でも効く簡易トークナイザ(文字bi-gram+英数語)
import re
def tokenize(text: str) -> List[str]:
text = text.strip()
if not text:
return []
# 英数語(ASCIIの連続)はそのまま単語化
words = re.findall(r"[A-Za-z0-9_]+", text)
# それ以外は文字bi-gram(日本語向け、句読点は除外)
s = re.sub(r"[\s\r\n\t]", "", text)
s = re.sub(r"[、。,.,.・//\-–—::;;\(\)\[\]{}<>「」『』“”\"\']", "", s)
bigrams = [s[i:i+2] for i in range(len(s)-1)] if len(s) >= 2 else ([s] if s else [])
return words + bigrams
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--input", required=True, help="chunks.jsonl(UTF-8/BOM可)")
ap.add_argument("--out", required=True, help="出力 pickle(例 .\\index\\bm25.pkl)")
ap.add_argument("--max-docs", type=int, default=0, help="先頭N件だけ学習(0=全件)")
args = ap.parse_args()
inp = Path(args.input)
outp = Path(args.out)
outp.parent.mkdir(parents=True, exist_ok=True)
docs_tokens: List[List[str]] = []
meta_min: List[dict] = []
with inp.open("r", encoding="utf-8-sig", errors="strict") as f:
for i, line in enumerate(tqdm(f, desc="Reading chunks.jsonl")):
rec = json.loads(line)
text = rec.get("text", "")
if not text:
continue
docs_tokens.append(tokenize(text))
# 参照に必要な最小メタだけ保持(ファイル名・行番号)
meta_min.append({
"line_no": i, # chunks.jsonl 上の行番号(0起点)
"file_name": rec.get("file_name"),
"chunk_index": rec.get("chunk_index"),
})
if args.max_docs and len(docs_tokens) >= args.max_docs:
break
# Pickle 化(DB不要)
payload = {
"docs_tokens": docs_tokens, # List[List[str]]
"meta_min": meta_min, # List[dict]
"source_jsonl": str(inp.resolve())
}
with outp.open("wb") as pf:
pickle.dump(payload, pf)
print(f"[DONE] BM25 corpus saved: {outp} docs={len(docs_tokens)}")
if __name__ == "__main__":
main()
4. ハイブリッド検索&RRF融合
- FAISS で「意味検索」上位を取得
- BM25 で「単語一致」上位を取得
- RRF(Reciprocal Rank Fusion)」で順位を融合
5. LLM用プロンプト生成
最後に、Top-kチャンクをまとめてRAGプロンプトを作成します。
このプロンプトをローカルLLMに入力すれば、参照元付きで回答が得られます。
python .\prepare_prompt.py `
--chunks .\output\chunks.jsonl `
--indexdir .\index `
--bm25 .\index\bm25.pkl `
--query "偏光フィルタの特性と測定手順" `
--topk 10 `
--context-chars 6000 `
--per-chunk-max 1200
▶ コード全文を見る
import argparse
import json
import pickle
import faiss
import numpy as np
import re
from pathlib import Path
from typing import Dict, List, Tuple
from rank_bm25 import BM25Okapi
# ===== プロンプト用 クリーニング =====
_WS_MULTI_RE = re.compile(r"[ \t\u3000]{2,}")
_EMPTYLINES_RE = re.compile(r"(?:\r?\n){3,}")
_EOL_SPACE_RE = re.compile(r"[ \t]+(?=\r?\n)")
def clean_for_prompt(s: str) -> str:
if not s:
return s
s = s.replace("\r\n", "\n").replace("\r", "\n")
s = s.replace("\u3000", " ")
# 行単位で前後空白を削り、連続空白を圧縮
lines = []
for line in s.split("\n"):
line = line.strip()
line = _WS_MULTI_RE.sub(" ", line)
lines.append(line)
s = "\n".join(lines)
s = _EOL_SPACE_RE.sub("", s)
# 3つ以上の空行は1つに(段落は保つ)
s = _EMPTYLINES_RE.sub("\n\n", s)
# 先頭・末尾の空行を除去
return s.strip()
# ===== 日本語向けの軽量トークナイザ(BM25用)=====
import re as _re
def tokenize(text: str) -> List[str]:
text = (text or "").strip()
if not text:
return []
words = _re.findall(r"[A-Za-z0-9_]+", text)
s = _re.sub(r"[\s\r\n\t]", "", text)
s = _re.sub(r"[、。,.,.・//\\\-–—::;;\(\)\[\]{}<>「」『』“”\"\']", "", s)
bigrams = [s[i:i+2] for i in range(len(s)-1)] if len(s) >= 2 else ([s] if s else [])
return words + bigrams
# ===== 付帯ユーティリティ =====
def load_info(indexdir: Path) -> Dict:
return json.loads((indexdir / "index_info.json").read_text(encoding="utf-8"))
def open_meta(indexdir: Path):
f = (indexdir / "index_meta.jsonl").open("rb")
offs = np.load(indexdir / "index_meta.offsets.npy")
return f, offs
def read_meta_line(f, offsets: np.ndarray, i: int) -> Dict:
if i < 0 or i >= len(offsets):
return {}
start = int(offsets[i])
f.seek(start)
line = f.readline()
if not line:
return {}
return json.loads(line.decode("utf-8"))
def read_chunk_text_by_lineno(chunks_jsonl: Path, line_no: int) -> Dict:
with chunks_jsonl.open("r", encoding="utf-8-sig") as f:
for i, line in enumerate(f):
if i == line_no:
return json.loads(line)
return {}
def rrf_fusion(faiss_list: List[int], bm25_list: List[int], k: int = 60, topn: int = 20) -> List[int]:
from collections import defaultdict
score = defaultdict(float)
for rank, idx in enumerate(faiss_list):
score[int(idx)] += 1.0 / (k + rank + 1)
for rank, idx in enumerate(bm25_list):
score[int(idx)] += 1.0 / (k + rank + 1)
fused = sorted(score.items(), key=lambda x: x[1], reverse=True)
return [idx for idx, _ in fused[:topn]]
def trim_text(s: str, limit: int) -> str:
if len(s) <= limit:
return s
return s[:limit] + " …"
def build_prompt(query: str, contexts: List[Tuple[Dict, Dict]], max_chars: int) -> str:
header = (
"あなたは技術文書を要約・参照するアシスタントです。以下の「コンテキスト」だけを根拠に、"
"質問に日本語で簡潔に回答し、最後に必ず参照元一覧(ファイル名とチャンク番号)を示してください。\n"
"推測は避け、根拠が足りない場合は『不明』と答えてください。\n"
)
qline = f"\n【質問】\n{clean_for_prompt(query)}\n"
# コンテキストは文書ごとに1行空ける。文書内の連続空行は圧縮済み。
ctx_lines = ["\n【コンテキスト】"]
for i, (meta, rec) in enumerate(contexts, start=1):
fn = rec.get("file_name")
ci = rec.get("chunk_index")
ctx = clean_for_prompt(rec.get("text", ""))
ctx_lines.append(f"--- 文書{i}:{fn}(chunk#{ci}) ---\n{ctx}")
ctx_block = "\n\n".join(ctx_lines) + "\n"
instruct = (
"\n【出力フォーマット】\n"
"回答本文。\n"
"\n"
"【参照元】\n"
"- <ファイル名> chunk#<番号>\n"
"- ...\n"
)
draft = clean_for_prompt(header + qline + ctx_block + instruct)
if len(draft) <= max_chars:
return draft
# 超過時はそのまま(選抜側で制御している想定)
return draft
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--chunks", required=True)
ap.add_argument("--indexdir", required=True)
ap.add_argument("--bm25", required=True)
ap.add_argument("--query", required=True)
ap.add_argument("--topk", type=int, default=10)
ap.add_argument("--context-chars", type=int, default=6000)
ap.add_argument("--per-chunk-max", type=int, default=1200)
args = ap.parse_args()
chunks_jsonl = Path(args.chunks)
indexdir = Path(args.indexdir)
bm25_pkl = Path(args.bm25)
# === FAISS ===
info = load_info(indexdir)
index = faiss.read_index(str(indexdir / info["faiss_index"]))
from sentence_transformers import SentenceTransformer
model = SentenceTransformer(info["model"])
meta_f, offsets = open_meta(indexdir)
# === BM25 ===
payload = pickle.loads(bm25_pkl.read_bytes())
docs_tokens: List[List[str]] = payload["docs_tokens"]
meta_min: List[dict] = payload["meta_min"]
bm25 = BM25Okapi(docs_tokens)
# === 検索 ===
q_emb = model.encode([args.query], normalize_embeddings=bool(info.get("normalize", True))).astype("float32")
D, I = index.search(q_emb, max(args.topk * 5, 20))
faiss_list = [int(x) for x in I[0] if x >= 0]
qtok = tokenize(args.query)
scores = bm25.get_scores(qtok)
bm25_rank = np.argsort(scores)[::-1]
bm25_list = list(bm25_rank[:max(len(faiss_list), args.topk * 5)])
fused_idx = rrf_fusion(faiss_list, bm25_list, k=60, topn=max(args.topk * 3, 30))
# === 選抜(文字数予算&1チャンクあたりの上限)===
picked: List[Tuple[Dict, Dict]] = []
budget = args.context_chars
for idx in fused_idx:
if idx < 0 or idx >= len(offsets):
continue
meta = read_meta_line(meta_f, offsets, idx)
line_no = meta_min[idx]["line_no"] if idx < len(meta_min) else idx
rec = read_chunk_text_by_lineno(chunks_jsonl, line_no)
if not rec:
continue
text = clean_for_prompt(rec.get("text", ""))
if len(text) > args.per_chunk_max:
text = trim_text(text, args.per_chunk_max)
rec["text"] = text
need = len(text) + 200
if need > budget and picked:
break
picked.append((meta, rec))
budget -= need
if len(picked) >= args.topk:
break
prompt = build_prompt(args.query, picked, args.context_chars)
# === 保存 ===
outdir = Path("prompts"); outdir.mkdir(exist_ok=True)
(outdir / "latest_prompt.txt").write_text(prompt, encoding="utf-8")
bundle = {
"query": args.query,
"prompt": prompt,
"citations": [
{
"file_name": r.get("file_name"),
"chunk_index": r.get("chunk_index"),
"file_path": r.get("file_path"),
}
for (_, r) in picked
],
"model_hint": info["model"],
}
(outdir / "latest_prompt.json").write_text(json.dumps(bundle, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"[DONE] prompts/latest_prompt.txt")
print(f"[DONE] prompts/latest_prompt.json")
if __name__ == "__main__":
main()
生成物:
-
prompts/latest_prompt.txt
…… LLMにそのまま入力 -
prompts/latest_prompt.json
…… 構造化データ(出典メタ付き)
6. ローカルLLMに接続する
ここまでの流れで準備した latest_prompt.txt
を
ローカルLLM(例: gpt-oss-20b
)に入力するだけです。
まとめ
ステップ | ツール | 出力 |
---|---|---|
Office→TXT | LibreOffice | _txt/*.txt |
正規化&チャンク化 | Python | chunks.jsonl |
埋め込み生成 | SentenceTransformers + FAISS | index/vectors.faiss |
キーワード検索 | BM25 | index/bm25.pkl |
プロンプト生成 | Python | prompts/latest_prompt.txt |
おわりに
RAGは「LLMの頭脳」と「検索エンジンの記憶」を組み合わせる強力なアプローチです。
完全ローカルでナレッジ活用ができるため、安心して社内で運用できます。
終わりに(人間が記述)
LLMで初めてのことにも挑戦しやすくなってきました。
今後は、
・ワード以外の文書も処理する。
・データベースを利用する。
・図や写真も検索対象とする。CLIP等を使う?
などの課題があると思います。