#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Tatoeba日本語文抽出スクリプト【完全版】
言語学的に妥当な3軸分類でバランスの取れたデータセットを作成:
1. 文の機能(質問/平叙/命令/条件/推量)
2. 丁寧度(丁寧体/普通体/その他)
3. 文の複雑度(単文/複文/列挙)
Author: Claude
Version: 2.0
"""
import csv
import re
import random
import argparse
import sys
from collections import defaultdict, Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import statistics
# ============================================================================
# データクラス定義
# ============================================================================
@dataclass
class SentenceFeatures:
"""文の言語的特徴を保持するデータクラス"""
text: str
# 主要な分類軸(バランス抽出に使用)
function: str # 文の機能: question/statement/imperative/conditional/volition
politeness: str # 丁寧度: polite/plain/other
complexity: str # 複雑度: simple/complex/enumeration
# 補助的特徴(統計情報として記録)
has_quote: bool = False # 引用符の有無
has_numeric: bool = False # 数値の有無
has_katakana: bool = False # カタカナ語(3文字以上)の有無
char_count: int = 0 # 文字数
def get_bucket_key(self) -> Tuple[str, str, str]:
"""バランス抽出用のバケットキーを返す"""
return (self.function, self.politeness, self.complexity)
# ============================================================================
# 分類器クラス
# ============================================================================
class JapaneseSentenceClassifier:
"""
日本語文を言語学的特徴に基づいて分類
正規表現は慎重に設計され、誤検知を最小限に抑えている。
"""
def __init__(self):
# 文の機能を判定する正規表現(優先順位順)
self.function_patterns = {
'question': [
re.compile(r'[??]'), # 疑問符
re.compile(r'か[。\s]*$'), # 「か」で終わる
re.compile(r'(だろう|でしょう)か'), # 「だろうか」「でしょうか」
],
'imperative': [
re.compile(r'(なさい|ください|てくれ|ておくれ|てちょうだい)[。!!]*$'),
re.compile(r'(しろ|せよ|するな)[。!!]*$'),
],
'conditional': [
re.compile(r'(なら|ならば|たら|れば|ば)[、。]'),
re.compile(r'(としたら|とすれば|であれば)[、。]'),
re.compile(r'(ても|でも|といえども)[、。]'),
],
'volition': [
re.compile(r'(たい|たく|たがる)[。、!!]'),
re.compile(r'(よう|まい)[。!!]*$'),
re.compile(r'(だろう|でしょう)[。!!]*$'),
],
}
# 丁寧度を判定する正規表現
self.politeness_patterns = {
'polite': [
# です・ますの各活用形(文末または読点の前)
re.compile(r'(です|ですか|でした|でしょう)[。!?!?、]*$'),
re.compile(r'(ます|ますか|ました|ません|ませんか|ましょう)[。!?!?、]*$'),
re.compile(r'(ございます|ございました)[。!?!?、]*$'),
],
'plain': [
# だ・であるの各活用形(だけど、などを除外)
re.compile(r'(?<![んだけそ])(だ|だった|だろう)[。!?!?]*$'),
re.compile(r'(である|であった|であろう)[。!?!?]*$'),
re.compile(r'(た|ない|なかった)[。!?!?]$'),
],
}
# 文の複雑度を判定する正規表現
self.complexity_patterns = {
'enumeration': [
re.compile(r'、.+、.+'), # 読点が2つ以上(列挙の可能性)
],
'complex': [
re.compile(r'、'), # 読点あり(単純な接続)
re.compile(r'(ので|のに|から|けれど|けれども|が、|し、|て、|で、)'),
],
}
def classify_function(self, text: str) -> str:
"""
文の機能を分類
Returns:
'question': 質問文
'imperative': 命令文・依頼文
'conditional': 条件文
'volition': 推量・意志表現
'statement': 平叙文(デフォルト)
"""
# 優先順位順にチェック
for func_type, patterns in self.function_patterns.items():
for pattern in patterns:
if pattern.search(text):
return func_type
return 'statement' # デフォルト: 平叙文
def classify_politeness(self, text: str) -> str:
"""
丁寧度を分類
Returns:
'polite': 丁寧体(です・ます調)
'plain': 普通体(だ・である調)
'other': その他(体言止めなど)
"""
for politeness_type, patterns in self.politeness_patterns.items():
for pattern in patterns:
if pattern.search(text):
return politeness_type
return 'other'
def classify_complexity(self, text: str) -> str:
"""
文の複雑度を分類
Returns:
'enumeration': 列挙文(読点2つ以上)
'complex': 複文(読点あり、または接続表現)
'simple': 単文
"""
for complexity_type, patterns in self.complexity_patterns.items():
for pattern in patterns:
if pattern.search(text):
return complexity_type
return 'simple'
def extract_features(self, text: str) -> SentenceFeatures:
"""
文から全ての言語的特徴を抽出
Args:
text: 分析する日本語文
Returns:
SentenceFeatures: 抽出された特徴
"""
return SentenceFeatures(
text=text,
function=self.classify_function(text),
politeness=self.classify_politeness(text),
complexity=self.classify_complexity(text),
has_quote=bool(re.search(r'[「」『』]', text)),
has_numeric=bool(re.search(r'[0-90-9]', text)),
has_katakana=bool(re.search(r'[ァ-ヴー]{3,}', text)), # 3文字以上のカタカナ
char_count=len(text),
)
# ============================================================================
# フィルタリング関数
# ============================================================================
def is_valid_text(text: str, min_len: int, max_len: int) -> Tuple[bool, str]:
"""
テキストが品質基準を満たすかチェック
Args:
text: チェックする文字列
min_len: 最小文字数
max_len: 最大文字数
Returns:
(is_valid, reason): 有効性とその理由
"""
# URL除外
if re.search(r'https?://', text):
return False, "contains_url"
# メンション・ハッシュタグ除外
if re.search(r'[@@##]', text):
return False, "contains_mention_or_hashtag"
# 絵文字・記号除外(より広範囲に)
if re.search(r'[\u2600-\u27BF\U0001F300-\U0001F9FF]', text):
return False, "contains_emoji"
# HTML/XMLタグ除外
if re.search(r'<[^>]+>', text):
return False, "contains_html_tag"
# 日本語文字が含まれない文を除外
if not re.search(r'[ぁ-んァ-ヴー一-龥]', text):
return False, "no_japanese_chars"
# 同じ文字の繰り返し(スパム対策)
if re.search(r'(.)\1{4,}', text): # 同じ文字が5回以上
return False, "repetitive_chars"
# 文字数チェック
char_count = len(text)
if char_count < min_len:
return False, f"too_short({char_count}<{min_len})"
if char_count > max_len:
return False, f"too_long({char_count}>{max_len})"
return True, "ok"
# ============================================================================
# バランス抽出器
# ============================================================================
class BalancedSampler:
"""
3軸分類に基づいてバランスの取れたサンプリングを行う
戦略:
1. 機能 × 丁寧度 × 複雑度の組み合わせでバケット作成
2. 各バケットから均等に抽出(quota方式)
3. 不足分はプールからランダム補充
"""
def __init__(self, target_n: int, seed: int = 42):
"""
Args:
target_n: 抽出する文の総数
seed: 乱数シード(再現性確保)
"""
self.target_n = target_n
self.seed = seed
random.seed(seed)
self.buckets: Dict[Tuple[str, str, str], List[SentenceFeatures]] = defaultdict(list)
def add_sentence(self, features: SentenceFeatures):
"""バケットに文を追加"""
key = features.get_bucket_key()
self.buckets[key].append(features)
def sample(self, verbose: bool = False) -> List[SentenceFeatures]:
"""
バランスの取れたサンプリングを実行
Returns:
抽出された文のリスト
"""
if not self.buckets:
return []
# 各バケットから抽出する数(quota)を計算
num_buckets = len(self.buckets)
quota = self.target_n // num_buckets
picked: List[SentenceFeatures] = []
pool: List[SentenceFeatures] = []
if verbose:
print(f"\n🎲 Sampling strategy:", file=sys.stderr)
print(f" Total buckets: {num_buckets}", file=sys.stderr)
print(f" Quota per bucket: {quota}", file=sys.stderr)
print(f" Target total: {self.target_n}", file=sys.stderr)
print(f"\n📊 Sampling from each bucket:", file=sys.stderr)
# 各バケットから均等に抽出
for key in sorted(self.buckets.keys()):
sentences = self.buckets[key]
random.shuffle(sentences)
# quota分を抽出
taken = sentences[:quota]
picked.extend(taken)
# 残りはプールへ
pool.extend(sentences[quota:])
if verbose:
func, pol, comp = key
print(f" {func:12s} × {pol:6s} × {comp:11s}: "
f"{len(sentences):5d} available → picked {len(taken):4d}",
file=sys.stderr)
# 不足分をプールからランダム補充
if len(picked) < self.target_n:
random.shuffle(pool)
need = self.target_n - len(picked)
picked.extend(pool[:need])
if verbose:
print(f"\n🔄 Filled {need} more from pool (total: {len(picked)})",
file=sys.stderr)
# 最終的にシャッフルして必要数だけ返す
random.shuffle(picked)
return picked[:self.target_n]
# ============================================================================
# 統計レポート生成
# ============================================================================
def generate_statistics_report(
picked: List[SentenceFeatures],
stats: Counter,
verbose: bool = False
) -> str:
"""
抽出結果の詳細な統計レポートを生成
Args:
picked: 抽出された文のリスト
stats: 処理中の統計情報
verbose: 詳細表示フラグ
Returns:
統計レポートの文字列
"""
lines = []
# 基本統計
lines.append("\n" + "=" * 70)
lines.append("📈 FINAL STATISTICS")
lines.append("=" * 70)
# 文字数統計
lengths = [f.char_count for f in picked]
lines.append(f"\n📏 Character count statistics:")
lines.append(f" Min: {min(lengths):3d} chars")
lines.append(f" Max: {max(lengths):3d} chars")
lines.append(f" Mean: {statistics.mean(lengths):5.1f} chars")
lines.append(f" Median: {statistics.median(lengths):5.1f} chars")
# 主要分類軸の分布
lines.append(f"\n📊 Distribution by main axes:")
# 機能別
func_counter = Counter(f.function for f in picked)
lines.append(f"\n 🎯 Function:")
for func in ['question', 'statement', 'imperative', 'conditional', 'volition']:
count = func_counter[func]
pct = 100 * count / len(picked)
lines.append(f" {func:12s}: {count:5d} ({pct:5.1f}%)")
# 丁寧度別
pol_counter = Counter(f.politeness for f in picked)
lines.append(f"\n 🙇 Politeness:")
for pol in ['polite', 'plain', 'other']:
count = pol_counter[pol]
pct = 100 * count / len(picked)
lines.append(f" {pol:6s}: {count:5d} ({pct:5.1f}%)")
# 複雑度別
comp_counter = Counter(f.complexity for f in picked)
lines.append(f"\n 🔗 Complexity:")
for comp in ['simple', 'complex', 'enumeration']:
count = comp_counter[comp]
pct = 100 * count / len(picked)
lines.append(f" {comp:11s}: {count:5d} ({pct:5.1f}%)")
# 補助的特徴
lines.append(f"\n 📌 Auxiliary features:")
quote_count = sum(1 for f in picked if f.has_quote)
numeric_count = sum(1 for f in picked if f.has_numeric)
katakana_count = sum(1 for f in picked if f.has_katakana)
lines.append(f" Has quote: {quote_count:5d} ({100*quote_count/len(picked):5.1f}%)")
lines.append(f" Has numeric: {numeric_count:5d} ({100*numeric_count/len(picked):5.1f}%)")
lines.append(f" Has katakana: {katakana_count:5d} ({100*katakana_count/len(picked):5.1f}%)")
# 詳細な組み合わせ統計(verbose時のみ)
if verbose:
lines.append(f"\n 🔍 Detailed combination statistics (Top 10):")
combo_counter = Counter(f.get_bucket_key() for f in picked)
for (func, pol, comp), count in combo_counter.most_common(10):
pct = 100 * count / len(picked)
lines.append(f" {func:12s} × {pol:6s} × {comp:11s}: "
f"{count:4d} ({pct:4.1f}%)")
lines.append("\n" + "=" * 70)
return "\n".join(lines)
# ============================================================================
# メイン処理
# ============================================================================
def parse_arguments():
"""コマンドライン引数をパース"""
parser = argparse.ArgumentParser(
description="Tatoebaから言語学的にバランスの取れた日本語文を抽出",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用例:
# 基本実行(デフォルト設定)
python extract_ja_perfect.py --verbose
# 短文データセット(チャットボット用)
python extract_ja_perfect.py --n 5000 --min_len 10 --max_len 30 --out ja_short.csv
# 長文データセット(要約タスク用)
python extract_ja_perfect.py --n 3000 --min_len 50 --max_len 100 --out ja_long.csv
"""
)
parser.add_argument(
"--src",
default="sentences.csv",
help="入力CSVファイル(Tatoebaのsentences.csv)"
)
parser.add_argument(
"--out",
default="ja_balanced_10000.csv",
help="出力CSVファイル"
)
parser.add_argument(
"--n",
type=int,
default=10000,
help="抽出する文の数(デフォルト: 10000)"
)
parser.add_argument(
"--min_len",
type=int,
default=20,
help="最短文字数(デフォルト: 20)"
)
parser.add_argument(
"--max_len",
type=int,
default=50,
help="最長文字数(デフォルト: 50)"
)
parser.add_argument(
"--seed",
type=int,
default=42,
help="乱数シード(再現性確保、デフォルト: 42)"
)
parser.add_argument(
"--verbose",
action="store_true",
help="詳細なログを出力"
)
return parser.parse_args()
def main():
"""メイン処理"""
args = parse_arguments()
# 入力ファイルの存在確認
input_path = Path(args.src)
if not input_path.exists():
print(f"❌ Error: 入力ファイルが見つかりません: {args.src}", file=sys.stderr)
print(f"\n💡 Hint: 以下のコマンドでファイルをダウンロードしてください:", file=sys.stderr)
print(f" wget https://downloads.tatoeba.org/exports/sentences.tar.bz2", file=sys.stderr)
print(f" tar -xjf sentences.tar.bz2", file=sys.stderr)
sys.exit(1)
# ヘッダー表示
print("=" * 70, file=sys.stderr)
print("🌸 Tatoeba Japanese Sentence Extractor [Perfect Edition]", file=sys.stderr)
print("=" * 70, file=sys.stderr)
print(f"📖 Input: {args.src}", file=sys.stderr)
print(f"📝 Output: {args.out}", file=sys.stderr)
print(f"🎯 Target: {args.n:,} sentences", file=sys.stderr)
print(f"📏 Length: {args.min_len}-{args.max_len} chars", file=sys.stderr)
print(f"🌱 Seed: {args.seed}", file=sys.stderr)
print("=" * 70, file=sys.stderr)
# 分類器とサンプラーの初期化
classifier = JapaneseSentenceClassifier()
sampler = BalancedSampler(target_n=args.n, seed=args.seed)
# 統計カウンター
stats = Counter()
seen_texts = set()
line_count = 0
# データ読み込みと処理
print(f"\n📚 Reading and classifying sentences...", file=sys.stderr)
try:
with open(args.src, 'r', encoding='utf-8', newline='') as f:
reader = csv.reader(f, delimiter='\t')
for row in reader:
line_count += 1
# プログレス表示
if args.verbose and line_count % 100000 == 0:
print(f" ... processed {line_count:,} lines", file=sys.stderr)
# 行の形式チェック
if len(row) < 3:
stats['malformed'] += 1
continue
sentence_id, lang, text = row[0], row[1], row[2].strip()
# 日本語のみ対象
if lang != 'jpn':
continue
stats['total_jpn'] += 1
# 重複除外
if text in seen_texts:
stats['duplicate'] += 1
continue
# 品質チェック
is_valid, reason = is_valid_text(text, args.min_len, args.max_len)
if not is_valid:
stats[f'filtered_{reason}'] += 1
continue
# 特徴抽出してサンプラーに追加
features = classifier.extract_features(text)
sampler.add_sentence(features)
seen_texts.add(text)
stats['accepted'] += 1
print(f"✅ Processed {line_count:,} lines", file=sys.stderr)
except Exception as e:
print(f"❌ Error reading file: {e}", file=sys.stderr)
sys.exit(1)
# 処理統計の表示
print(f"\n📊 Processing statistics:", file=sys.stderr)
print(f" Total lines: {line_count:,}", file=sys.stderr)
print(f" Japanese: {stats['total_jpn']:,}", file=sys.stderr)
print(f" Accepted: {stats['accepted']:,}", file=sys.stderr)
print(f" Filtered: {sum(v for k, v in stats.items() if k.startswith('filtered_')):,}", file=sys.stderr)
print(f" Duplicates: {stats['duplicate']:,}", file=sys.stderr)
if args.verbose:
print(f"\n Detailed filter reasons:", file=sys.stderr)
for key in sorted(k for k in stats.keys() if k.startswith('filtered_')):
print(f" {key:30s}: {stats[key]:,}", file=sys.stderr)
# 受け入れられた文が不足している場合
if stats['accepted'] < args.n:
print(f"\n⚠️ Warning: 条件に合う文が {stats['accepted']:,} 件しか見つかりませんでした", file=sys.stderr)
print(f" 目標の {args.n:,} 件に対して不足しています", file=sys.stderr)
print(f"\n💡 解決策:", file=sys.stderr)
print(f" 1. --n の値を減らす (例: --n {stats['accepted'] // 2})", file=sys.stderr)
print(f" 2. 文字数範囲を広げる (例: --min_len 15 --max_len 60)", file=sys.stderr)
# それでも続行可能なら続行
if stats['accepted'] == 0:
print(f"\n❌ Error: 抽出可能な文が0件です。処理を中止します。", file=sys.stderr)
sys.exit(1)
# バランス抽出の実行
picked = sampler.sample(verbose=args.verbose)
if not picked:
print(f"❌ Error: サンプリングに失敗しました", file=sys.stderr)
sys.exit(1)
# 出力ファイルへの書き込み
try:
output_path = Path(args.out)
with open(output_path, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerow(['ja']) # ヘッダー
for features in picked:
writer.writerow([features.text])
print(f"\n✅ Successfully wrote {len(picked):,} sentences to: {args.out}",
file=sys.stderr)
except Exception as e:
print(f"❌ Error writing output file: {e}", file=sys.stderr)
sys.exit(1)
# 統計レポートの生成と表示
report = generate_statistics_report(picked, stats, verbose=args.verbose)
print(report, file=sys.stderr)
print("\n🎉 Complete! Enjoy your perfectly balanced dataset!", file=sys.stderr)
print("=" * 70, file=sys.stderr)
if __name__ == "__main__":
main()