0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Python未経験のWebエンジニアが、門前払いを回避するためにローカルLLM+Excel対応RAGアシスタントを自作した話

0
Posted at

はじめに

こんにちは。普段はWebアプリケーションエンジニアとして、システムの設計やバックエンド開発に携わっています。

これまでAIを活用した開発には関心があったものの、機械学習フレームワークの王道であるPyTorchに関しては完全な未経験でした。しかし、昨今の開発現場やキャリアの市場において「PyTorch未経験」というだけで選考の土台にすら乗れず、門前払いされるケースを少なからず耳にするようになりました。「それなら、まずは自分で触って何か作ってみよう」と思い立ったのが今回のきっかけです。

また、筆者は「生成AIのAPIに毎月課金したくない」「開発コストを極限まで抑えたい」というスーパーケチ精神を抱えています。そのため、外部APIを一切叩かず、手元のローカル環境(PCのローカルリソース)だけで完結する、実用的な社内資料メンター系アシスタントを構築することを目指しました。

初心者なりに試行錯誤した結果、思ったよりもスムーズに実装できたので、知見を共有するための記録として残します。


1. 構築したシステムの概要

今回作成したのは、日本のIT現場でよく見られる「複雑なExcel詳細設計書」を読み込ませ、その内容に基づいてローカルLLMが回答する簡易RAG(検索拡張生成)アシスタントです。

モデルには軽量かつ高性能な Qwen/Qwen2.5-1.5B-Instruct を採用し、PyTorchのテンソル演算を利用した独自のコンテキスト管理エンジンをバックエンドに組み込みました。

開発におけるコア機能

  • ハードウェアレベルの最適化: 非正規数(Denormal number)によるCPUの計算遅延ペナルティを回避する設定や、各種キャッシュの最適化。
  • テンソルによる状態管理: 各テキストセクターの重要度や参照頻度をバッファに保存。ループ処理を排除した一括減衰演算の実装。
  • 日本のExcel(方眼紙・結合セル)特化型パース: 結合セルの値を自動展開し、文脈の欠落やノイズを排除してLLMに渡す構造。

2. 実装における工夫と「日本のExcel」対策

RAGシステムを構築する上で最大の障壁となったのが、インプットとなるExcel(.xlsx)の構造でした。日本の開発現場で汎用される詳細設計書は、方眼紙形式、セルの結合、複雑な帳票レイアウトが多用されており、通常のテキスト抽出では文脈がバラバラになってしまいます。

これを解決するために、パース処理(load_environment_entropy)に以下の工夫を施しました。

  • 結合セルの値を全対象セルに展開: 結合されたセルのトップ左にある値を、マージされているすべての領域に複製して文脈の断絶を防ぐ。
  • 2Dプロジェクション(ヘッダー自動推論): 上方にある短いテキストを自動的にカラムヘッダーと見なし、各セルの値と [ヘッダー] 値 の形式で結合。
  • レイアウトのシリアライズ: 項目と値が「左から右へ」並ぶ関係性をパイプ記号( | )で繋ぎ、1行の明確なデータとしてLLMに構造を認識させる。

なお、今回の検証にあたり、Excelのサンプル構造として、大変実用的で素晴らしいテンプレートを公開されているこちらの記事「TISの画面設計書テンプレ&サンプルが役立ちそうなので紹介しておく」よりテンプレートをお借りしました。この場を借りて感謝申し上げます。


3. 実際の動作ログ

構築したシステムに実際の画面設計書(Excel)を読み込ませ、コンソールから質問を投げた際の出力ログです。

ローカルのCPU環境(torch.float32)であっても、事前にテンソル化とビットパッキングを行うことで、スムーズに参照データを抽出して回答が生成されていることが確認できます。

========================================
   社内資料 学習・作成支援アシスタント
========================================

[システム] 社内資料(design_docs.xlsx)を探しています...

[システム] 発見: c:\Users\design_docs.xlsx

[システム] 資料のベクトル化(2Dプロジェクション学習)、およびビットパッキングが完了しました。

[システム] AIの知能をロードしています...
Warning: You are sending unauthenticated requests to the HF Hub. Please set a HF_TOKEN to enable higher rate limits and faster downloads.
Loading weights: 100%|█████████████████████████████████| 338/338 [00:09<00:00, 34.29it/s]

[システム] 準備完了 (使用デバイス: cpu, 推論精度: torch.float32)

