14
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

マニュアルなどのPDF・Word文書からファインチューニング学習データを効率的に作成する方法

Posted at

この記事はLITALICO Engineers Advent Calendar 2025 カレンダー2 の 15日目の記事です

はじめに

企業にはマニュアル、仕様書、手順書など大量の文書データが蓄積されており、これらを活用してAIモデルをファインチューニングしたいニーズが高まっています。しかし、PDF・Word文書から学習データを作成するには、文書の解析・テキスト抽出・データ整形など多くの工程が必要です。

本記事では、PDF・Word文書から効率的にファインチューニング用の学習データを作成する実践的な方法を紹介します。

🚀 クイックスタートガイド

すぐに始めたい方は以下の順序で進んでください:

  1. 必要なライブラリをインストールテキスト抽出の環境構築
  2. 基本的なQ&A生成を試すデータセット作成のサンプルコード
  3. 完全なパイプラインを実行完全実装例のバッチ処理

📚 技術用語集

用語 説明
ファインチューニング 事前学習済みモデルを特定タスクに特化させる学習手法
JSONL JSON Lines形式。1行1JSONオブジェクトの形式
チャンク 長文を意味のある単位に分割した文章ブロック
Q&Aペア 質問(Question)と回答(Answer)のセット
VLM Vision Language Model。画像とテキストを理解するAI
レート制限 APIの使用頻度制限

目次

  1. 全体フロー - データ作成の流れと概要
  2. テキスト抽出 - PDF・Word文書からの効率的な抽出
  3. 画像データセット作成 - 文書内画像からのVLM用データセット
  4. データセット作成 - Q&A生成とファインチューニング形式への変換
  5. 品質管理 - データクリーニングと品質チェック
  6. 実装時の注意点 - よくある問題とその解決策
  7. プロバイダー比較 - OpenAI vs Geminiの使い分け
  8. 完全実装例 - 統合されたパイプライン
  9. 応用例 - 企業・多言語対応など
  10. 今後の展望

全体フロー

ファインチューニング用学習データ作成の全体的な流れは以下の通りです

主要な工程

  1. 文書解析・テキスト抽出: PDF・Wordからテキストと構造情報を抽出
  2. 構造化: セクション、見出し、段落などの構造を認識
  3. Q&A生成: 抽出したテキストから質問・回答ペアを自動生成
  4. データクリーニング: 品質の低いデータを除去・修正
  5. フォーマット変換: ファインチューニング形式(JSONL等)に変換

テキスト抽出

PDF文書からの抽出

PDFからのテキスト抽出には以下の課題があります

  • レイアウトの複雑さ(表、図、複数カラム)
  • 文字認識エラー(スキャンPDFの場合)
  • 構造情報の保持

推奨ライブラリとツール

# PDF抽出の基本例
import fitz  # PyMuPDF
from pdfplumber import PDF
import pymupdf4llm

def extract_pdf_text_with_structure(pdf_path):
    """PDF文書から構造を保持してテキストを抽出"""
    
    # PyMuPDF4LLMを使用(LLM向けに最適化)
    md_text = pymupdf4llm.to_markdown(pdf_path)
    
    # pdfplumberで表データも抽出
    with PDF.open(pdf_path) as pdf:
        tables = []
        for page in pdf.pages:
            page_tables = page.extract_tables()
            tables.extend(page_tables)
    
    return {
        'markdown_text': md_text,
        'tables': tables
    }

Word文書からの抽出

Word文書は構造情報が比較的保持しやすく、以下のライブラリが有効です

from docx import Document
import mammoth

def extract_word_text_with_structure(docx_path):
    """Word文書から構造を保持してテキストを抽出"""
    
    # python-docxで構造情報を保持
    doc = Document(docx_path)
    structured_content = []
    
    for element in doc.element.body:
        if element.tag.endswith('p'):  # 段落
            paragraph = element
            style = paragraph.style.name if paragraph.style else "Normal"
            text = paragraph.text
            structured_content.append({
                'type': 'paragraph',
                'style': style,
                'text': text
            })
        elif element.tag.endswith('tbl'):  # 表
            # 表の処理
            table_data = extract_table_from_element(element)
            structured_content.append({
                'type': 'table',
                'data': table_data
            })
    
    # mammothでHTMLに変換(代替手法)
    with open(docx_path, "rb") as docx_file:
        result = mammoth.convert_to_html(docx_file)
        html_content = result.value
    
    return {
        'structured_content': structured_content,
        'html_content': html_content
    }

文書内画像からのデータセット作成

PDF・Word文書内の画像(図表、チャート、スクリーンショット等)からもファインチューニング用データセットを作成できます。これは Vision Language Model (VLM) のファインチューニングに特に有効です。

画像抽出と基本処理

import fitz  # PyMuPDF
from PIL import Image
import io
import base64
from typing import List, Dict

class ImageExtractor:
    """PDF・Word文書から画像を抽出するクラス"""
    
    def __init__(self):
        self.supported_formats = ['png', 'jpg', 'jpeg', 'bmp', 'tiff']
        self.min_image_size = (100, 100)  # 最小画像サイズ
    
    def extract_images_from_pdf(self, pdf_path: str) -> List[Dict]:
        """PDFから画像を抽出"""
        doc = fitz.open(pdf_path)
        images = []
        
        for page_num in range(len(doc)):
            page = doc[page_num]
            image_list = page.get_images(full=True)
            
            for img_index, img in enumerate(image_list):
                try:
                    # 画像データを取得
                    xref = img[0]
                    pix = fitz.Pixmap(doc, xref)
                    
                    if pix.n - pix.alpha < 4:  # RGB形式チェック
                        img_data = pix.tobytes("png")
                        img_obj = Image.open(io.BytesIO(img_data))
                        
                        # サイズフィルタ
                        if (img_obj.size[0] >= self.min_image_size[0] and 
                            img_obj.size[1] >= self.min_image_size[1]):
                            
                            images.append({
                                'page_number': page_num + 1,
                                'image_index': img_index,
                                'image': img_obj,
                                'size': img_obj.size,
                                'format': 'png',
                                'context': self._extract_surrounding_text(page, img),
                                'image_id': f"page_{page_num+1}_img_{img_index}"
                            })
                    
                    pix = None  # メモリ解放
                    
                except Exception as e:
                    print(f"画像抽出エラー (page {page_num+1}, img {img_index}): {e}")
                    continue
        
        doc.close()
        return images
    
    def _extract_surrounding_text(self, page, img_info) -> str:
        """画像周辺のテキストを抽出(重要な文脈情報)"""
        try:
            # 簡略化された文脈抽出
            page_text = page.get_text()
            # 実際にはより精密な位置ベースの抽出を実装
            return page_text[:200] + "..." if len(page_text) > 200 else page_text
        except:
            return ""

Vision APIによるQ&A生成

import openai

class VisionQAGenerator:
    """画像からQ&Aペアを生成"""
    
    def __init__(self, api_key: str, provider: str = "openai"):
        self.provider = provider
        if provider == "openai":
            self.client = openai.OpenAI(api_key=api_key)
    
    def image_to_base64(self, image: Image.Image) -> str:
        """PIL ImageをBase64に変換"""
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode()
    
    def generate_qa_from_image(self, image_data: Dict) -> List[Dict]:
        """画像からQ&Aペアを生成"""
        
        image = image_data['image']
        context = image_data.get('context', '')
        base64_image = self.image_to_base64(image)
        
        prompt = f"""
この画像を分析して、ファインチューニング用の質問・回答ペアを3個生成してください。

画像の文脈: {context}

以下の観点で質問を作成
1. 画像の全体的な内容説明
2. 具体的な要素(数値、文字、色など)  
3. 実用的な用途や操作方法

JSON形式で出力
```json
[
  {{
    "question": "質問文",
    "answer": "回答文",
    "image_type": "図表|チャート|スクリーンショット|その他",
    "difficulty": "初級|中級|上級"
  }}
]
```"""
        
        try:
            response = self.client.chat.completions.create(
                model="gpt-4-vision-preview",
                messages=[{
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {
                            "url": f"data:image/png;base64,{base64_image}"
                        }}
                    ]
                }],
                max_tokens=1000
            )
            
            content = response.choices[0].message.content
            return self._extract_json(content)
            
        except Exception as e:
            print(f"Vision Q&A生成エラー: {e}")
            return []
    
    def _extract_json(self, content: str) -> List[Dict]:
        """レスポンスからJSONを抽出"""
        import re
        import json
        
        json_match = re.search(r'```json\n(.*?)\n```', content, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group(1))
            except:
                pass
        return []

画像種別の自動判定と最適化

def analyze_image_type(image_data: Dict) -> Dict:
    """画像の種別を分析して処理を最適化"""
    
    image = image_data['image']
    width, height = image.size
    aspect_ratio = width / height
    
    # アスペクト比による分類
    if 0.8 < aspect_ratio < 1.2:
        image_type = 'chart_or_diagram'
        focus_areas = ['データ読み取り', '要素の関係', '数値の比較']
    elif aspect_ratio > 2:
        image_type = 'timeline_or_process'  
        focus_areas = ['順序', 'プロセス', '時系列']
    elif aspect_ratio < 0.5:
        image_type = 'list_or_menu'
        focus_areas = ['項目一覧', '選択肢', '階層構造']
    else:
        image_type = 'screenshot_or_interface'
        focus_areas = ['UI操作', '機能説明', '使用方法']
    
    return {
        'type': image_type,
        'focus_areas': focus_areas,
        'size_category': 'large' if width * height > 500000 else 'medium',
        'recommended_qa_count': 3
    }

実用的な活用例

1. 技術マニュアルの活用

  • UI スクリーンショット → 操作手順のQ&A
  • システム構成図 → 関係性理解のQ&A
  • 設定画面 → パラメータ説明のQ&A

2. データ分析資料の活用

  • グラフ・チャート → 数値読み取りのQ&A
  • 比較表 → データ比較のQ&A
  • ダッシュボード → 指標理解のQ&A

3. 研修・教育資料の活用

  • 概念図 → 理論理解のQ&A
  • フローチャート → プロセス理解のQ&A
  • 実物写真 → 識別・判断のQ&A

コストと効果の見積もり

def estimate_image_qa_cost(num_images: int, provider: str = "openai") -> Dict:
    """画像Q&A生成のコスト見積もり"""
    
    costs = {
        'openai': 0.01,  # 約1円/画像
        'gemini': 0.005  # 約0.5円/画像  
    }
    
    cost_per_image = costs.get(provider, 0.01)
    
    return {
        'total_images': num_images,
        'estimated_cost': num_images * cost_per_image,
        'expected_qa_pairs': num_images * 3,
        'cost_per_qa': cost_per_image / 3,
        'provider': provider
    }

# 使用例
cost_estimate = estimate_image_qa_cost(100, "openai")
print(f"100枚の画像処理: {cost_estimate['estimated_cost']}")
print(f"生成Q&A数: {cost_estimate['expected_qa_pairs']}ペア")

統合データセット作成

def create_multimodal_dataset(pdf_path: str) -> Dict:
    """テキスト + 画像の統合データセット作成"""
    
    # 1. テキストQ&A生成(既存機能)
    text_qa_pairs = []  # 前述のテキスト処理で生成
    
    # 2. 画像Q&A生成
    image_extractor = ImageExtractor()
    vision_qa = VisionQAGenerator("your-api-key")
    
    images = image_extractor.extract_images_from_pdf(pdf_path)
    image_qa_pairs = []
    
    for img_data in images:
        qa_pairs = vision_qa.generate_qa_from_image(img_data)
        for qa in qa_pairs:
            qa['source_type'] = 'image'
            qa['image_id'] = img_data['image_id']
        image_qa_pairs.extend(qa_pairs)
    
    # 3. 統合と保存
    combined_dataset = {
        'text_qa': text_qa_pairs,
        'image_qa': image_qa_pairs,
        'total_qa': len(text_qa_pairs) + len(image_qa_pairs),
        'multimodal_ready': True
    }
    
    return combined_dataset

この機能により、文書内の画像も含めた完全なマルチモーダルファインチューニングデータセットが作成可能になります!

視覚的な情報を含むことで、より実用的で包括的なAIモデルの構築ができるようになります。

画像データセットの検証・確認の重要性

画像から自動生成されたQ&Aデータセットの内容確認は、ファインチューニング成功の鍵です。以下の理由から必須の工程となります。

1. Vision APIの精度限界

class ImageDatasetValidator:
    """画像データセットの品質検証クラス"""
    
    def __init__(self):
        self.common_vision_errors = [
            "画像が不鮮明で詳細が読み取れません",
            "テキストが小さすぎて判読できません", 
            "申し訳ございませんが",
            "確実ではありませんが",
            "推測ですが"
        ]
        self.quality_thresholds = {
            'min_answer_length': 20,
            'max_answer_length': 500,
            'min_question_length': 10,
            'confidence_threshold': 0.7
        }
    
    def validate_image_qa_batch(self, image_qa_dataset: List[Dict]) -> Dict:
        """画像Q&Aデータセットの一括検証"""
        
        validation_results = {
            'total_qa_pairs': len(image_qa_dataset),
            'valid_pairs': 0,
            'quality_issues': {},
            'error_categories': {},
            'recommendations': []
        }
        
        for qa in image_qa_dataset:
            issues = self._validate_single_qa(qa)
            
            if not issues:
                validation_results['valid_pairs'] += 1
            else:
                for issue in issues:
                    if issue not in validation_results['quality_issues']:
                        validation_results['quality_issues'][issue] = 0
                    validation_results['quality_issues'][issue] += 1
        
        # 品質分析
        success_rate = validation_results['valid_pairs'] / validation_results['total_qa_pairs']
        
        if success_rate < 0.7:
            validation_results['recommendations'].append("画像品質が低すぎます。より鮮明な画像を使用してください")
        
        if validation_results['quality_issues'].get('vision_api_error', 0) > 10:
            validation_results['recommendations'].append("Vision APIの認識エラーが多発しています。画像の前処理が必要です")
        
        return validation_results
    
    def _validate_single_qa(self, qa: Dict) -> List[str]:
        """単一Q&Aペアの品質検証"""
        issues = []
        
        question = qa.get('question', '')
        answer = qa.get('answer', '')
        
        # Vision APIエラーの検出
        for error_phrase in self.common_vision_errors:
            if error_phrase in answer:
                issues.append('vision_api_error')
                break
        
        # 長さチェック
        if len(answer) < self.quality_thresholds['min_answer_length']:
            issues.append('answer_too_short')
        
        if len(answer) > self.quality_thresholds['max_answer_length']:
            issues.append('answer_too_long')
        
        if len(question) < self.quality_thresholds['min_question_length']:
            issues.append('question_too_short')
        
        # 具体性チェック
        vague_phrases = ['何かしら', 'おそらく', 'と思われます', '可能性があります']
        if any(phrase in answer for phrase in vague_phrases):
            issues.append('answer_too_vague')
        
        # 画像参照の妥当性チェック
        if 'この画像' in question and 'image_id' not in qa:
            issues.append('missing_image_reference')
        
        return issues

2. 手動確認のワークフロー

import matplotlib.pyplot as plt
from IPython.display import display, HTML
import base64

class ImageQAReviewer:
    """画像Q&Aの手動レビュー支援ツール"""
    
    def __init__(self):
        self.review_results = []
    
    def create_review_interface(self, image_qa_dataset: List[Dict], output_dir: str = "./review"):
        """レビュー用のHTMLインターフェースを生成"""
        
        import os
        os.makedirs(output_dir, exist_ok=True)
        
        html_content = """
<!DOCTYPE html>
<html>
<head>
    <title>画像Q&Aデータセット レビュー</title>
    <style>
        .qa-item { border: 1px solid #ccc; margin: 10px; paddierror: 15px; }
        .image-container { max-width: 500px; margin: 10px 0; }
        .image-container img { max-width: 100%; height: auto; }
        .question { background: #e3f2fd; paddierror: 10px; margin: 5px 0; }
        .answer { background: #f3e5f5; paddierror: 10px; margin: 5px 0; }
        .metadata { background: #f5f5f5; paddierror: 5px; font-size: 0.9em; }
        .review-buttons { margin: 10px 0; }
        .review-buttons button { margin: 5px; paddierror: 8px 15px; }
        .approve { background: #4caf50; color: white; }
        .reject { background: #f44336; color: white; }
        .edit { background: #ff9800; color: white; }
    </style>
</head>
<body>
    <h1>画像Q&Aデータセット レビュー</h1>
"""
        
        for i, qa in enumerate(image_qa_dataset):
            # 画像をBase64エンコード
            if 'image' in qa:
                buffered = io.BytesIO()
                qa['image'].save(buffered, format="PNG")
                img_str = base64.b64encode(buffered.getvalue()).decode()
                
                html_content += f"""
    <div class="qa-item" id="qa-{i}">
        <h3>Q&A {i+1}</h3>
        <div class="image-container">
            <img src="data:image/png;base64,{img_str}" alt="Image {i+1}">
        </div>
        <div class="question">
            <strong>質問:</strong> {qa.get('question', '')}
        </div>
        <div class="answer">
            <strong>回答:</strong> {qa.get('answer', '')}
        </div>
        <div class="metadata">
            <strong>画像ID:</strong> {qa.get('image_id', 'N/A')} | 
            <strong>タイプ:</strong> {qa.get('image_type', 'N/A')} | 
            <strong>難易度:</strong> {qa.get('difficulty', 'N/A')}
        </div>
        <div class="review-buttons">
            <button class="approve" onclick="approve({i})">承認</button>
            <button class="reject" onclick="reject({i})">却下</button>
            <button class="edit" onclick="edit({i})">編集</button>
        </div>
        <div id="notes-{i}" style="margin-top: 10px;">
            <textarea placeholder="レビューコメント..." style="width: 100%; height: 60px;"></textarea>
        </div>
    </div>
"""
        
        html_content += """
    <script>
        let reviewResults = {};
        
        function approve(index) {
            reviewResults[index] = 'approved';
            document.getElementById(`qa-${index}`).style.backgroundColor = '#e8f5e8';
            console.log('Approved QA:', index);
        }
        
        function reject(index) {
            reviewResults[index] = 'rejected';
            document.getElementById(`qa-${index}`).style.backgroundColor = '#ffebee';
            console.log('Rejected QA:', index);
        }
        
        function edit(index) {
            reviewResults[index] = 'needs_editing';
            document.getElementById(`qa-${index}`).style.backgroundColor = '#fff3e0';
            console.log('Needs editing QA:', index);
        }
        
        function exportResults() {
            console.log('Review Results:', reviewResults);
            // 実際の実装では、結果をサーバーに送信またはファイルとして保存
        }
    </script>
    <button onclick="exportResults()" style="position: fixed; bottom: 20px; right: 20px; paddierror: 15px; background: #2196f3; color: white;">
        レビュー結果をエクスポート
    </button>
</body>
</html>
"""
        
        # HTMLファイルを保存
        with open(f"{output_dir}/review.html", 'w', encoding='utf-8') as f:
            f.write(html_content)
        
        print(f"レビューインターフェースを生成しました: {output_dir}/review.html")
        return f"{output_dir}/review.html"
    
    def generate_quality_report(self, image_qa_dataset: List[Dict]) -> str:
        """品質レポートを生成"""
        
        validator = ImageDatasetValidator()
        validation_results = validator.validate_image_qa_batch(image_qa_dataset)
        
        report = f"""
# 画像Q&Aデータセット 品質レポート

## 概要統計
- **総Q&A数**: {validation_results['total_qa_pairs']}
- **品質合格数**: {validation_results['valid_pairs']}
- **合格率**: {validation_results['valid_pairs']/validation_results['total_qa_pairs']*100:.1f}%

## 品質問題の内訳
"""
        
        for issue, count in validation_results['quality_issues'].items():
            percentage = (count / validation_results['total_qa_pairs']) * 100
            report += f"- **{issue}**: {count}件 ({percentage:.1f}%)\n"
        
        report += "\n## 推奨改善策\n"
        for rec in validation_results['recommendations']:
            report += f"- {rec}\n"
        
        return report

3. 品質改善のベストプラクティス