あなた: このドキュメントについて教えて

[システム] 参照データを抽出中...

==================================================
【作成支援AI】
【結論】
このドキュメントはおそらく一部の電子書籍またはオンラインマニュアルの構成要素である可能性があります。ただし、具体的な内容や目的については不明です。

【詳細】
- このドキュメントは複数のシートで構成されています:表紙、変更履歴、目次、各ページのタイトルなどが含まれています。
- シート名には「WA1020201」、「WA1020202」などの数字と文字が含まれており、これらは特定の機能や画面の名称を示唆しています。
- 「データ」シートも存在しており、これは可能であればデータの管理や分析に関する情報を含んでいるかもしれません。

==================================================


4. ソースコード全貌

ローカル環境(CPU/GPU問わず)で動作するアシスタントのコードです。実行には torch, transformers, openpyxl, pandas などのライブラリが必要です。

import os
import textwrap
import torch
import warnings
import pandas as pd
import openpyxl
from transformers import AutoModelForCausalLM, AutoTokenizer, TextStreamer
from transformers import logging as hf_logging

# 警告メッセージとHugging Faceのログを完全に沈黙させる
warnings.filterwarnings("ignore")
os.environ["TRANSFORMERS_VERBOSITY"] = "error"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
hf_logging.set_verbosity_error()

# =====================================================================
# ハードウェア最適化 (PyTorch)
# =====================================================================
torch.set_num_threads(os.cpu_count() or 4)

# 【計算力向上】非正規数(Denormal)によるCPUの深刻な計算遅延ペナルティをハードウェアレベルで回避
torch.set_flush_denormal(True)

if torch.cuda.is_available():
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True

# =====================================================================
# 内部ロジック: コンテキスト管理エンジン
# =====================================================================