class ImageQAOptimizer:
    """画像Q&A品質の最適化"""
    
    def __init__(self):
        self.improvement_strategies = {
            'low_resolution': '画像解像度を300DPI以上に向上',
            'poor_contrast': 'コントラスト調整で視認性向上',
            'multiple_elements': '1画像1要素の原則で分割',
            'text_heavy': 'OCR前処理でテキスト抽出精度向上'
        }
    
    def preprocess_images_for_qa(self, images: List[Dict]) -> List[Dict]:
        """Q&A生成前の画像前処理"""
        
        processed_images = []
        
        for img_data in images:
            image = img_data['image']
            
            # 画像品質分析
            quality_score = self._analyze_image_quality(image)
            
            if quality_score['overall'] > 0.6:  # 品質閾値
                # 必要に応じて画像を改善
                enhanced_image = self._enhance_image(image, quality_score)
                
                img_data['image'] = enhanced_image
                img_data['quality_score'] = quality_score
                img_data['preprocessing_applied'] = True
                
                processed_images.append(img_data)
            else:
                print(f"画像品質が低いためスキップ: {img_data['image_id']} (スコア: {quality_score['overall']:.2f})")
        
        return processed_images
    
    def _analyze_image_quality(self, image: Image.Image) -> Dict:
        """画像品質の分析"""
        import numpy as np
        
        # PIL ImageをNumPy配列に変換
        img_array = np.array(image.convert('L'))  # グレースケール変換
        
        # 基本的な品質指標
        sharpness = np.var(img_array)  # 分散によるシャープネス評価
        contrast = img_array.std()     # 標準偏差によるコントラスト評価
        brightness = img_array.mean()  # 平均輝度
        
        # 正規化スコア (0-1)
        quality_scores = {
            'sharpness': min(sharpness / 10000, 1.0),
            'contrast': min(contrast / 128, 1.0),
            'brightness': 1.0 - abs(brightness - 128) / 128,
            'size_adequacy': 1.0 if image.size[0] * image.size[1] > 40000 else 0.5
        }
        
        # 総合スコア
        quality_scores['overall'] = sum(quality_scores.values()) / len(quality_scores)
        
        return quality_scores
    
    def _enhance_image(self, image: Image.Image, quality_score: Dict) -> Image.Image:
        """画像品質の改善"""
        from PIL import ImageEnhance
        
        enhanced = image.copy()
        
        # コントラスト改善
        if quality_score['contrast'] < 0.5:
            enhancer = ImageEnhance.Contrast(enhanced)
            enhanced = enhancer.enhance(1.5)
        
        # シャープネス改善
        if quality_score['sharpness'] < 0.5:
            enhancer = ImageEnhance.Sharpness(enhanced)
            enhanced = enhancer.enhance(1.3)
        
        # 明度調整
        if quality_score['brightness'] < 0.7:
            enhancer = ImageEnhance.Brightness(enhanced)
            factor = 1.2 if quality_score['brightness'] < 0.5 else 1.1
            enhanced = enhancer.enhance(factor)
        
        return enhanced

4. 継続的な品質監視

def setup_quality_monitoring_pipeline(dataset_path: str) -> Dict:
    """品質監視パイプラインの設定"""
    
    monitoring_config = {
        'validation_frequency': 'daily',
        'quality_thresholds': {
            'min_success_rate': 0.8,
            'max_error_rate': 0.1,
            'min_avg_confidence': 0.7
        },
        'alert_conditions': [
            'success_rate_drop_below_threshold',
            'error_spike_detected',
            'confidence_degradation'
        ],
        'auto_actions': {
            'low_quality_removal': True,
            'retry_failed_generations': True,
            'fallback_to_manual_review': True
        }
    }
    
    return monitoring_config

# 実使用例
def comprehensive_image_qa_workflow(pdf_path: str) -> Dict:
    """包括的な画像Q&Aワークフロー"""
    
    # 1. 画像抽出
    extractor = ImageExtractor()
    raw_images = extractor.extract_images_from_pdf(pdf_path)
    
    # 2. 画像前処理・品質改善
    optimizer = ImageQAOptimizer()
    processed_images = optimizer.preprocess_images_for_qa(raw_images)
    
    # 3. Q&A生成
    vision_qa = VisionQAGenerator("your-api-key")
    qa_dataset = []
    
    for img_data in processed_images:
        qa_pairs = vision_qa.generate_qa_from_image(img_data)
        qa_dataset.extend(qa_pairs)
    
    # 4. 品質検証
    validator = ImageDatasetValidator()
    validation_results = validator.validate_image_qa_batch(qa_dataset)
    
    # 5. 手動レビュー用インターフェース生成
    reviewer = ImageQAReviewer()
    review_interface = reviewer.create_review_interface(qa_dataset)
    
    # 6. 品質レポート生成
    quality_report = reviewer.generate_quality_report(qa_dataset)
    
    return {
        'total_images_processed': len(processed_images),
        'qa_pairs_generated': len(qa_dataset),
        'validation_results': validation_results,
        'review_interface_path': review_interface,
        'quality_report': quality_report,
        'recommended_next_steps': [
            '1. レビューインターフェースで手動確認を実施',
            '2. 品質問題のあるQ&Aを修正または除去',
            '3. 承認されたデータセットでファインチューニング実行'
        ]
    }

なぜ内容確認が重要なのか

  1. Vision APIの限界:

    • 小さい文字やぼやけた画像の誤認識
    • 文脈理解の不完全性
    • 推測による不正確な回答
  2. ファインチューニングへの影響:

    • 低品質データによるモデル性能劣化
    • 間違った知識の学習
    • 本番環境での予期しない動作
  3. ROI最大化:

    • 高品質データによる効果的な学習
    • 手動修正コストの最小化
    • 実用的なAIモデルの構築

画像データセットの内容確認は、品質の高いファインチューニングを実現するための必須工程です!

構造化データ変換

抽出したテキストを構造化し、意味のある単位に分割します。

セクション分割

import re
from typing import List, Dict

def split_into_sections(markdown_text: str) -> List[Dict]:
    """マークダウンテキストをセクション単位に分割"""
    
    sections = []
    current_section = None
    
    lines = markdown_text.split('\n')
    
    for line in lines:
        # 見出しの検出
        heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
        
        if heading_match:
            # 前のセクションを保存
            if current_section:
                sections.append(current_section)
            
            # 新しいセクションを開始
            level = len(heading_match.group(1))
            title = heading_match.group(2)
            
            current_section = {
                'level': level,
                'title': title,
                'content': []
            }
        else:
            # 内容を現在のセクションに追加
            if current_section:
                current_section['content'].append(line)
    
    # 最後のセクションを保存
    if current_section:
        sections.append(current_section)
    
    # 内容を結合
    for section in sections:
        section['content'] = '\n'.join(section['content']).strip()
    
    return sections

意味のある単位への分割

def create_meaningful_chunks(sections: List[Dict], max_length: int = 1000) -> List[Dict]:
    """セクションを意味のある単位に分割"""
    
    chunks = []
    
    for section in sections:
        content = section['content']
        title = section['title']
        
        if len(content) <= max_length:
            # そのまま使用
            chunks.append({
                'title': title,
                'content': content,
                'context': f"セクション: {title}"
            })
        else:
            # 長い場合は段落単位で分割
            paragraphs = content.split('\n\n')
            current_chunk = ""
            
            for paragraph in paragraphs:
                if len(current_chunk + paragraph) <= max_length:
                    current_chunk += paragraph + '\n\n'
                else:
                    if current_chunk:
                        chunks.append({
                            'title': title,
                            'content': current_chunk.strip(),
                            'context': f"セクション: {title}"
                        })
                    current_chunk = paragraph + '\n\n'
            
            if current_chunk:
                chunks.append({
                    'title': title,
                    'content': current_chunk.strip(),
                    'context': f"セクション: {title}"
                })
    
    return chunks

データセット作成

必要なデータ量の概算

ファインチューニングに適したデータセットを作成するために、元のPDF文書にどの程度の文字数が必要かの概算を示します

データ変換効率の目安

# 実際の変換効率に基づく概算例
def estimate_dataset_size(pdf_char_count: int) -> Dict:
    """PDF文字数からデータセットサイズを概算"""
    
    # 経験的な変換効率
    chunk_ratio = 0.7  # 有効テキスト率(図表・重複除去後)
    chunk_size = 800   # 1チャンクあたりの平均文字数
    qa_per_chunk = 3   # チャンクあたりのQ&A生成数
    quality_filter_ratio = 0.8  # 品質フィルタ通過率
    
    effective_chars = pdf_char_count * chunk_ratio
    total_chunks = effective_chars / chunk_size
    raw_qa_pairs = total_chunks * qa_per_chunk
    final_qa_pairs = raw_qa_pairs * quality_filter_ratio
    
    return {
        'pdf_chars': pdf_char_count,
        'effective_chars': int(effective_chars),
        'chunks': int(total_chunks),
        'raw_qa_pairs': int(raw_qa_pairs),
        'final_qa_pairs': int(final_qa_pairs),
        'quality_ratio': f"{quality_filter_ratio:.1%}"
    }

# 具体例での試算
examples = [
    ("短いマニュアル", 10000),
    ("中規模マニュアル", 50000),
    ("大規模マニュアル", 200000),
    ("複数文書統合", 500000)
]

print("PDF文字数からデータセット概算")
print("=" * 60)
for name, char_count in examples:
    result = estimate_dataset_size(char_count)
    print(f"\n{name} ({char_count:,}文字):")
    print(f"  → 有効テキスト: {result['effective_chars']:,}文字")
    print(f"  → チャンク数: {result['chunks']}")
    print(f"  → 最終Q&A数: {result['final_qa_pairs']}ペア")

ファインチューニング目的別の推奨データ量

目的 推奨Q&A数 必要PDF文字数(概算) 備考
概念実証・実験 50-100ペア 20,000-40,000文字 小規模マニュアル1冊程度
部門特化型AI 200-500ペア 80,000-200,000文字 中規模マニュアル2-3冊
企業全体対応 1,000-3,000ペア 400,000-1,200,000文字 大規模マニュアル群
高精度専門AI 3,000-10,000ペア 1,200,000-4,000,000文字 包括的な文書コレクション

文書種別の効率性

def analyze_document_efficiency(doc_type: str) -> Dict:
    """文書種類別の変換効率分析"""
    
    efficiency_map = {
        'operation_manual': {
            'text_ratio': 0.8,  # 高いテキスト密度
            'qa_quality': 0.9,  # 構造化された内容
            'complexity': 'low',
            'recommended_chunk_size': 600
        },
        'technical_spec': {
            'text_ratio': 0.6,  # 図表が多い
            'qa_quality': 0.7,  # 技術的で複雑
            'complexity': 'high', 
            'recommended_chunk_size': 1000
        },
        'policy_document': {
            'text_ratio': 0.9,  # ほぼテキスト
            'qa_quality': 0.8,  # 抽象的な内容
            'complexity': 'medium',
            'recommended_chunk_size': 800
        },
        'faq_document': {
            'text_ratio': 0.95, # Q&A形式で効率的
            'qa_quality': 0.95, # 既にQ&A構造
            'complexity': 'low',
            'recommended_chunk_size': 400
        }
    }
    
    return efficiency_map.get(doc_type, {
        'text_ratio': 0.7,
        'qa_quality': 0.8, 
        'complexity': 'medium',
        'recommended_chunk_size': 800
    })

# 使用例:文書種別に応じた詳細見積もり
def detailed_estimate(pdf_chars: int, doc_type: str) -> Dict:
    """文書種別を考慮した詳細見積もり"""
    
    efficiency = analyze_document_efficiency(doc_type)
    
    effective_chars = pdf_chars * efficiency['text_ratio']
    chunk_count = effective_chars / efficiency['recommended_chunk_size']
    qa_per_chunk = 3 if efficiency['complexity'] == 'low' else 2
    final_qa = chunk_count * qa_per_chunk * efficiency['qa_quality']
    
    return {
        'document_type': doc_type,
        'pdf_chars': pdf_chars,
        'effective_chars': int(effective_chars),
        'estimated_qa_pairs': int(final_qa),
        'efficiency_notes': f"テキスト率: {efficiency['text_ratio']:.1%}, "
                           f"品質率: {efficiency['qa_quality']:.1%}"
    }

# 実例
doc_examples = [
    ("操作マニュアル (50,000文字)", 50000, 'operation_manual'),
    ("技術仕様書 (80,000文字)", 80000, 'technical_spec'), 
    ("社内規程 (30,000文字)", 30000, 'policy_document'),
    ("FAQ集 (20,000文字)", 20000, 'faq_document')
]

print("\n文書種別での詳細見積もり")
print("=" * 50)
for desc, chars, doc_type in doc_examples:
    result = detailed_estimate(chars, doc_type)
    print(f"\n{desc}:")
    print(f"  推定Q&A数: {result['estimated_qa_pairs']}ペア")
    print(f"  {result['efficiency_notes']}")

品質を重視した場合の推奨値

def quality_focused_estimates():
    """品質重視の場合の推奨データ量"""
    
    quality_tiers = {
        'minimum_viable': {
            'qa_pairs': 100,
            'pdf_chars': 40000,
            'description': '最低限の動作確認レベル'
        },
        'production_ready': {
            'qa_pairs': 500,
            'pdf_chars': 200000,
            'description': '実用レベル(80%の質問に対応)'
        },
        'high_quality': {
            'qa_pairs': 1500,
            'pdf_chars': 600000,
            'description': '高品質(90%の質問に対応)'
        },
        'enterprise_grade': {
            'qa_pairs': 3000,
            'pdf_chars': 1200000,
            'description': '企業級(95%の質問に対応)'
        }
    }
    
    print("品質レベル別推奨データ量")
    print("=" * 40)
    for tier, data in quality_tiers.items():
        print(f"\n{tier.replace('_', ' ').title()}:")
        print(f"  Q&A数: {data['qa_pairs']}ペア")
        print(f"  PDF文字数: {data['pdf_chars']:,}文字")
        print(f"  レベル: {data['description']}")

quality_focused_estimates()

コスト見積もり

def calculate_processing_cost(pdf_chars: int, provider: str = "openai") -> Dict:
    """処理コストの見積もり"""
    
    # データセット規模を推定
    estimate = estimate_dataset_size(pdf_chars)
    
    # プロバイダー別料金(2024年11月時点、円建て)
    rates = {
        'openai': {
            'input_per_1k': 2.25,   # GPT-4入力 ($0.015 * 150円)
            'output_per_1k': 6.75   # GPT-4出力 ($0.045 * 150円)
        },
        'gemini': {
            'input_per_1k': 0.11,   # Gemini Pro入力
            'output_per_1k': 0.23   # Gemini Pro出力
        }
    }
    
    # トークン数概算(日本語は約0.7トークン/文字)
    input_tokens = estimate['effective_chars'] * 0.7
    output_tokens = estimate['final_qa_pairs'] * 150 * 0.7  # Q&A平均150文字
    
    rate = rates[provider]
    input_cost = (input_tokens / 1000) * rate['input_per_1k']
    output_cost = (output_tokens / 1000) * rate['output_per_1k']
    total_cost = input_cost + output_cost
    
    return {
        'provider': provider,
        'pdf_chars': pdf_chars,
        'estimated_qa_pairs': estimate['final_qa_pairs'],
        'input_tokens': int(input_tokens),
        'output_tokens': int(output_tokens),
        'input_cost': input_cost,
        'output_cost': output_cost,
        'total_cost': total_cost,
        'cost_per_qa': total_cost / max(estimate['final_qa_pairs'], 1)
    }

# コスト比較例
pdf_sizes = [50000, 200000, 500000, 1000000]

print("\nプロバイダー別コスト見積もり")
print("=" * 60)
for size in pdf_sizes:
    openai_cost = calculate_processing_cost(size, "openai")
    gemini_cost = calculate_processing_cost(size, "gemini") 
    
    print(f"\nPDF {size:,}文字 → 約{openai_cost['estimated_qa_pairs']}Q&Aペア:")
    print(f"  OpenAI: {openai_cost['total_cost']:.0f}円 ({openai_cost['cost_per_qa']:.1f}円/ペア)")
    print(f"  Gemini: {gemini_cost['total_cost']:.0f}円 ({gemini_cost['cost_per_qa']:.1f}円/ペア)")
    print(f"  節約額: {openai_cost['total_cost'] - gemini_cost['total_cost']:.0f}")

Q&Aペアの自動生成

OpenAI API、Gemini API、Claude APIを使用してQ&Aペアを生成します

import openai
import google.generativeai as genai
import json
from typing import List

class QAGenerator:
    def __init__(self, api_key: str, provider: str = "openai"):
        self.provider = provider
        
        if provider == "openai":
            self.client = openai.OpenAI(api_key=api_key)
        elif provider == "gemini":
            genai.configure(api_key=api_key)
            self.model = genai.GenerativeModel('gemini-1.5-pro')
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    def generate_qa_pairs(self, chunk: Dict, num_questions: int = 3) -> List[Dict]:
        """テキストチャンクからQ&Aペアを生成"""
        
        prompt = f"""
以下のテキストから、ファインチューニング用の質問・回答ペアを{num_questions}個生成してください。

テキスト
{chunk['content']}

コンテキスト:{chunk['context']}

要件
1. 質問は具体的で実用的なものにする
2. 回答はテキストの内容に基づいて正確に作成する
3. 様々な種類の質問(手順、定義、理由など)を含める
4. 日本語で生成する

出力形式
```json
[
  {
    "question": "質問文",
    "answer": "回答文",
    "context": "関連するコンテキスト"
  }
]```
"""
        
        if self.provider == "openai":
            return self._generate_with_openai(prompt)
        elif self.provider == "gemini":
            return self._generate_with_gemini(prompt)
    
    def _generate_with_openai(self, prompt: str) -> List[Dict]:
        """OpenAI APIを使用してQ&Aペアを生成"""
        try:
            response = self.client.chat.completions.create(
                model="gpt-4",
                messages=[
                    {"role": "system", "content": "あなたは教育用Q&Aペア生成の専門家です。"},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7
            )
            
            # JSONを抽出
            content = response.choices[0].message.content
            return self._extract_json_from_response(content)
                
        except Exception as e:
            print(f"OpenAI Q&A生成エラー: {e}")
            return []
    
    def _generate_with_gemini(self, prompt: str) -> List[Dict]:
        """Gemini APIを使用してQ&Aペアを生成"""
        try:
            # Gemini用にプロンプトを調整
            gemini_prompt = f"""
あなたは教育用Q&Aペア生成の専門家です。

{prompt}

重要: 必ずJSON形式で回答してください。他の説明文は不要です。
"""
            
            response = self.model.generate_content(
                gemini_prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=0.7,
                    top_p=0.8,
                    top_k=40,
                    max_output_tokens=2048,
                )
            )
            
            # JSONを抽出
            content = response.text
            return self._extract_json_from_response(content)
                
        except Exception as e:
            print(f"Gemini Q&A生成エラー: {e}")
            return []
    
    def _extract_json_from_response(self, content: str) -> List[Dict]:
        """レスポンスからJSONを抽出"""
        import re
        
        # JSONブロックを探す
        json_match = re.search(r'```json\n(.*?)\n```', content, re.DOTALL)
        
        if json_match:
            try:
                qa_pairs = json.loads(json_match.group(1))
                return qa_pairs
            except json.JSONDecodeError:
                pass
        
        # JSONブロックがない場合、直接JSONを探す
        try:
            # [{ で始まる部分を探す
            start_idx = content.find('[{')
            if start_idx != -1:
                # 対応する ]を探す
                bracket_count = 0
                end_idx = start_idx
                
                for i, char in enumerate(content[start_idx:], start_idx):
                    if char == '[':
                        bracket_count += 1
                    elif char == ']':
                        bracket_count -= 1
                        if bracket_count == 0:
                            end_idx = i + 1
                            break
                
                json_str = content[start_idx:end_idx]
                qa_pairs = json.loads(json_str)
                return qa_pairs
        except json.JSONDecodeError:
            pass
        
        print("JSONの抽出に失敗しました")
        return []
    
    def generate_dataset(self, chunks: List[Dict]) -> List[Dict]:
        """全チャンクからデータセットを生成"""
        
        dataset = []
        
        for i, chunk in enumerate(chunks):
            print(f"処理中: {i+1}/{len(chunks)} - {chunk['title']} (Provider: {self.provider})")
            
            qa_pairs = self.generate_qa_pairs(chunk)
            
            for qa in qa_pairs:
                dataset.append({
                    'instruction': qa['question'],
                    'input': '',
                    'output': qa['answer'],
                    'source_title': chunk['title'],
                    'source_context': chunk['context'],
                    'generated_by': self.provider
                })
        
        return dataset