class PyContextEngine:
    def __init__(self, sector_capacity=5, max_sectors=2000):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.sector_capacity = sector_capacity
        self.max_sectors = max_sectors
        
        self.sectors = []
        self.probability_cloud_summary = ""
        self.conversation_turns = []
        
        self.vocab_size = 65536 
        
        self.state_buffer = torch.zeros(max_sectors, dtype=torch.int32, device=self.device)
        self.feature_matrix = torch.zeros((max_sectors, self.vocab_size), dtype=torch.int16, device=self.device)
        self.document_frequency = torch.zeros(self.vocab_size, dtype=torch.int16, device=self.device)
        
        # 検索時の `torch.sum` 計算を排除するための事前計算キャッシュ(SoA構造)
        self.sector_lengths = torch.ones(max_sectors, dtype=torch.int32, device=self.device)
        
        self.active_sector_count = 0

    def _hash_bigram(self, bigram):
        return hash(bigram) % self.vocab_size

    def pack_state(self, idx, va, vb, vc, state, ruin):
        packed = ((int(va) & 0xFF) | 
                  ((int(vb) & 0xFF) << 8) | 
                  ((int(vc) & 0xFF) << 16) | 
                  ((int(state) & 0x07) << 24) | 
                  ((int(ruin) & 0x07) << 27))
        self.state_buffer[idx] = torch.tensor(packed, dtype=torch.int32, device=self.device)

    def unpack_state(self, idx):
        val = self.state_buffer[idx].item()
        return {
            "vA": val & 0xFF,
            "vB": (val >> 8) & 0xFF,
            "vC": (val >> 16) & 0xFF,
            "State": (val >> 24) & 0x07,
            "RuinScore": (val >> 27) & 0x07
        }

    def load_environment_entropy(self, excel_path):
        """日本のExcel詳細設計書(方眼紙、結合セル、帳票形式)に特化した読み込み"""
        if not os.path.exists(excel_path):
            return False
            
        try:
            wb = openpyxl.load_workbook(excel_path, data_only=True)
            
            meta_data = f"【ブック構成情報】\nこのExcelファイルには以下のシートが含まれています: {', '.join(wb.sheetnames)}"
            self._add_sector("workbook_meta", meta_data)
            
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                
                # 結合セル(マージセル)の値を全対象セルに展開し、文脈の欠落を防ぐ
                merged_values = {}
                for merged_range in ws.merged_cells.ranges:
                    min_col, min_row, max_col, max_row = merged_range.bounds
                    top_left_val = ws.cell(row=min_row, column=min_col).value
                    if top_left_val is not None:
                        val_str = str(top_left_val).replace('\n', ' ').replace('\r', '').strip()
                        if val_str:
                            for r in range(min_row, max_row + 1):
                                for c in range(min_col, max_col + 1):
                                    merged_values[(r, c)] = val_str
                
                chunk_lines = []
                chunk_length = 0
                chunk_idx = 0
                OVERLAP_SIZE = 2 
                
                col_headers = {}
                
                for row_idx, row in enumerate(ws.iter_rows(), start=1):
                    row_data = []
                    last_val = None
                    
                    for col_idx, cell in enumerate(row, start=1):
                        if (row_idx, col_idx) in merged_values:
                            val_str = merged_values[(row_idx, col_idx)]
                        else:
                            val = cell.value
                            if val is not None:
                                val_str = str(val).replace('\n', ' ').replace('\r', '').strip()
                            else:
                                val_str = ""
                                
                        if val_str:
                            if val_str != last_val:
                                if col_idx not in col_headers:
                                    col_headers[col_idx] = val_str
                                    row_data.append(val_str)
                                else:
                                    header = col_headers[col_idx]
                                    if header != val_str and len(header) <= 20:
                                        row_data.append(f"[{header}] {val_str}")
                                    else:
                                        row_data.append(val_str)
                                last_val = val_str
                    
                    if not row_data:
                        continue
                        
                    line_text = f"- 行{row_idx} => " + " | ".join(row_data)
                    
                    if chunk_length + len(line_text) > 800 and len(chunk_lines) > 0:
                        chunk_text = "\n".join(chunk_lines)
                        self._add_sector(f"{sheet_name}_part{chunk_idx}", f"【シート: {sheet_name}\n{chunk_text}")
                        
                        overlap_lines = chunk_lines[-OVERLAP_SIZE:] if len(chunk_lines) > OVERLAP_SIZE else []
                        chunk_lines = overlap_lines + [line_text]
                        chunk_length = sum(len(l) for l in overlap_lines) + len(line_text)
                        chunk_idx += 1
                    else:
                        chunk_lines.append(line_text)
                        chunk_length += len(line_text)
                        
                if len(chunk_lines) > 0:
                    chunk_text = "\n".join(chunk_lines)
                    self._add_sector(f"{sheet_name}_part{chunk_idx}", f"【シート: {sheet_name}\n{chunk_text}")
                    
            wb.close()
            return True
        except Exception as e:
            print(f"[内部エラー] パース中に異常発生: {e}")
            return False

    def _add_sector(self, sector_id, text_data):
        if self.active_sector_count >= self.max_sectors:
            return
            
        idx = self.active_sector_count
        self.sectors.append({"id": sector_id, "data": text_data})
        
        bigrams = [text_data[i:i+2] for i in range(len(text_data) - 1)] if len(text_data) > 1 else [text_data]
        unique_bigrams = list(set(bigrams))
        
        if bigrams:
            hashes = [self._hash_bigram(bg) for bg in bigrams]
            hash_tensor = torch.tensor(hashes, dtype=torch.long, device=self.device)
            counts = torch.bincount(hash_tensor)
            non_zero_hashes = torch.nonzero(counts).squeeze(-1)
            self.feature_matrix[idx, non_zero_hashes] += counts[non_zero_hashes].to(torch.int16)
            
            unique_hashes = [self._hash_bigram(bg) for bg in unique_bigrams]
            unique_hash_tensor = torch.tensor(unique_hashes, dtype=torch.long, device=self.device)
            self.document_frequency[unique_hash_tensor] += 1
            
        self.pack_state(idx, va=1, vb=0, vc=0, state=1, ruin=0)
        self.sector_lengths[idx] = torch.sum(self.feature_matrix[idx].to(torch.int32)) + 1
        self.active_sector_count += 1

    def calculate_cone of influence(self, user_query):
        if self.active_sector_count == 0:
            return "該当する参考資料はありません。"

        bigrams = [user_query[i:i+2] for i in range(len(user_query) - 1)] if len(user_query) > 1 else [user_query]
        unique_bigrams = list(set(bigrams))
        
        if not unique_bigrams:
            return "該当する参考資料はありません。"
            
        hashes = [self._hash_bigram(bg) for bg in unique_bigrams]
        hash_tensor = torch.tensor(hashes, dtype=torch.long, device=self.device)
        
        df_tensor = self.document_frequency[hash_tensor]
        idf_weights = (self.active_sector_count // (df_tensor + 1)) + 1
        
        relevant_features = self.feature_matrix[:self.active_sector_count, hash_tensor].to(torch.int32)
        base_scores = torch.mv(relevant_features, idf_weights.to(torch.int32))
        
        lengths = self.sector_lengths[:self.active_sector_count]
        base_scores_normalized = (base_scores * 100) // lengths
        
        states = self.state_buffer[:self.active_sector_count]
        va_values = states & 0xFF 
        
        final_scores = base_scores_normalized + va_values
        
        top_k = min(self.sector_capacity, self.active_sector_count)
        best_scores, best_indices = torch.topk(final_scores, top_k)
        
        if top_k > 0:
            max_score = best_scores[0].item()
            threshold = max_score * 0.4 
        else:
            threshold = 0

        cone_data = []
        for i in range(top_k):
            idx = best_indices[i].item()
            score = best_scores[i].item()
            if score > threshold and score > 0: 
                cone_data.append(self.sectors[idx]["data"])
                
                s = self.unpack_state(idx)
                new_va = min(s["vA"] + 2, 255)
                new_vb = min(s["vB"] + 1, 255)
                self.pack_state(idx, va=new_va, vb=new_vb, vc=0, state=s["State"], ruin=s["RuinScore"])
                
        if not cone_data:
            return "該当する参考資料はありません。"
            
        return "\n".join(cone_data)

    def apply_state_decay(self):
        """忘却・減衰のテンソル一括演算
           ループを使わずに全セクターの状態を一斉更新する
        """
        if self.active_sector_count == 0:
            return
            
        states = self.state_buffer[:self.active_sector_count]
        
        va = states & 0xFF
        vb = (states >> 8) & 0xFF
        vc = (states >> 16) & 0xFF
        st = (states >> 24) & 0x07
        ruin = (states >> 27) & 0x07
        
        vc = torch.clamp(vc + 1, max=255)
        
        decay_mask = (vc > 3).to(torch.int32)
        va = torch.clamp(va - decay_mask, min=1)
        vc = torch.where(decay_mask > 0, torch.zeros_like(vc), vc)
        
        ruin_mask = ((va == 1) & (vb == 0)).to(torch.int32)
        ruin = torch.clamp(ruin + ruin_mask, max=7)
        
        packed = (va | (vb << 8) | (vc << 16) | (st << 24) | (ruin << 27))
        self.state_buffer[:self.active_sector_count] = packed

    def enforce_state_transition(self, user_text, ai_text):
        self.conversation_turns.append({"user": user_text, "ai": ai_text})
        if len(self.conversation_turns) > 3:
            self.conversation_turns.pop(0)
            self.probability_cloud_summary = "(注意:会話が長くなったため、一部の古い文脈は圧縮されました)"
            
        self.apply_state_decay()


# =====================================================================
# UIレイヤー
# =====================================================================

def print_system_message(text):
    print(f"\n[システム] {text}")

def find_excel_file(filename):
    path1 = os.path.abspath(filename)
    path2 = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename)
    path3 = os.path.join(os.getcwd(), filename)
    
    for p in [path1, path2, path3]:
        if os.path.exists(p):
            return p
    return None