# 使用例の比較
def compare_qa_generation():
    """OpenAIとGeminiでのQ&A生成を比較"""
    
    sample_chunk = {
        'title': 'システム起動手順',
        'content': 'システムを起動するには、まず電源ボタンを押します。次に、ログイン画面が表示されるまで待ちます。ユーザー名とパスワードを入力してログインします。',
        'context': 'セクション: システム操作マニュアル'
    }
    
    # OpenAI版
    openai_generator = QAGenerator(api_key="openai-api-key", provider="openai")
    openai_qa = openai_generator.generate_qa_pairs(sample_chunk)
    
    # Gemini版
    gemini_generator = QAGenerator(api_key="gemini-api-key", provider="gemini")
    gemini_qa = gemini_generator.generate_qa_pairs(sample_chunk)
    
    print("=== OpenAI生成結果 ===")
    for qa in openai_qa:
        print(f"Q: {qa['question']}")
        print(f"A: {qa['answer']}\n")
    
    print("=== Gemini生成結果 ===")
    for qa in gemini_qa:
        print(f"Q: {qa['question']}")
        print(f"A: {qa['answer']}\n")

Gemini特有の設定とベストプラクティス

class GeminiQAGenerator:
    """Gemini専用のQ&A生成クラス"""
    
    def __init__(self, api_key: str):
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel('gemini-1.5-pro')
        
        # Gemini用の安全設定
        self.safety_settings = [
            {
                "category": "HARM_CATEGORY_HARASSMENT",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
            {
                "category": "HARM_CATEGORY_HATE_SPEECH", 
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
            {
                "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
            {
                "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            }
        ]
    
    def generate_qa_with_context_awareness(self, chunk: Dict, previous_qa: List[Dict] = None) -> List[Dict]:
        """文脈を考慮したQ&A生成(Geminiの長文理解能力を活用)"""
        
        context_prompt = ""
        if previous_qa:
            context_prompt = "\n\n既に生成された質問(重複を避けてください):\n"
            for qa in previous_qa[-5:]:  # 直近5個の質問を参考
                context_prompt += f"- {qa['question']}\n"
        
        prompt = f"""
あなたは企業文書の専門家です。以下のテキストから高品質な質問・回答ペアを3個生成してください。

# テキスト内容
{chunk['content']}

# コンテキスト
{chunk['context']}

{context_prompt}

# 生成ガイドライン
1. 実務で実際に困る具体的な質問を作成
2. 回答は正確で actionable な内容にする
3. 技術用語は適切に説明する
4. 手順がある場合は順序立てて説明する

# 出力形式(JSON形式で回答)
[
  {
    "question": "質問文",
    "answer": "回答文", 
    "difficulty": "初級|中級|上級",
    "category": "手順|定義|トラブルシューティング|その他"
  }
]
"""
        
        try:
            response = self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=0.7,
                    top_p=0.8,
                    top_k=40,
                    max_output_tokens=3000,
                    candidate_count=1
                ),
                safety_settings=self.safety_settings
            )
            
            # レスポンスの処理
            if response.text:
                qa_pairs = self._extract_and_validate_json(response.text)
                return qa_pairs
            else:
                print("Geminiからの応答が空です")
                return []
                
        except Exception as e:
            print(f"Gemini Q&A生成エラー: {e}")
            return []
    
    def _extract_and_validate_json(self, content: str) -> List[Dict]:
        """JSONの抽出と検証"""
        import re
        
        # JSON部分を抽出
        json_pattern = r'\[[\s\S]*?\]'
        matches = re.findall(json_pattern, content)
        
        for match in matches:
            try:
                qa_pairs = json.loads(match)
                
                # 各Q&Aペアを検証
                validated_pairs = []
                for qa in qa_pairs:
                    if self._validate_qa_pair(qa):
                        validated_pairs.append(qa)
                
                return validated_pairs
                
            except json.JSONDecodeError:
                continue
        
        print("有効なJSONが見つかりませんでした")
        return []
    
    def _validate_qa_pair(self, qa: Dict) -> bool:
        """Q&Aペアの妥当性検証"""
        required_fields = ['question', 'answer']
        
        # 必須フィールドチェック
        for field in required_fields:
            if field not in qa or not qa[field].strip():
                return False
        
        # 長さチェック
        if len(qa['question']) < 10 or len(qa['answer']) < 20:
            return False
        
        # 質問形式チェック
        question = qa['question'].strip()
        if not (question.endswith('?') or question.endswith('') or 
                '' in question or 'どの' in question or 'いつ' in question or
                'どこ' in question or 'なぜ' in question or 'どうやって' in question):
            return False
        
        return True

データ形式の統一

def format_for_finetuning(dataset: List[Dict], format_type: str = "alpaca") -> List[Dict]:
    """ファインチューニング用の形式に変換"""
    
    formatted_data = []
    
    for item in dataset:
        if format_type == "alpaca":
            # Alpaca形式
            formatted_item = {
                "instruction": item['instruction'],
                "input": item.get('input', ''),
                "output": item['output']
            }
        elif format_type == "chat":
            # Chat形式
            formatted_item = {
                "messages": [
                    {"role": "user", "content": item['instruction']},
                    {"role": "assistant", "content": item['output']}
                ]
            }
        elif format_type == "custom":
            # カスタム形式(企業固有要件に対応)
            formatted_item = {
                "prompt": f"質問: {item['instruction']}\n回答:",
                "completion": f" {item['output']}",
                "metadata": {
                    "source": item.get('source_title', ''),
                    "context": item.get('source_context', '')
                }
            }
        
        formatted_data.append(formatted_item)
    
    return formatted_data

def save_dataset(dataset: List[Dict], output_path: str):
    """データセットをJSONLファイルに保存"""
    
    with open(output_path, 'w', encoding='utf-8') as f:
        for item in dataset:
            f.write(json.dumps(item, ensure_ascii=False) + '\n')

品質向上とデータクリーニング

データ品質評価

import re
from textstat import flesch_reading_ease

class DataQualityChecker:
    def __init__(self):
        self.min_length = 10
        self.max_length = 2000
        self.min_readability = 30
    
    def check_quality(self, qa_pair: Dict) -> Dict:
        """Q&Aペアの品質をチェック"""
        
        issues = []
        score = 100
        
        question = qa_pair['instruction']
        answer = qa_pair['output']
        
        # 長さチェック
        if len(question) < self.min_length:
            issues.append("質問が短すぎます")
            score -= 20
        
        if len(answer) < self.min_length:
            issues.append("回答が短すぎます")
            score -= 20
        
        if len(answer) > self.max_length:
            issues.append("回答が長すぎます")
            score -= 10
        
        # 可読性チェック
        try:
            readability = flesch_reading_ease(answer)
            if readability < self.min_readability:
                issues.append("回答の可読性が低いです")
                score -= 15
        except:
            pass
        
        # 重複チェック
        if question.lower() in answer.lower():
            issues.append("質問と回答が重複しています")
            score -= 10
        
        # 不完全な文章チェック
        if not question.endswith(('?', '', '', '', ':')):
            issues.append("質問の形式が不適切です")
            score -= 10
        
        return {
            'score': max(score, 0),
            'issues': issues,
            'passed': score >= 60
        }
    
    def filter_dataset(self, dataset: List[Dict]) -> List[Dict]:
        """品質基準に基づいてデータセットをフィルタリング"""
        
        filtered_data = []
        quality_report = {
            'total': len(dataset),
            'passed': 0,
            'failed': 0,
            'issues_summary': {}
        }
        
        for item in dataset:
            quality_result = self.check_quality(item)
            
            if quality_result['passed']:
                filtered_data.append(item)
                quality_report['passed'] += 1
            else:
                quality_report['failed'] += 1
                
            # 問題の集計
            for issue in quality_result['issues']:
                if issue not in quality_report['issues_summary']:
                    quality_report['issues_summary'][issue] = 0
                quality_report['issues_summary'][issue] += 1
        
        print(f"品質フィルタリング結果:")
        print(f"総数: {quality_report['total']}")
        print(f"合格: {quality_report['passed']}")
        print(f"不合格: {quality_report['failed']}")
        print(f"合格率: {quality_report['passed']/quality_report['total']*100:.1f}%")
        
        return filtered_data

重複除去とデータ統合

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class DuplicateRemover:
    def __init__(self, similarity_threshold: float = 0.8):
        self.similarity_threshold = similarity_threshold
        self.vectorizer = TfidfVectorizer(ngram_range=(1, 2))
    
    def remove_duplicates(self, dataset: List[Dict]) -> List[Dict]:
        """類似度に基づいて重複を除去"""
        
        # 質問文のベクトル化
        questions = [item['instruction'] for item in dataset]
        
        try:
            question_vectors = self.vectorizer.fit_transform(questions)
            similarity_matrix = cosine_similarity(question_vectors)
            
            # 重複除去
            to_remove = set()
            
            for i in range(len(dataset)):
                if i in to_remove:
                    continue
                    
                for j in range(i + 1, len(dataset)):
                    if j in to_remove:
                        continue
                        
                    if similarity_matrix[i][j] > self.similarity_threshold:
                        # より短い質問を除去
                        if len(questions[i]) < len(questions[j]):
                            to_remove.add(i)
                        else:
                            to_remove.add(j)
            
            # 重複を除去したデータセットを作成
            filtered_dataset = [
                dataset[i] for i in range(len(dataset)) 
                if i not in to_remove
            ]
            
            print(f"重複除去: {len(dataset)} -> {len(filtered_dataset)} (-{len(to_remove)})")
            
            return filtered_dataset
            
        except Exception as e:
            print(f"重複除去エラー: {e}")
            return dataset

実装例

完全な処理パイプライン

import os
from pathlib import Path

class DocumentToDatasetPipeline:
    def __init__(self, api_key: str, provider: str = "openai"):
        self.qa_generator = QAGenerator(api_key, provider)
        self.quality_checker = DataQualityChecker()
        self.duplicate_remover = DuplicateRemover()
        self.provider = provider
    
    def process_documents(self, input_dir: str, output_path: str):
        """文書フォルダを処理してデータセットを作成"""
        
        all_chunks = []
        input_path = Path(input_dir)
        
        # 1. 文書からテキスト抽出
        for file_path in input_path.glob("**/*"):
            if file_path.suffix.lower() == '.pdf':
                print(f"PDF処理中: {file_path}")
                extracted = extract_pdf_text_with_structure(str(file_path))
                sections = split_into_sections(extracted['markdown_text'])
                chunks = create_meaningful_chunks(sections)
                all_chunks.extend(chunks)
                
            elif file_path.suffix.lower() == '.docx':
                print(f"Word処理中: {file_path}")
                extracted = extract_word_text_with_structure(str(file_path))
                # Word文書用の処理を実装
                # ...
        
        print(f"総チャンク数: {len(all_chunks)}")
        
        # 2. Q&Aペア生成
        print("Q&Aペア生成中...")
        dataset = self.qa_generator.generate_dataset(all_chunks)
        print(f"生成されたQ&Aペア数: {len(dataset)}")
        
        # 3. 品質チェック
        print("品質チェック中...")
        filtered_dataset = self.quality_checker.filter_dataset(dataset)
        
        # 4. 重複除去
        print("重複除去中...")
        final_dataset = self.duplicate_remover.remove_duplicates(filtered_dataset)
        
        # 5. 形式変換と保存
        print("データセット保存中...")
        formatted_dataset = format_for_finetuning(final_dataset, "alpaca")
        save_dataset(formatted_dataset, output_path)
        
        print(f"完了: {output_path}{len(final_dataset)} 件のデータを保存")
        
        return final_dataset

# 使用例
if __name__ == "__main__":
    # OpenAI使用例
    openai_pipeline = DocumentToDatasetPipeline(
        api_key="your-openai-api-key",
        provider="openai"
    )
    
    # Gemini使用例  
    gemini_pipeline = DocumentToDatasetPipeline(
        api_key="your-gemini-api-key",
        provider="gemini"
    )
    
    # 処理実行(プロバイダーを選択)
    selected_pipeline = openai_pipeline  # または gemini_pipeline
    
    selected_pipeline.process_documents(
        input_dir="./documents",
        output_path="./finetuning_dataset.jsonl"
    )

バッチ処理用スクリプト

#!/usr/bin/env python3
"""
バッチ処理用スクリプト
使用法: python batch_process.py --input documents/ --output dataset.jsonl --api-key YOUR_KEY
"""

import argparse
import json
from pathlib import Path

def main():
    parser = argparse.ArgumentParser(description='文書からファインチューニングデータセットを作成')
    parser.add_argument('--input', required=True, help='入力文書フォルダ')
    parser.add_argument('--output', required=True, help='出力JSONLファイル')
    parser.add_argument('--api-key', required=True, help='API キー')
    parser.add_argument('--provider', default='openai', choices=['openai', 'gemini'], help='LLMプロバイダー')
    parser.add_argument('--format', default='alpaca', choices=['alpaca', 'chat', 'custom'])
    parser.add_argument('--questions-per-chunk', type=int, default=3, help='チャンクあたりの質問数')
    parser.add_argument('--max-chunk-length', type=int, default=1000, help='チャンクの最大長')
    
    args = parser.parse_args()
    
    # パイプライン実行
    pipeline = DocumentToDatasetPipeline(args.api_key, args.provider)
    pipeline.qa_generator.questions_per_chunk = args.questions_per_chunk
    
    dataset = pipeline.process_documents(args.input, args.output)
    
    # 統計情報出力
    stats = {
        'total_qa_pairs': len(dataset),
        'average_question_length': sum(len(item['instruction']) for item in dataset) / len(dataset),
        'average_answer_length': sum(len(item['output']) for item in dataset) / len(dataset)
    }
    
    print("\n=== 統計情報 ===")
    for key, value in stats.items():
        print(f"{key}: {value:.2f}")

if __name__ == "__main__":
    main()

実装時のハマったポイント

PDF・Word抽出でのハマったポイント

1. PDF抽出の問題

# 失敗例
def bad_pdf_extraction(pdf_path):
    # PyPDFだけだと表やレイアウトが崩れる
    import PyPDF2
    with open(pdf_path, 'rb') as file:
        reader = PyPDF2.PdfReader(file)
        text = ""
        for page in reader.pages:
            text += page.extract_text()
    return text  # 文字が重複したり順序が崩れる

# 改善版:複数ライブラリの組み合わせ
def robust_pdf_extraction(pdf_path):
    results = {}
    errors = []
    
    # PyMuPDF4LLM(最優先)
    try:
        import pymupdf4llm
        results['pymupdf4llm'] = pymupdf4llm.to_markdown(pdf_path)
    except Exception as e:
        errors.append(f"PyMuPDF4LLM エラー: {e}")
    
    # pdfplumber(表データ用)
    try:
        import pdfplumber
        with pdfplumber.open(pdf_path) as pdf:
            text_parts = []
            for page in pdf.pages:
                text_parts.append(page.extract_text())
            results['pdfplumber'] = '\n'.join(text_parts)
    except Exception as e:
        errors.append(f"pdfplumber エラー: {e}")
    
    # フォールバック:PyMuPDF
    try:
        import fitz
        doc = fitz.open(pdf_path)
        text_parts = []
        for page in doc:
            text_parts.append(page.get_text())
        results['pymupdf'] = '\n'.join(text_parts)
        doc.close()
    except Exception as e:
        errors.append(f"PyMuPDF エラー: {e}")
    
    # 最良の結果を選択
    if 'pymupdf4llm' in results and results['pymupdf4llm'].strip():
        return results['pymupdf4llm']
    elif 'pdfplumber' in results and results['pdfplumber'].strip():
        return results['pdfplumber']
    elif 'pymupdf' in results and results['pymupdf'].strip():
        return results['pymupdf']
    else:
        raise Exception(f"全ての抽出方法が失敗: {errors}")

2. スキャンPDFの問題

def handle_scanned_pdf(pdf_path):
    """スキャンPDFのOCR処理"""
    import pytesseract
    from pdf2image import convert_from_path
    
    try:
        # まず通常の抽出を試行
        text = robust_pdf_extraction(pdf_path)
        
        # テキストが少なすぎる場合はOCR実行
        if len(text.strip()) < 100:
            print("スキャンPDFを検出、OCR処理を開始...")
            
            # PDFを画像に変換
            images = convert_from_path(pdf_path)
            
            ocr_text = ""
            for i, image in enumerate(images):
                print(f"OCR処理中: {i+1}/{len(images)}")
                
                # 日本語OCR実行
                page_text = pytesseract.image_to_string(
                    image, 
                    lang='jpn+eng',  # 日本語+英語
                    config='--psm 6'  # uniform text block
                )
                ocr_text += f"\n--- ページ {i+1} ---\n{page_text}"
            
            return ocr_text
        
        return text
        
    except Exception as e:
        raise Exception(f"スキャンPDF処理エラー: {e}")

# 使用例
try:
    text = handle_scanned_pdf("scanned_document.pdf")
except Exception as e:
    print(f"エラー: {e}")
    # 手動確認が必要

API関連のハマったポイント

1. レート制限とコスト管理

import time
import random
from functools import wraps

def rate_limit_handler(max_retries=3, base_delay=1):
    """レート制限を考慮したデコレーター"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                
                except Exception as e:
                    error_str = str(e).lower()
                    
                    if 'rate limit' in error_str or 'quota' in error_str:
                        delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                        print(f"レート制限検出、{delay:.1f}秒待機...")
                        time.sleep(delay)
                        continue
                    
                    elif 'insufficient quota' in error_str:
                        raise Exception("API quota exceeded - 請求設定を確認してください")
                    
                    else:
                        raise e
            
            raise Exception(f"最大リトライ数({max_retries})に達しました")
        return wrapper
    return decorator

class CostTracker:
    """API使用量とコスト追跡"""
    def __init__(self):
        self.usage = {
            'openai': {'tokens': 0, 'cost': 0},
            'gemini': {'tokens': 0, 'cost': 0}
        }
        # 2024年11月時点の概算料金($1 = 150円として計算)
        self.rates = {
            'openai': {'input': 0.015, 'output': 0.045},  # GPT-4 per 1K tokens (円)
            'gemini': {'input': 0.00075, 'output': 0.0015}  # Gemini Pro per 1K tokens (円)
        }
    
    def track_usage(self, provider: str, input_tokens: int, output_tokens: int):
        """使用量を追跡"""
        input_cost = (input_tokens / 1000) * self.rates[provider]['input']
        output_cost = (output_tokens / 1000) * self.rates[provider]['output']
        
        self.usage[provider]['tokens'] += input_tokens + output_tokens
        self.usage[provider]['cost'] += input_cost + output_cost
        
        total_cost = sum(data['cost'] for data in self.usage.values())
        
        if total_cost > 1000:  # 1000円を超えたら警告
            print(f"warnierror: 累計コスト: {total_cost:.2f}円 - 予算を確認してください")
    
    def get_summary(self):
        """使用量サマリーを表示"""
        print("\n=== API使用量サマリー ===")
        total_cost = 0
        for provider, data in self.usage.items():
            print(f"{provider}: {data['tokens']:,} tokens, {data['cost']:.2f}")
            total_cost += data['cost']
        print(f"総コスト: {total_cost:.2f}")

2. APIレスポンス品質の問題

class ResponseValidator:
    """APIレスポンスの妥当性検証"""
    
    def __init__(self):
        self.common_errors = [
            "申し訳ございませんが",
            "お手伝いできません",
            "適切な回答を提供できません",
            "I cannot", "I'm sorry",
            "as an AI", "as a language model"
        ]
    
    def validate_response(self, response_text: str, chunk: Dict) -> Dict:
        """レスポンスの品質をチェック"""
        issues = []
        
        # エラーメッセージの検出
        for error_phrase in self.common_errors:
            if error_phrase in response_text:
                issues.append(f"エラーメッセージ検出: {error_phrase}")
        
        # JSON形式チェック
        try:
            import json
            import re
            json_match = re.search(r'\[.*\]', response_text, re.DOTALL)
            if json_match:
                json.loads(json_match.group(0))
            else:
                issues.append("JSON形式が見つからない")
        except json.JSONDecodeError:
            issues.append("無効なJSON形式")
        
        # 内容の関連性チェック
        chunk_keywords = set(chunk['content'].lower().split())
        response_keywords = set(response_text.lower().split())
        
        # 共通キーワードが少なすぎる場合
        common_ratio = len(chunk_keywords & response_keywords) / len(chunk_keywords)
        if common_ratio < 0.1:
            issues.append("元文書との関連性が低い")
        
        return {
            'valid': len(issues) == 0,
            'issues': issues,
            'confidence': max(0, 1 - len(issues) * 0.3)
        }

# 品質チェック付きQ&A生成(前述のrate_limit_handlerとDataQualityCheckerを組み合わせ)
@rate_limit_handler(max_retries=3)
def safe_qa_generation(generator, chunk, validator, max_attempts=3):
    """エラーハンドリングと品質チェックを統合したQ&A生成"""
    
    for attempt in range(max_attempts):
        try:
            qa_pairs = generator.generate_qa_pairs(chunk)
            
            # 前述の品質チェック機能を活用
            for qa in qa_pairs:
                validation = validator.validate_response(qa.get('answer', ''), chunk)
                if not validation['valid']:
                    print(f"品質問題検出: {validation['issues']}")
                    if attempt < max_attempts - 1:
                        continue
            return qa_pairs
            
        except Exception as e:
            print(f"生成エラー (試行 {attempt + 1}): {e}")
            if attempt == max_attempts - 1:
                raise
    return []

データ品質・形式のハマったポイント

1. 文字化け・エンコーディング問題

import chardet
import codecs

def detect_and_read_file(file_path: str) -> str:
    """エンコーディングを自動検出してファイルを読み込み"""
    
    # バイナリでファイルを読んでエンコーディングを検出
    with open(file_path, 'rb') as f:
        raw_data = f.read()
        result = chardet.detect(raw_data)
        encoding = result['encoding']
        confidence = result['confidence']
    
    print(f"検出エンコーディング: {encoding} (信頼度: {confidence:.2f})")
    
    # 複数のエンコーディングを試行
    encodings_to_try = [encoding, 'utf-8', 'shift_jis', 'cp932', 'utf-16']
    
    for enc in encodings_to_try:
        if not enc:
            continue
        try:
            with open(file_path, 'r', encoding=enc) as f:
                content = f.read()
                
            # 文字化けチェック
            if '?' not in content[:1000] and '' not in content[:1000]:
                print(f"成功: {enc}")
                return content
                
        except (UnicodeDecodeError, UnicodeError):
            continue
    
    raise Exception(f"エンコーディングの検出に失敗: {file_path}")

def clean_extracted_text(text: str) -> str:
    """抽出テキストのクリーニング"""
    import re
    
    # 不要な改行・空白の正規化
    text = re.sub(r'\n\s*\n\s*\n', '\n\n', text)  # 3つ以上の改行を2つに
    text = re.sub(r'[ \t]+', ' ', text)  # 複数スペースを1つに
    text = re.sub(r' +\n', '\n', text)  # 行末スペースを削除
    
    # 特殊文字の処理
    text = text.replace('\u3000', ' ')  # 全角スペース
    text = text.replace('\ufeff', '')  # BOM
    
    # ページ番号・ヘッダー・フッターの除去
    text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^ページ\s*\d+.*$', '', text, flags=re.MULTILINE)
    
    return text.strip()

2. データセット形式・品質の問題

def validate_dataset_format(dataset: List[Dict], target_format: str = "alpaca") -> Dict:
    """データセット形式の検証"""
    
    validation_results = {
        'total_items': len(dataset),
        'valid_items': 0,
        'errors': [],
        'warnings': []
    }
    
    required_fields = {
        'alpaca': ['instruction', 'input', 'output'],
        'chat': ['messages'],
        'custom': ['prompt', 'completion']
    }
    
    for i, item in enumerate(dataset):
        item_errors = []
        
        # 必須フィールドチェック
        for field in required_fields.get(target_format, []):
            if field not in item:
                item_errors.append(f"必須フィールド欠如: {field}")
            elif not item[field] or (isinstance(item[field], str) and not item[field].strip()):
                item_errors.append(f"空のフィールド: {field}")
        
        # 文字数チェック
        if target_format == 'alpaca':
            if len(item.get('instruction', '')) < 10:
                item_errors.append("instructionが短すぎます")
            if len(item.get('output', '')) < 20:
                item_errors.append("outputが短すぎます")
            if len(item.get('output', '')) > 4000:
                validation_results['warnings'].append(f"項目{i}: outputが長すぎます({len(item['output'])}文字)")
        
        # 特殊文字・制御文字チェック
        for field in ['instruction', 'output']:
            if field in item:
                text = item[field]
                if any(ord(char) < 32 and char not in '\n\t' for char in text):
                    item_errors.append(f"{field}に制御文字が含まれています")
        
        if item_errors:
            validation_results['errors'].append(f"項目{i}: {', '.join(item_errors)}")
        else:
            validation_results['valid_items'] += 1
    
    validation_results['success_rate'] = validation_results['valid_items'] / validation_results['total_items']
    
    return validation_results

# 品質レポート生成
def generate_quality_report(dataset: List[Dict]) -> None:
    """データセット品質レポートを生成"""
    
    report = {
        'total_qa_pairs': len(dataset),
        'avg_question_length': 0,
        'avg_answer_length': 0,
        'question_types': {},
        'answer_types': {},
        'potential_issues': []
    }
    
    question_lengths = []
    answer_lengths = []
    
    for item in dataset:
        question = item.get('instruction', '')
        answer = item.get('output', '')
        
        question_lengths.append(len(question))
        answer_lengths.append(len(answer))
        
        # 質問タイプの分類
        if any(word in question for word in ['', 'なに', 'どの', 'どれ']):
            report['question_types']['What系'] = report['question_types'].get('What系', 0) + 1
        elif any(word in question for word in ['いつ', '時間', '期間']):
            report['question_types']['When系'] = report['question_types'].get('When系', 0) + 1
        elif any(word in question for word in ['どこ', '場所', '位置']):
            report['question_types']['Where系'] = report['question_types'].get('Where系', 0) + 1
        elif any(word in question for word in ['なぜ', '理由', '原因']):
            report['question_types']['Why系'] = report['question_types'].get('Why系', 0) + 1
        elif any(word in question for word in ['どうやって', '方法', '手順']):
            report['question_types']['How系'] = report['question_types'].get('How系', 0) + 1
        else:
            report['question_types']['その他'] = report['question_types'].get('その他', 0) + 1
        
        # 潜在的な問題を検出
        if question.lower() == answer.lower():
            report['potential_issues'].append("質問と回答が同一")
        
        if len(set(question.split()) & set(answer.split())) / len(set(question.split())) > 0.8:
            report['potential_issues'].append("質問と回答の類似度が高すぎる")
    
    report['avg_question_length'] = sum(question_lengths) / len(question_lengths)
    report['avg_answer_length'] = sum(answer_lengths) / len(answer_lengths)
    
    # レポート出力
    print("\n" + "="*50)
    print("データセット品質レポート")
    print("="*50)
    print(f"総Q&Aペア数: {report['total_qa_pairs']}")
    print(f"平均質問長: {report['avg_question_length']:.1f}文字")
    print(f"平均回答長: {report['avg_answer_length']:.1f}文字")
    
    print("\n質問タイプ分布:")
    for q_type, count in report['question_types'].items():
        percentage = (count / report['total_qa_pairs']) * 100
        print(f"  {q_type}: {count}個 ({percentage:.1f}%)")
    
    if report['potential_issues']:
        print(f"\n潜在的な問題: {len(report['potential_issues'])}")
        for issue in set(report['potential_issues'])[:5]:
            print(f"  - {issue}")

ファインチューニング実行時の問題

1. メモリ・計算リソース不足

import psutil
import os

def check_system_requirements(dataset_size: int) -> Dict:
    """システム要件をチェック"""
    
    # メモリ使用量チェック
    memory = psutil.virtual_memory()
    available_gb = memory.available / (1024**3)
    
    # 推奨メモリ計算(データセットサイズに基づく)
    recommended_memory = max(8, dataset_size * 0.001)  # 最低8GB
    
    # ディスク容量チェック
    disk = psutil.disk_usage('/')
    available_disk_gb = disk.free / (1024**3)
    
    requirements = {
        'memory_status': 'OK' if available_gb >= recommended_memory else 'INSUFFICIENT',
        'available_memory_gb': available_gb,
        'recommended_memory_gb': recommended_memory,
        'disk_status': 'OK' if available_disk_gb >= 10 else 'INSUFFICIENT',
        'available_disk_gb': available_disk_gb,
        'warnings': []
    }
    
    if requirements['memory_status'] == 'INSUFFICIENT':
        requirements['warnings'].append(f"メモリ不足: {available_gb:.1f}GB利用可能、{recommended_memory:.1f}GB推奨")
    
    if requirements['disk_status'] == 'INSUFFICIENT':
        requirements['warnings'].append(f"ディスク容量不足: {available_disk_gb:.1f}GB利用可能、10GB以上推奨")
    
    return requirements

def optimize_dataset_for_resources(dataset: List[Dict], max_items: int = None) -> List[Dict]:
    """リソースに応じてデータセットを最適化"""
    
    if max_items and len(dataset) > max_items:
        print(f"データセットを{len(dataset)}から{max_items}に削減")
        
        # 品質スコアでソートして上位を選択
        # 前述のDataQualityCheckerクラスを使用
        quality_checker = DataQualityChecker()
        scored_dataset = []
        
        for item in dataset:
            quality_result = quality_checker.check_quality(item)
            scored_dataset.append({
                'item': item,
                'score': quality_result['score']
            })
        
        scored_dataset.sort(key=lambda x: x['score'], reverse=True)
        return [item['item'] for item in scored_dataset[:max_items]]
    
    return dataset

2. ファインチューニング設定の問題

def generate_finetuning_config(dataset_size: int, model_type: str = "gpt-3.5-turbo") -> Dict:
    """データセットサイズに応じたファインチューニング設定を生成"""
    
    # データセットサイズに基づく推奨設定
    if dataset_size < 100:
        config = {
            'n_epochs': 3,
            'batch_size': 1,
            'learning_rate_multiplier': 2.0,
            'warnings': ['データセットサイズが小さいため、オーバーフィッティングに注意']
        }
    elif dataset_size < 1000:
        config = {
            'n_epochs': 3,
            'batch_size': 4,
            'learning_rate_multiplier': 1.5,
            'warnings': []
        }
    else:
        config = {
            'n_epochs': 2,
            'batch_size': 8,
            'learning_rate_multiplier': 1.0,
            'warnings': []
        }
    
    # モデル固有の調整
    if model_type == "gpt-4":
        config['learning_rate_multiplier'] *= 0.5  # GPT-4は学習率を下げる
        config['warnings'].append('GPT-4は学習率を下げています')
    
    return config

# 事前チェックリスト
def pre_finetuning_checklist(dataset_path: str) -> bool:
    """ファインチューニング前のチェックリスト"""
    
    checks = []
    
    # 1. ファイル存在チェック
    if not os.path.exists(dataset_path):
        checks.append(f"error: データセットファイルが見つかりません: {dataset_path}")
        return False
    
    # 2. ファイル形式チェック
    try:
        with open(dataset_path, 'r', encoding='utf-8') as f:
            first_line = f.readline()
            json.loads(first_line)
        checks.append("success: JSONL形式が正しい")
    except:
        checks.append("error: JSONL形式が無効")
        return False
    
    # 3. データセットサイズチェック
    with open(dataset_path, 'r', encoding='utf-8') as f:
        line_count = sum(1 for line in f)
    
    if line_count < 10:
        checks.append("error: データセットが小さすぎます(最低10件必要)")
        return False
    else:
        checks.append(f"success: データセットサイズ: {line_count}")
    
    # 4. システムリソースチェック
    requirements = check_system_requirements(line_count)
    if requirements['warnings']:
        for warning in requirements['warnings']:
            checks.append(f"warnierror: {warning}")
    else:
        checks.append("success: システムリソース十分")
    
    # 5. API キーチェック(環境変数)
    if os.getenv('OPENAI_API_KEY'):
        checks.append("success: OpenAI API キー設定済み")
    else:
        checks.append("error: OpenAI API キーが設定されていません")
    
    print("\n" + "="*50)
    print("ファインチューニング事前チェック")
    print("="*50)
    for check in checks:
        print(check)
    
    failed_checks = [c for c in checks if c.startswith("error:")]
    return len(failed_checks) == 0

この対応により、私は実際に遭遇した問題を改善でき、より確実にファインチューニングデータセットを作成できるようになりました。

他のハマりどころや、その解決策についても、良かったらコメントで記載いただけると嬉しいです。

APIプロバイダー比較とベストプラクティス

OpenAI vs Gemini の特徴比較

特徴 OpenAI (GPT-4) Gemini (1.5 Pro)
コスト 比較的高価 比較的安価
応答速度 高速 中程度
日本語品質 優秀 優秀
文脈理解 優秀 非常に優秀(長文)
API制限 レート制限あり レート制限あり
出力の一貫性 高い 中程度

プロバイダー選択の指針

def choose_provider_based_on_requirements(
    document_length: int,
    budget_limit: float,
    quality_priority: str
) -> str:
    """要件に基づいてプロバイダーを選択"""
    
    if quality_priority == "consistency":
        return "openai"
    elif quality_priority == "cost":
        return "gemini"
    elif document_length > 10000:  # 長文の場合
        return "gemini"
    elif budget_limit < 100:  # 予算制限がある場合
        return "gemini"
    else:
        return "openai"

# 使用例
recommended_provider = choose_provider_based_on_requirements(
    document_length=5000,
    budget_limit=50,
    quality_priority="cost"
)
print(f"推奨プロバイダー: {recommended_provider}")

ハイブリッド処理の実装

class HybridQAGenerator:
    """複数のプロバイダーを使い分けるジェネレーター"""
    
    def __init__(self, openai_key: str, gemini_key: str):
        self.openai_generator = QAGenerator(openai_key, "openai")
        self.gemini_generator = QAGenerator(gemini_key, "gemini")
    
    def generate_qa_pairs_hybrid(self, chunk: Dict) -> List[Dict]:
        """文書の特性に応じてプロバイダーを選択"""
        
        content_length = len(chunk['content'])
        
        if content_length > 2000:
            # 長文はGeminiが得意
            print(f"長文のためGeminiを使用: {content_length}文字")
            return self.gemini_generator.generate_qa_pairs(chunk)
        else:
            # 短文はOpenAIで安定した品質
            print(f"短文のためOpenAIを使用: {content_length}文字")
            return self.openai_generator.generate_qa_pairs(chunk)
    
    def generate_qa_pairs_ensemble(self, chunk: Dict) -> List[Dict]:
        """両方のプロバイダーから生成し、最良のものを選択"""
        
        openai_qa = self.openai_generator.generate_qa_pairs(chunk)
        gemini_qa = self.gemini_generator.generate_qa_pairs(chunk)
        
        # 品質評価して最良のものを選択
        all_qa = []
        
        for qa in openai_qa:
            qa['provider'] = 'openai'
            all_qa.append(qa)
        
        for qa in gemini_qa:
            qa['provider'] = 'gemini' 
            all_qa.append(qa)
        
        # 品質スコアに基づいてソート
        quality_checker = DataQualityChecker()
        scored_qa = []
        
        for qa in all_qa:
            quality_result = quality_checker.check_quality({
                'instruction': qa['question'],
                'output': qa['answer']
            })
            qa['quality_score'] = quality_result['score']
            scored_qa.append(qa)
        
        # 上位N個を選択
        scored_qa.sort(key=lambda x: x['quality_score'], reverse=True)
        return scored_qa[:3]  # 上位3個

応用例とベストプラクティス

企業文書に特化したカスタマイズ

class EnterpriseDocumentProcessor(DocumentToDatasetPipeline):
    """企業文書に特化した処理クラス"""
    
    def __init__(self, api_key: str, company_context: str):
        super().__init__(api_key)
        self.company_context = company_context
    
    def generate_company_specific_qa(self, chunk: Dict) -> List[Dict]:
        """企業固有のQ&Aを生成"""
        
        prompt = f"""
あなたは{self.company_context}の社内文書を理解する専門家です。
以下のテキストから、新入社員や社員が実際に困るであろう質問と回答を生成してください。

テキスト
{chunk['content']}

特に以下の観点で質問を作成
1. 手順・プロセスに関する質問
2. 用語・概念の定義
3. トラブルシューティング
4. 関連部署・連絡先
5. 期限・スケジュール

出力は実用的で具体的な内容にしてください。
"""
        # 生成処理...
        return qa_pairs

多言語対応

class MultilingualQAGenerator:
    """多言語Q&A生成クラス"""
    
    def __init__(self, api_key: str, target_languages: List[str]):
        self.client = openai.OpenAI(api_key=api_key)
        self.target_languages = target_languages
    
    def generate_multilingual_qa(self, chunk: Dict) -> List[Dict]:
        """多言語でQ&Aを生成"""
        
        multilingual_dataset = []
        
        for lang in self.target_languages:
            qa_pairs = self.generate_qa_in_language(chunk, lang)
            multilingual_dataset.extend(qa_pairs)
        
        return multilingual_dataset

まとめ

本記事では、PDF・Word文書からファインチューニング用の学習データを効率的に作成する実践的な方法を、豊富なコード例とともに紹介しました。

🎯 重要なポイント

  1. 適切なライブラリ選択: PyMuPDF4LLM、python-docxなど、文書形式に応じた最適なツールを使用
  2. 構造保持: 見出し、セクション、表などの構造情報を活用したデータセット作成
  3. マルチモーダル対応: テキスト + 画像の統合データセットによるVLMファインチューニング対応
  4. プロバイダー使い分け: OpenAI(安定性重視)、Gemini(コスト・長文対応)の特徴に応じた選択
  5. 徹底的な品質管理: 自動検証 + 手動レビューによる高品質データセット構築
  6. 実装の堅牢性: エラーハンドリング、レート制限対応、フォールバック機能

📊 APIプロバイダー選択指針

用途 OpenAI推奨 Gemini推奨
品質重視 ✅ 企業重要文書
コスト重視 ✅ 大量文書処理
長文対応 ❌ 1000文字以下 ✅ 2000文字以上
実験段階 ✅ プロトタイプ

⚠️ 実装時の必須チェック項目

  • API利用コスト: 事前見積もりと使用量監視の実装
  • レート制限対応: 指数バックオフとリトライ機能
  • 品質保証: 自動検証 + 手動レビューの両立
  • セキュリティ: 機密情報の適切な取り扱い
  • フォールバック: 複数プロバイダー対応によるリスク分散

🚀 活用効果

企業の文書資産を活用したファインチューニングにより、以下の効果が期待できます:

  • 業務効率化: 部門特化型AIによる自動化・支援
  • 知識継承: ベテラン社員のノウハウの形式知化
  • 競争優位: 独自データによる差別化されたAIシステム
  • コスト削減: 汎用AIからの脱却による効率的な運用

本記事のコードとノウハウを活用して、あなたの組織独自のAIシステム構築にチャレンジしてください!


今後の展望

ファインチューニング技術の民主化

本記事で紹介した手法により、これまで機械学習の専門知識が必要だった高度なAIカスタマイズが、文書を持つ全ての組織・個人にとって身近な技術となります。

期待される変化

企業レベルでの活用

  • 各部門が業務知識を活かした特化型AI(営業支援、法務、カスタマーサポートなど)を構築
  • 中小企業でも専門分野に特化したAIシステムの導入が可能に
  • 暗黙知の形式知化により、ベテラン社員の経験・ノウハウを組織全体で共有

個人レベルでの活用

  • 研究者、クリエイター、専門家が自分専用のAIアシスタントを作成
  • 個人の知識・経験を学習データ化し、後継者育成や知識継承に活用

新しいビジネス機会

  • 専門知識を学習したAIモデルの販売・ライセンス
  • 業界特化型AIの構築・運用支援サービス
  • ドメイン特化AIのマーケットプレイス

まとめ

ファインチューニングの簡素化は、知識を持つ全ての人がAIクリエイターになれる時代の到来を意味します。あなたの専門知識や業務経験が、次世代のAIシステムを生み出す貴重な資源となります。

本記事の手法を活用して、あなただけのAIシステムの構築にチャレンジしてみてください。

14
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?