def main():
    os.system('cls' if os.name == 'nt' else 'clear')
    
    print("========================================")
    print("   社内資料 学習・作成支援アシスタント")
    print("========================================")
    
    engine = PyContextEngine(sector_capacity=5)
    excel_name = "design_docs.xlsx"
    
    print_system_message(f"社内資料({excel_name})を探しています...")
    
    excel_path = find_excel_file(excel_name)
    
    if excel_path:
        print_system_message(f"発見: {excel_path}")
        if engine.load_environment_entropy(excel_path):
            print_system_message("資料のベクトル化、およびビットパッキングが完了しました。")
        else:
            print_system_message("ファイルの読み込み中にエラーが発生しました。")
    else:
        print_system_message(f"エラー: '{excel_name}' が見つかりませんでした。")
        engine._add_sector("dummy", "学習データなしで起動します。語尾は『〜とする』で統一します。")

    print_system_message("AIの知能をロードしています...")
    
    try:
        model_id = "Qwen/Qwen2.5-1.5B-Instruct"
        tokenizer = AutoTokenizer.from_pretrained(model_id)
        
        device = "cuda" if torch.cuda.is_available() else "cpu"
        
        if device == "cuda" and torch.cuda.is_bf16_supported():
            dtype = torch.bfloat16
        else:
            dtype = torch.float16 if device == "cuda" else torch.float32
        
        model = AutoModelForCausalLM.from_pretrained(
            model_id, 
            torch_dtype=dtype,
            low_cpu_mem_usage=True,
            attn_implementation="sdpa"
        ).to(device)
        
        print_system_message(f"準備完了 (使用デバイス: {device}, 推論精度: {dtype})")
        
    except Exception as e:
        print_system_message(f"AI読み込み失敗: {e}")
        return

    while True:
        try:
            user_input = input("\nあなた: ")
            if user_input.lower() in ['exit', 'quit', '終了']:
                break
            if not user_input.strip():
                continue

            print_system_message("参照データを抽出中...\n")
            
            reference_data = engine.calculate_cone_of_influence(user_input)
            
            system_prompt = (
                "あなたは社内資料に基づいて的確かつ論理的に回答する優秀なAIアシスタントです。\n"
                "提供された【参考資料】の内容のみを事実とし、推測で捏造してはいけません。\n"
                "【重要】ユーザーの質問に対して、参考資料に関係のない情報が含まれている場合は、その部分には触れず、資料にある内容だけで回答してください。\n"
                "資料に全く記載がない場合は、あなたの持っている一般的な知識で答えず、必ず「資料に記載がありません」と明言してください。\n\n"
                "回答は必ず以下のフォーマットで出力してください。\n"
                "【結論】(質問に対する簡潔な直接の答え)\n"
                "【詳細】(参考資料から読み取れる具体的なデータや背景)"
            )
            if engine.probability_cloud_summary:
                system_prompt += f"\n{engine.probability_cloud_summary}"

            messages = [{"role": "system", "content": system_prompt}]
            for turn in engine.conversation_turns:
                messages.append({"role": "user", "content": turn["user"]})
                messages.append({"role": "assistant", "content": turn["ai"]})
            messages.append({"role": "user", "content": f"【参考資料】\n{reference_data}\n\n【質問】\n{user_input}"})
            
            prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
            inputs = tokenizer(prompt, return_tensors="pt", add_special_tokens=False).to(device)
            streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
            
            print("="*50 + "\n【作成支援AI】")
            with torch.inference_mode():
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=2048, 
                    do_sample=False, 
                    repetition_penalty=1.1,
                    pad_token_id=tokenizer.eos_token_id,
                    streamer=streamer,
                    use_cache=True
                )
            print("\n" + "="*50)
            
            answer_tokens = outputs[0][inputs['input_ids'].shape[1]:]
            answer = tokenizer.decode(answer_tokens, skip_special_tokens=True).strip()
            engine.enforce_state_transition(user_text=user_input, ai_text=answer)

        except KeyboardInterrupt:
            break

if __name__ == "__main__":
    main()


5. 実際に触ってみた感想と考察

「PyTorchは数理モデルをガチガチに組むための専門家向けの道具」という先入観がありましたが、初心者なりにドキュメントを読みながら組んでみると、強力な多次元配列(テンソル)操作ライブラリとして非常に扱いやすいことに気づきました。

特に、検索スコアの計算(torch.mv を用いた行列とベクトルの積)や、バッファデータのビット演算・一括クランプ処理(torch.clamp)などは、通常のPythonのforループで回すよりも圧倒的に高速かつ簡潔に記述できます。

「食わず嫌いせずに触ってみるものだな」というのが率直な感想です。ここでの知見をベースにして、今後はさらに独自のロジックやAIの挙動制御システムを組み込んでみたいと考えています。


おわりに

API全盛期の時代ですが、完全ローカル環境でのAI駆動システムには、コスト面(完全無料)だけでなく、社内の秘匿情報を外部に送信しないというセキュリティ上の強力なメリットもあります。今後もクラウドの有料APIに頼ることなく、この「スーパーケチ精神」を原動力にして、ローカルリソースをフルに活かした独自のAIシステムやアーキテクチャの拡張に挑戦していきたいと思います。

もし手元に余っているハードウェア資産(GPUやマルチコアCPU)があれば、ぜひローカルRAGの構築を試してみてはいかがでしょうか。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?