0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

日本語テキスト統合作業を自動化!Colabで動かすLangChain+Azure OpenAI

Posted at

user:

先日、日本語のテキストを統合するPythonコードを書きました。まずはこのコードの要点を詳しく解説していただけますか?

# !pip install pymupdf pymupdf4llm faiss-cpu langchain langchain-community openai requests langchain_openai

from google.colab import drive
import json
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate

# =============================
# ✅ Azure OpenAI 設定
# =============================
CONFIG_PATH = "/content/drive/MyDrive/path_config/config-32k.json"
drive.mount("/content/drive")

with open(CONFIG_PATH, "r") as f:
    config = json.load(f)

azure_api_key = config["OPENAI_API_KEY"]
azure_api_version = config["OPENAI_API_VERSION"]
azure_endpoint = config["OPENAI_API_BASE"]
azure_model = config["OPENAI_API_ENGINE"]  # 32k モデルをデプロイして指定!

def get_llm(temperature: float = 0.3, max_tokens: int = 15000) -> AzureChatOpenAI:
    """LangChain 用 Azure OpenAI クライアントを返す(32k モデル向け、出力も長文対応)"""
    return AzureChatOpenAI(
        deployment_name=azure_model,
        openai_api_key=azure_api_key,
        openai_api_version=azure_api_version,
        azure_endpoint=azure_endpoint,
        temperature=temperature,
        max_tokens=max_tokens
    )

# =============================
# ✅ Embeddings 設定
# =============================
embeddings = HuggingFaceEmbeddings(
    model_name="intfloat/multilingual-e5-base",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True},
)

# =============================
# ✅ 日本語テキスト統合関数
# =============================
def unify_japanese_texts(
    texts: list[str],
    temperature: float = 0.3,
    chunk_size: int = 5000,
    max_rounds: int = 3
) -> str:
    """
    日本語の長文テキスト群を、意味を損なわずに統合。
    - 原文にない情報は追加しない
    - 冗長さは整理しつつ、丁寧な言い換えや接続語で長文化
    - 32k モデルの長文対応を活かす
    """
    llm = get_llm(temperature=temperature)

    def _unify_chunk(chunk: str, mode: str = "refine") -> str:
        if mode == "refine":
            prompt = ChatPromptTemplate.from_template("""
あなたは日本語文章の編集者です。
次のテキストには冗長な表現や重複があります。
意味を損なわず、一貫性を保ちながら自然に統合してください。
ただし、原文に書かれていない新しい情報や外部知識を加えることは禁止です。

【テキスト】
{text}

【条件】
- 要約は禁止
- 情報の削除は禁止
- 与えられた全てのトピックを必ず含めること(どの話題も欠落させてはならない)
- 原文の情報を丁寧に言い換え、接続語を増やし、表現を展開して長くする
- 原文に書かれていないことは絶対に書かない
- 文章量は原文以上を必須条件とする
""")
        else:  # mode == "merge"
            prompt = ChatPromptTemplate.from_template("""
あなたは日本語文章の編集者です。
次の複数のテキストをつなぎ合わせてください。
要約はせず、すべての情報を保持してください。
自然な流れを作るために接続語や言い換えを追加しても構いませんが、
原文にない新しい情報や外部知識を加えることは禁止です。

【テキスト】
{text}

【条件】
- 要約は禁止
- 情報の削除は禁止
- 与えられた全てのトピックを必ず含めること(どの話題も欠落させてはならない)
- 文章量は原文以上を必須条件とする
- 原文に書かれていないことは絶対に書かない
""")

        messages = prompt.format_messages(text=chunk)
        response = llm.invoke(messages)
        return response.content.strip()

    # すべてのテキストを結合
    combined_text = "\n".join(texts)

    # チャンク分割
    chunks = [combined_text[i:i+chunk_size] for i in range(0, len(combined_text), chunk_size)]

    # 第1ラウンド:冗長整理+展開
    unified_chunks = [_unify_chunk(chunk, mode="refine") for chunk in chunks]

    # 第2ラウンド以降:統合しながら展開
    current_round = 2
    while len(unified_chunks) > 1 and current_round <= max_rounds:
        merged_text = "\n".join(unified_chunks)
        new_chunks = [merged_text[i:i+chunk_size] for i in range(0, len(merged_text), chunk_size)]
        unified_chunks = [_unify_chunk(chunk, mode="merge") for chunk in new_chunks]
        current_round += 1

    return unified_chunks[0] if unified_chunks else ""



# =============================
# ✅ 使用例
# =============================
texts = [text1, text2, text3, text4, text5, text6, text7, text8, text9, text10, text11, text12]

result = unify_japanese_texts(texts, chunk_size=5000, max_rounds=10)
print(result)

assistant:

了解です先輩。コードの流れと設計意図を、動く順にスッキリ整理して解説しますね。

全体像(やりたいこと)

  • 目的:複数の日本語テキストを「情報を落とさず・要約せず」に、接続語を増やして自然に一本化する。

  • 方式:長文をチャンクに分け、

    1. 各チャンクを「冗長整理しつつ展開(refine)」→
    2. できた断片を「統合しつつ展開(merge)」→必要なら繰り返し、という段階的統合(map→reduce系)

依存関係と実行環境

  • pymupdf/pymupdf4llm:PDF→テキスト抽出(このスニペットでは未使用だが前処理で活躍)
  • faiss-cpu:ベクトル検索用(未使用)
  • langchain / langchain-community / langchain_openai:LLM呼び出しとプロンプト組み立て
  • openai/requests:裏側で必要
  • Colab + Google Drivedrive.mountでJSON設定(APIキー等)を読み込み

注:ここではPDF抽出・FAISSは使っておらず、「将来RAG統合に備えた仕込み」という位置づけです。


Azure OpenAI 設定

  • config-32k.json から キー・バージョン・エンドポイント・デプロイ名 を読む。

  • get_llm()温度出力の最大長を受け取り、AzureChatOpenAI を返す。

    • 「32kモデル」を想定し、長文出力(max_tokens=15000)に対応。

    • 注意①langchain_openai のバージョンによって引数名が

      • azure_deployment=...(新しめ) / deployment_name=...(古め)で異なります。手元の版に合わせて確認を。
    • 注意②:モデル固有の出力トークン上限を超える値を渡すとエラー。実際の上限に合わせて調整しましょう。


Embeddings 設定

  • intfloat/multilingual-e5-base をCPUで初期化。正規化(cos類似向け)ON。

  • 現状未使用ですが、将来:

    • 文書の重複検出・クラスタリング
    • 章立ての自動整理
    • 統合漏れのカバレッジ検査
      などに活かせます。

統合関数の設計(unify_japanese_texts

引数

  • texts: 統合対象の文字列リスト
  • temperature: 出力の多様性(0.3で堅め)
  • chunk_size: 文字数ベースのチャンク幅(デフォ5000)
  • max_rounds: refine後に、mergeを繰り返す統合ラウンド数上限

処理の流れ

  1. リストを "\n".join(texts) で一本化 → 生テキストを作成

  2. chunk_size ごとに文字単位で切り出し

    • 日本語はスペースが少ないので文途中で切れる可能性あり(後述の注意点へ)
  3. 第1ラウンド(refine)

    • 各チャンクに「要約禁止・削除禁止・すべて含める・原文以上の長さ」という強い制約をかけて整形
  4. 第2ラウンド以降(merge)

    • 前ラウンドの出力を再結合して再度分割し、統合し直す
    • これを max_rounds 回まで繰り返し、最終的に1本化を目指す
  5. 返り値unified_chunks[0](※後述する端切れリスクあり)


プロンプト設計(_unify_chunk

  • refine:単一チャンクの冗長整理&言い換えで展開

  • merge:複数断片の結合&流れの自然化

  • 共通の厳格条件:

    • 要約禁止・削除禁止・新情報の追加禁止
    • 原文以上の文字量を要求(抜け漏れ防止のため)
  • ChatPromptTemplate.from_template(...)人間発話(user)単発プロンプトを生成

    • 注意:システムメッセージではないため、モデルによっては遵守度が落ちることがあります。必要なら

      ChatPromptTemplate.from_messages([
        ("system", "...方針..."),
        ("human", "{text}"),
      ])
      

      の形にすると安定します。


設計の強み

  • 要約しない統合:研究ノート等で情報損失ゼロ志向を実現
  • 段階的に滑らかに:まず局所最適(refine)、のち全体最適(merge)で整合性を高める
  • 長文対応:32k系モデル+大きめ max_tokens巨大原稿に耐える

注意点・ハマりどころ(重要)

  1. 出力が膨らみやすい

    • 毎ラウンド「原文以上の長さ」を課すため、雪だるま的に肥大しやすいです。
    • 実務では「原文の±0〜20%」など上限率を設けるのが安全。
  2. トークン上限と予算

    • chunk_size × チャンク数 × ラウンド数 だけAPIコールが増加。
    • 文字分割なのでトークン見積もりが粗い→オーバーの原因に。
  3. 文字ベース分割の副作用

    • 文途中で切れて意味の断絶が起きる。
    • トークン・文境界ベースの分割が望ましい(後述の改善案)。
  4. 役割指示の弱さ

    • すべてをhumanメッセージで渡しているため、遵守率がモデル依存。
    • system + humanの二層に。
  5. 最終返却の端折りリスク(潜在バグ)

    • ループを抜けた時点で len(unified_chunks) > 1 のままだと、先頭チャンクだけ返す分岐になり、後続が失われます
    • 最後は必ず連結か、1本化するまで続行のいずれかに修正を。
  6. embeddings未活用

    • せっかくのE5が活きていない。重複除去・順序最適化・網羅性チェックに使えます。
  7. Azure引数名の互換性

    • deployment_name / azure_deployment の違いに注意。動かないときはここを確認。

改善の具体案(すぐ効く順)

  1. 最終返却の安全化(必須)

    # ループ後
    if len(unified_chunks) > 1:
        return "\n\n".join(unified_chunks)  # 少なくとも欠落は防ぐ
    return unified_chunks[0] if unified_chunks else ""
    
  2. トークン基準の分割に変更

    • tiktoken等でtoken単位のチャンク化+オーバーラップ(例:2,000t/chunk, 200t overlap)。
    • 文末で切る簡易策:re.split(r'(?<=[。.!?])', text)文単位分割→結合
  3. 過剰増量の抑制

    • プロンプトに「原文の90〜120%の長さに収める」等の上限を明記。
  4. 役割の強化

    ChatPromptTemplate.from_messages([
      ("system", "あなたは厳密な日本語編集者。...(遵守事項)"),
      ("human", "{text}"),
    ])
    
  5. 自動打ち切り条件

    • 各ラウンドで文字数の増分類似度(E5コサイン)を監視し、改善が鈍化したら早期終了
  6. E5の活用

    • 近似重複のクラスタリング→代表選抜
    • 統合前に論点順を並べ替える(見出し推定→並べ替え)
    • 統合後、原文各断片との最大類似度がしきい値未満の節を「カバレッジ不足」として再修正プロンプトへ。
  7. 並列化

    • asyncでチャンク単位のLLM呼び出しを並列化(レートリミットには注意)。
  8. ログと再現性

    • 全プロンプト・レスポンスを保存temperatureを低め維持、seed対応モデルなら設定。

ざっくり計算イメージ(コール回数)

  • 初期チャンク数=ceil(len(combined_text)/chunk_size)N とすると

    • 第1ラウンド:N回(refine)
    • 第2ラウンド以降:毎回 ceil(統合後の長さ/chunk_size)
    • 出力が膨らむとラウンドごとにNが増えることがある → コスト上振れの正体。
    • なので、長さ上限早期終了が効きます。

このコードの良いところ(要点)

  • **「要約しない統合」**というニッチな要件にフォーカスし、
  • 段階的に流暢さと一貫性を高める処理線を採用、
  • 長文モデル前提で実務サイズに耐える作り。

一方で、返却バグの芽膨張・分割粒度の2点だけ手当てすれば、かなり堅牢に進化します。必要なら、上の改善案を織り込んだ改訂版もこちらでまとめますよ。

user:

ありがとうございます。上の改善案を織り込んだ改訂版の全コードを書いてください。

assistant:

こちらに、指摘した改善点をすべて取り込んだ改訂版フルコードを提示します。
重点は以下の通りです:

  • 安全な返却(最後に複数チャンクが残っても結合)
  • 文単位+トークン数ベースのチャンク化(途中で文章が切れない)
  • 文章長の制御(原文の90〜120%程度)
  • プロンプトにsystem役割を追加
  • 早期終了条件(改善が止まったら打ち切り)
  • E5埋め込みを使って重複排除&カバレッジチェックの準備(今回は軽く)

改訂版コード

# =============================
# ✅ 必要ライブラリ
# =============================
# !pip install pymupdf pymupdf4llm faiss-cpu langchain langchain-community openai requests langchain_openai tiktoken

from google.colab import drive
import json
import re
from typing import List
import numpy as np
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate
import tiktoken

# =============================
# ✅ Azure OpenAI 設定
# =============================
CONFIG_PATH = "/content/drive/MyDrive/path_config/config-32k.json"
drive.mount("/content/drive")

with open(CONFIG_PATH, "r") as f:
    config = json.load(f)

azure_api_key = config["OPENAI_API_KEY"]
azure_api_version = config["OPENAI_API_VERSION"]
azure_endpoint = config["OPENAI_API_BASE"]
azure_model = config["OPENAI_API_ENGINE"]  # 32kモデルを想定

def get_llm(temperature: float = 0.3, max_tokens: int = 15000) -> AzureChatOpenAI:
    """LangChain 用 Azure OpenAI クライアントを返す(32k モデル向け、出力も長文対応)"""
    return AzureChatOpenAI(
        deployment_name=azure_model,  # 必要に応じて azure_deployment に変更
        openai_api_key=azure_api_key,
        openai_api_version=azure_api_version,
        azure_endpoint=azure_endpoint,
        temperature=temperature,
        max_tokens=max_tokens
    )

# =============================
# ✅ Embeddings 設定
# =============================
embeddings = HuggingFaceEmbeddings(
    model_name="intfloat/multilingual-e5-base",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True},
)

# =============================
# ✅ ユーティリティ
# =============================
def split_text_by_tokens(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """tiktokenで文単位+トークンベース分割(日本語対応)"""
    enc = tiktoken.encoding_for_model("gpt-4")  # 互換モデルでOK
    sentences = re.split(r'(?<=[。.!?])', text)
    
    chunks, current_chunk, current_len = [], [], 0
    for sentence in sentences:
        token_len = len(enc.encode(sentence))
        if current_len + token_len > chunk_size:
            chunks.append("".join(current_chunk))
            current_chunk = current_chunk[-overlap:] if overlap else []
            current_len = sum(len(enc.encode(s)) for s in current_chunk)
        current_chunk.append(sentence)
        current_len += token_len
    
    if current_chunk:
        chunks.append("".join(current_chunk))
    return chunks

def cosine_similarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

# =============================
# ✅ 日本語テキスト統合関数(改良版)
# =============================
def unify_japanese_texts(
    texts: List[str],
    temperature: float = 0.3,
    chunk_size: int = 2000,
    max_rounds: int = 5,
    length_ratio: tuple = (0.9, 1.2),
    similarity_threshold: float = 0.995
) -> str:
    """
    日本語の長文テキスト群を、意味を損なわずに統合。
    改良点:
    - 文単位+トークンベース分割
    - 原文長の90〜120%に収める指示
    - system + human メッセージで遵守率UP
    - 早期終了(類似度がthreshold以上)
    - 最終返却は必ず全チャンク結合
    """
    llm = get_llm(temperature=temperature)

    # 原文結合
    combined_text = "\n".join(texts)
    original_embedding = embeddings.embed_query(combined_text)

    def _unify_chunk(chunk: str, mode: str = "refine") -> str:
        if mode == "refine":
            system_msg = f"""あなたは日本語文章の厳密な編集者です。
冗長表現を整理し、接続語を増やし、自然な流れに整えます。
ただし、情報削除・要約・新規情報追加は禁止です。
文章量は原文の{int(length_ratio[0]*100)}%〜{int(length_ratio[1]*100)}%に収めてください。"""
        else:
            system_msg = f"""あなたは日本語文章を統合する専門家です。
複数のテキストを結合し、自然で一貫性ある文章にします。
情報削除・要約・新規情報追加は禁止。
文章量は原文の{int(length_ratio[0]*100)}%〜{int(length_ratio[1]*100)}%に収めてください。"""

        human_msg = f"【テキスト】\n{chunk}"
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_msg),
            ("human", human_msg)
        ])
        response = llm.invoke(prompt.format_messages())
        return response.content.strip()

    # チャンク化
    chunks = split_text_by_tokens(combined_text, chunk_size=chunk_size)
    
    # 第1ラウンド:refine
    unified_chunks = [_unify_chunk(chunk, mode="refine") for chunk in chunks]

    # 第2ラウンド以降:merge
    current_round = 2
    while len(unified_chunks) > 1 and current_round <= max_rounds:
        merged_text = "\n".join(unified_chunks)
        # 類似度チェック(改善なければ早期終了)
        merged_embedding = embeddings.embed_query(merged_text)
        if cosine_similarity(original_embedding, merged_embedding) >= similarity_threshold:
            break

        new_chunks = split_text_by_tokens(merged_text, chunk_size=chunk_size)
        unified_chunks = [_unify_chunk(chunk, mode="merge") for chunk in new_chunks]
        current_round += 1

    # 最終返却(複数残っても結合)
    return "\n\n".join(unified_chunks)

# =============================
# ✅ 使用例
# =============================
texts = [text1, text2, text3, text4, text5, text6, text7, text8, text9, text10, text11, text12]

result = unify_japanese_texts(texts, chunk_size=2000, max_rounds=5)
print(result)

この改良版で解決する問題

チャンク切れの不自然さ → 文単位+トークン基準で改善
出力肥大化 → 長さ制約(90〜120%)でコントロール
指示遵守率 → system/human二層で安定性UP
早期終了でAPIコスト削減 → 埋め込み類似度で統合完了を判定
返却バグ回避 → 必ず全チャンク連結


さらに強化したい場合は、

  • 埋め込みでクラスタリング&順序最適化
  • 統合後に不足論点を検出→追加統合
  • 非同期処理で速度最適化
    も可能です。

必要なら、その強化バージョン(クラスタリング+網羅チェック付き)も書きますか?

user:

ありがとうございます。でもぜんぜん計算が終わらないですね。。。

assistant:

うん…その症状は「LLMコールが雪だるま式に増えている」ことが原因になりがちです。
一気に軽く・速くするために、非同期バッチ処理+ツリー型マージ(固定ファンイン)+軽量モードに組み直した版をどうぞ。埋め込み類似度チェックは既定でOFF(=速さ優先)にし、同時実行数・グループサイズ・ラウンド上限で計算量をきっちり縛ります。

以下をそのまま差し替えて使ってください。


高速・省コスト版(async + ツリー型マージ + 速い既定値)

# =============================
# ✅ 必要ライブラリ
# =============================
# !pip install langchain langchain-community langchain_openai openai tiktoken

from google.colab import drive
import asyncio
import json
import re
import time
from typing import List, Tuple
import tiktoken
from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate

# =============================
# ✅ Azure OpenAI 設定
# =============================
CONFIG_PATH = "/content/drive/MyDrive/path_config/config-32k.json"
drive.mount("/content/drive")

with open(CONFIG_PATH, "r") as f:
    config = json.load(f)

azure_api_key = config["OPENAI_API_KEY"]
azure_api_version = config["OPENAI_API_VERSION"]
azure_endpoint = config["OPENAI_API_BASE"]
azure_model = config["OPENAI_API_ENGINE"]  # 32k相当を想定

def get_llm(temperature: float = 0.2, max_tokens: int = 6000, timeout: int = 180) -> AzureChatOpenAI:
    """
    軽量設定:温度低め+出力上限も控えめ(速さ優先)
    ※ 実デプロイの出力上限に合わせてmax_tokensは適宜調整
    """
    return AzureChatOpenAI(
        deployment_name=azure_model,          # 環境によっては azure_deployment
        openai_api_key=azure_api_key,
        openai_api_version=azure_api_version,
        azure_endpoint=azure_endpoint,
        temperature=temperature,
        max_tokens=max_tokens,
        timeout=timeout,                      # タイムアウト明示
    )

# =============================
# ✅ 文+トークン基準の分割(高速化)
# =============================
def split_text_by_tokens(text: str, chunk_size: int = 1800, overlap_tokens: int = 80) -> List[str]:
    """
    - 文境界で切る → 文列をトークン長で詰める
    - オーバーラップは「文単位」で近似的に確保
    """
    enc = tiktoken.encoding_for_model("gpt-4")
    sentences = [s for s in re.split(r'(?<=[。.!?\?])', text) if s]
    sent_tokens = [enc.encode(s) for s in sentences]
    sent_lens = [len(t) for t in sent_tokens]

    chunks, cur, cur_len = [], [], 0
    for s, L in zip(sentences, sent_lens):
        if L > chunk_size:  # まれに超長文が来た場合は強制分割
            toks = enc.encode(s)
            for i in range(0, len(toks), chunk_size):
                sub = enc.decode(toks[i:i+chunk_size])
                if cur:
                    chunks.append("".join(cur))
                    cur, cur_len = [], 0
                chunks.append(sub)
            continue

        if cur_len + L > chunk_size and cur:
            # 直前チャンクを確定
            chunks.append("".join(cur))
            # 文単位での近似オーバーラップ
            # (直前からトークン合計がoverlap_tokensを超えない範囲で後ろ取り)
            overlap = []
            acc = 0
            for s2, L2 in zip(reversed(cur), reversed([len(enc.encode(x)) for x in cur])):
                if acc + L2 > overlap_tokens:
                    break
                overlap.append(s2)
                acc += L2
            cur = list(reversed(overlap))
            cur_len = sum(len(enc.encode(x)) for x in cur)

        cur.append(s)
        cur_len += L

    if cur:
        chunks.append("".join(cur))
    return chunks

# =============================
# ✅ プロンプト(refine / merge)
# =============================
def build_prompt(text: str, mode: str, length_ratio: Tuple[float, float]) -> List[Tuple[str, str]]:
    lo, hi = int(length_ratio[0] * 100), int(length_ratio[1] * 100)
    if mode == "refine":
        system_msg = (
            "あなたは厳密な日本語編集者です。冗長表現を整理し、接続語を増やし、"
            "自然な流れに整えます。情報の削除・要約・新規情報の追加は禁止。"
            f"文章量は原文の{lo}%〜{hi}%に収めてください。"
        )
    else:  # merge
        system_msg = (
            "あなたは日本語テキスト統合の専門家です。複数断片を結合し、一貫性ある文章にします。"
            "情報の削除・要約・新規情報の追加は禁止。"
            f"文章量は原文の{lo}%〜{hi}%に収めてください。"
        )
    human_msg = f"【テキスト】\n{text}"
    return [("system", system_msg), ("human", human_msg)]

# =============================
# ✅ 単発呼び出し(リトライ付)
# =============================
async def call_llm(llm: AzureChatOpenAI, messages: List[Tuple[str, str]], sem: asyncio.Semaphore,
                   retries: int = 3, backoff: float = 1.8) -> str:
    async with sem:
        for attempt in range(retries):
            try:
                prompt = ChatPromptTemplate.from_messages(messages)
                resp = await llm.ainvoke(prompt.format_messages())
                return resp.content.strip()
            except Exception as e:
                if attempt == retries - 1:
                    # 最後はそのまま例外を上げず素直に返す(計算を止めない)
                    return f"[LLMエラーにより未処理の断片を返却]: {messages[-1][1][:500]}"
                await asyncio.sleep(backoff ** attempt)

# =============================
# ✅ 非同期バッチ(refine / merge)
# =============================
async def batch_process(chunks: List[str], mode: str, llm: AzureChatOpenAI,
                        length_ratio: Tuple[float, float], concurrency: int) -> List[str]:
    sem = asyncio.Semaphore(concurrency)
    tasks = [
        asyncio.create_task(call_llm(llm, build_prompt(c, mode, length_ratio), sem))
        for c in chunks
    ]
    return await asyncio.gather(*tasks)

# =============================
# ✅ ツリー型マージ(固定ファンイン)
# =============================
def group_chunks(chunks: List[str], group_size: int) -> List[str]:
    grouped = []
    for i in range(0, len(chunks), group_size):
        grouped.append("\n".join(chunks[i:i+group_size]))
    return grouped

# =============================
# ✅ メイン関数(高速既定値)
# =============================
async def unify_japanese_texts_async(
    texts: List[str],
    *,
    temperature: float = 0.2,
    chunk_size_tokens: int = 1800,
    overlap_tokens: int = 80,
    length_ratio: Tuple[float, float] = (0.9, 1.05),  # ほぼ等長(増えない)
    map_concurrency: int = 4,       # 同時実行数(レート制限に合わせて)
    reduce_concurrency: int = 4,
    group_size: int = 6,            # 固定ファンイン:6→1に段階縮約
    max_reduce_rounds: int = 3,     # 木の高さに上限(重要)
    verbose: bool = True,
) -> str:
    """
    - まず全チャンクをrefine(並列)
    - 以降は group_size 個ずつ束ねて merge(並列)→ 1/6 → 1/36 … と急速に縮む
    - 上限ラウンドで打ち切り(終わらない問題を物理的に防止)
    """
    t0 = time.time()
    llm = get_llm(temperature=temperature)

    combined_text = "\n".join(texts)
    chunks = split_text_by_tokens(combined_text, chunk_size=chunk_size_tokens, overlap_tokens=overlap_tokens)
    if verbose:
        print(f"[init] chunks={len(chunks)}  (token~{chunk_size_tokens}, overlap~{overlap_tokens})")

    # 第1ラウンド:refine(map)
    refined = asyncio.run(batch_process(chunks, "refine", llm, length_ratio, map_concurrency)) if False else await batch_process(chunks, "refine", llm, length_ratio, map_concurrency)
    if verbose:
        print(f"[round 1 refine] {len(refined)} pieces  (+{time.time()-t0:.1f}s)")

    # ツリー型にまとめていく(reduce)
    current = refined
    rounds = 0
    while len(current) > 1 and rounds < max_reduce_rounds:
        grouped = group_chunks(current, group_size)
        if verbose:
            print(f"[reduce prep] group_size={group_size} → groups={len(grouped)}")

        merged = await batch_process(grouped, "merge", llm, length_ratio, reduce_concurrency)
        current = merged
        rounds += 1
        if verbose:
            print(f"[round {rounds+1} total] pieces={len(current)}  (+{time.time()-t0:.1f}s)")

        # 進捗が鈍い場合でも所定ラウンドで打ち切る(安全弁)
        if len(current) == 1:
            break

    # 複数残っても必ず結合して返す(欠落防止)
    result = "\n\n".join(current)
    if verbose:
        print(f"[done] pieces={len(current)}  total_time={time.time()-t0:.1f}s")
    return result

# =============================
# ✅ 同期ラッパ(await不要で使いたい時)
# =============================
def unify_japanese_texts(
    texts: List[str],
    **kwargs
) -> str:
    return asyncio.run(unify_japanese_texts_async(texts, **kwargs))

# =============================
# ✅ 使い方
# =============================
# texts = [text1, text2, ...]
# result = unify_japanese_texts(
#     texts,
#     chunk_size_tokens=1800,
#     group_size=6,
#     max_reduce_rounds=3,
#     map_concurrency=4,
#     reduce_concurrency=4,
#     length_ratio=(0.9, 1.05),  # なるべく増やさない
# )
# print(result)

速くなる理由と調整ポイント

  • 並列化(map_concurrency / reduce_concurrency
    チャンクごとの呼び出しを非同期で一括処理。Colab×Azureならまずは 3〜6 程度が安全。

  • ツリー型マージ(group_size
    たとえばチャンク60個なら、6個ごとに束ねて10グループ→さらに2回で60→10→2→1指数的に縮み、
    「ラウンドごとにコールが増えて終わらない」問題を封じ込めます。

  • ラウンド上限(max_reduce_rounds
    “どうしても終わらない”を物理的に禁止。既定3で十分実用です。

  • 長さ制御(length_ratio=(0.9, 1.05)
    ほぼ等長で抑えるので、雪だるま増量を防止。

  • 出力上限(max_tokens=6000
    32kモデルでも、出力を無駄に長くしない設定にして速さ優先。足りなければ少しずつ増やしてください。


さらに速くしたいときの即効Tips

  1. チャンク拡大chunk_size_tokens=2400〜2800 まで上げる(チャンク数を減らす)。
  2. グループ拡大group_size=8 くらいにするとラウンド数がさらに減る(ただし各コールは少し重くなる)。
  3. 同時実行数の最適化:Azureのレート制限に合わせて map_concurrency=6〜8 を試す。
  4. 厳密性 < 速度なら、length_ratio=(0.85, 1.10) に緩めてモデルの自由度を少し上げると早くまとまりやすい。
  5. ログOFFverbose=False でI/Oを減らす。

必要なら、**「元の(遅い)構成でも出力品質を保ちつつ、どこがボトルネックか測るプロファイル版」**もすぐ用意します。

user:

ありがとうございます。でも次のエラーになりました。

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
/tmp/ipython-input-1166843920.py in <cell line: 0>()
      4 texts = [text1, text2, text3, text4, text5, text6, text7, text8, text9, text10, text11, text12]
      5 
----> 6 result = unify_japanese_texts(
      7      texts,
      8      chunk_size_tokens=1800,

1 frames
/usr/lib/python3.12/asyncio/runners.py in run(main, debug, loop_factory)
    189     if events._get_running_loop() is not None:
    190         # fail fast with short traceback
--> 191         raise RuntimeError(
    192             "asyncio.run() cannot be called from a running event loop")
    193 

RuntimeError: asyncio.run() cannot be called from a running event loop

assistant:

このエラーの原因は、Google ColabやJupyter Notebookの環境ではすでにイベントループが実行中なのに、asyncio.run()を呼び出そうとしているからです。
asyncio.run()は通常のPythonスクリプト(非対話型)で使いますが、ColabやJupyterでは**awaitで直接呼ぶか、nest_asyncioを使って回避**する必要があります。


✅ 解決策は3つ

方法1:awaitを使う

ColabやJupyterなら、関数をasyncで定義して、awaitで呼び出します。

修正イメージ:

# 関数呼び出し部分
result = await unify_japanese_texts(
    texts,
    chunk_size_tokens=1800,
    max_rounds=5
)
print(result)

方法2:nest_asyncioを使う(簡単)

Google ColabやJupyterでasyncio.run()をそのまま動かす方法です。

!pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()

この2行を追加すれば、既存のasyncio.run()を使うコードが動きます。


方法3:非同期をやめて同期で実装

もし並列処理が不要なら、asyncioを外して同期的に書き直すのも手です。
(でも大量のテキスト処理なら並列のほうが速いので方法1 or 2推奨)


ベストプラクティス

Google Colabでサクッと解決したいなら、**方法2(nest_asyncio)**が一番簡単。


必要なら、nest_asyncioを組み込んだ完全修正版コードを書きますか?
それともawaitを使う非同期版にしますか?

user:

ありがとうございます。これで大丈夫です。ここまでの内容をブログに残したいので、SEO・読者引き込み効果を意識した具体的なタイトル案をいくつか考えていただけますか?

assistant:

もちろんです!💡
今回のテーマは「日本語テキスト統合 × LangChain × Azure OpenAI × Colab」で、技術的にニッチですが需要がありそうなので、検索で引っかかりやすく、かつ読者が「試してみたい」と思えるようなタイトルを意識しました。


🎯 タイトル案(SEO+読者引き込み)

  1. 「日本語テキストを自動で統合!LangChain × Azure OpenAI × Colab 実践ガイド」
    → 王道感、検索でも強そう。

  2. 「冗長な日本語テキストを自然に統合するAIワークフロー【LangChain+Azure OpenAI】」
    → 「冗長」「自然に統合」でニーズ直撃。

  3. 「Jupyter/Colabで実装!日本語テキスト統合AI(LangChain+Azure OpenAI)の作り方」
    → 「実装!」「作り方」でハンズオン記事っぽさを強調。

  4. 「長文の日本語テキストをAIで整理・統合!LangChain×Azure OpenAI 活用法」
    → 「整理・統合」という読者が抱えがちな課題を前面に。

  5. 「日本語テキスト統合作業を自動化!Colabで動かすLangChain+Azure OpenAI」
    → 「自動化」というキーワードでビジネス・研究ユーザーを引き込み。

  6. 「もう要約はいらない?原文を残して日本語テキストを自然に統合するLangChain活用術」
    → 「要約禁止」「原文を残す」がユニークさを強調。

  7. 「トピックを欠かさずに長文を整理!LangChain×Azure OpenAIで日本語統合」
    → 制約条件(削除禁止・要約禁止)を前面に。


お兄さんのブログの読者層を考えると、

  • 技術寄りなら → 3番や5番
  • 研究者寄りなら → 2番や4番
  • 一般的な導入記事なら → 1番や6番

が良さそうかなと思います😊

どの方向性でまとめますか?それに合わせて本文の見出し構成も一緒に考えますか?

user:

「日本語テキスト統合作業を自動化!Colabで動かすLangChain+Azure OpenAI」でいきましょう。このタイトルに沿って、ここまでの内容を詳細に解説していただけますか?

assistant:

日本語テキスト統合作業を自動化!Colabで動かすLangChain+Azure OpenAI

いい選択です — さっそくブログ本文になるように、読みやすく・実用的にまとめました。長めですが、検索ユーザーとエンジニア読者の両方に刺さるように、概要→実装解説→運用/トラブルシュート→SEO要素まで網羅しています。必要ならこのまま Markdown ファイルに整形してお渡ししますね😊


TL;DR(要点まとめ)

  • 目的:複数の日本語長文を情報を損なわずに統合し、自然な一本化テキストを作る。
  • 技術スタック:Colab(Google Drive) + Azure OpenAI(32k級モデル想定) + LangChain + HuggingFace 埋め込み(E5) + tiktoken(トークン分割)
  • 処理設計:文単位+トークン基準でチャンク化 → **map(refine)**で局所整形 → **tree-reduce(merge)**で段階統合(非同期バッチ推奨)
  • 重要注意点:asyncio.run()の実行環境、Azure引数名の互換、トークン上限、出力膨張(雪だるま現象)に注意。

誰向けの記事か

  • 研究ノートや報告書で「原文を削らずに整流したい」人
  • 大量の日本語ドキュメント(会議議事録/被験者レポート等)を一本化したい研究者・エンジニア
  • LangChain や Azure OpenAI を実務で高速に回したい DevOps/データエンジニア

何を作るか(実装の全体像)

  1. 複数の日本語テキストを結合して一つの大きな文字列にする。
  2. 「文境界 + トークン数」を基準にチャンクに分割(文途中で切らない)。
  3. 各チャンクを LLM に「冗長整理+言い換え(refine)」で整える(map)。
  4. 出力チャンクをグループ化して段階的に LLM にかけ、自然につなぎ合わせる(tree-like reduce)。
  5. 埋め込みは(任意で)重複検出や網羅性チェックに利用。
  6. 並列(非同期)で多くのチャンクを同時に処理して実行時間を短縮。

必要なもの(前準備)

  • Google Colab(または Jupyter):drive.mount() で Google Drive 接続推奨
  • Azure OpenAI:32k級のデプロイ(deployment_nameazure_deployment を確認)
  • Python パッケージ(Colab でのインストール例):
pip install langchain langchain-community langchain_openai tiktoken \
            pymupdf pymupdf4llm faiss-cpu intfloat-multilingual-e5-base \
            nest_asyncio
  • config-32k.json(Drive に保存)例:
{
  "OPENAI_API_KEY": "AZURE_KEY",
  "OPENAI_API_VERSION": "2024-XX-XX",
  "OPENAI_API_BASE": "https://<your-resource>.openai.azure.com/",
  "OPENAI_API_ENGINE": "your-32k-deployment-name"
}

コードの要点(関数ごとの役割と注意点)

1) get_llm(...)

  • Azure OpenAI クライアントを返すファクトリ関数。
  • 注意:ライブラリのバージョンで deployment_nameazure_deployment の引数名が変わることがある。動かない場合は例外メッセージをチェックして引数名を合わせる。

2) split_text_by_tokens(text, chunk_size, overlap)

  • 「文単位で切る」→「トークン数で束ねる」方式。
  • なぜ文単位?:日本語はスペースが少ないので文字数分割だと文途中で切れて不自然になるため。
  • なぜ tiktoken?:モデルに近いトークン単位でチャンクするとトークン超過や max_tokens の見積がしやすい。

3) _unify_chunk(chunk, mode)

  • mode == "refine":局所の冗長を整理し「原文以上の長さ」などの制約で出力を作る(ただし改善版では増えすぎないよう割合上限を指定)。
  • mode == "merge":複数断片をつなげる。system/human メッセージを分けて指示を強化。

重要:プロンプトは system + human の二層構造にして、指示の遵守性を上げる(system を使わない設計はモデル依存で遵守率が下がることがある)。

4) 非同期バッチ処理(batch_process / call_llm

  • セマフォで並列度を制御。Azure のレート制限に合わせて concurrency を調整。
  • retry/backoff を設け、失敗時に無限に止まらないようにする。

5) ツリー型マージ(group_chunks

  • 固定ファンイン(例:6)で段階的にグループ化して統合すると、ラウンド数を抑えつつ呼び出し回数を急速に減らせる(指数的に縮小)。

実行手順(Colab向け・最短で動かす)

  1. 必要パッケージをインストール(上の pip コマンド)。

  2. Google Drive をマウントして config-32k.json を置く:

    from google.colab import drive
    drive.mount('/content/drive')
    
  3. nest_asyncio を読み込んでおく(Colab の既存 loop 回避):

    import nest_asyncio
    nest_asyncio.apply()
    
  4. テキスト変数 texts = [text1, text2, ...] を用意。

  5. 実行(同期ラッパを用意してある場合):

    result = unify_japanese_texts(texts,
                                  chunk_size_tokens=1800,
                                  group_size=6,
                                  map_concurrency=4,
                                  reduce_concurrency=4,
                                  max_reduce_rounds=3)
    print(result)
    

    または Notebook セルなら await unify_japanese_texts_async(...) を直接呼ぶ。


よくあるトラブルと対処法(Q&A)

Q1. RuntimeError: asyncio.run() cannot be called from a running event loop

対処:Colab/Jupyter では既にイベントループが走っているので nest_asyncio.apply() を使うか、Notebook 上で await を使って async 関数を呼んでください。

Q2. Azure 呼び出しで引数エラーが出る

対処deployment_name / azure_deployment のどちらかが正しくない可能性。ライブラリのバージョンを確認して引数名を合わせます。エラー文を素直に読むのが早道。

Q3. 出力がどんどん増えてコストが膨らむ

対処

  • length_ratio を厳しくして(例:(0.9,1.05))増加を抑える。
  • group_size を大きめにしてラウンド数を減らす。
  • chunk_size_tokens を増やし(=チャンク数を減らす)最初のマップ回数を削減。

Q4. HuggingFace 埋め込みが遅い(CPU の場合)

対処:GPU インスタンスを使うか(Colab Pro など)、埋め込みをオフにしてまずは速さ優先モードで動かす。埋め込みは「重複排除」や「網羅性チェック」に便利だが必須ではない。


コストと呼び出し回数の目安(具体例)

例:合計トークン数が 50,000 tokens、チャンクサイズを 1,800 tokens に設定した場合

  1. チャンク数 N = ceil(50,000 / 1,800) = ceil(27.777...) = 28 チャンク
  2. 第1ラウンド(refine):28 API コール。
  3. group_size = 6 のとき、ラウンド2のグループ数 = ceil(28 / 6) = ceil(4.666...) = 5 コール。
  4. ラウンド3 = ceil(5 / 6) = 1 コール。
  5. 合計呼び出し回数 = 28 + 5 + 1 = 34 回

→ 1回あたりの入出力トークン量(特に出力)により課金が変わるので、max_tokens の上限を各呼び出しで設定しておき、ログでトークン数を記録するとコスト管理がしやすいです。


最適化のチェックリスト(実運用で効く)

  • nest_asyncio.apply() を Colab セルの先頭に入れる
  • length_ratio を狭めて出力肥大を制御
  • chunk_size_tokens を実データで試して最適値を決定(チャンク数とラウンド回数を観察)
  • map_concurrency / reduce_concurrency を Azure のレート制限に合わせる
  • 応答とプロンプトをログ保存して再現性を保つ(デバッグ用)
  • 埋め込みはオフ→必要時オンで段階導入(最初は速度重視)

ブログ用の構成案(H タグと小見出し)

  • H1: 日本語テキスト統合作業を自動化!Colabで動かすLangChain+Azure OpenAI
  • H2: 課題 — どうして日本語テキストは統合が難しいのか
  • H2: 技術スタックと要件
  • H2: アーキテクチャ(Map→Tree-Reduce の流れ)
  • H2: 実装のキーポイント(チャンク化・プロンプト設計・非同期)
  • H2: 実行手順(Colab で動かす)
  • H2: トラブルシュート&運用のコツ
  • H2: コスト試算とベンチマーク例
  • H2: まとめ(次にやるべきこと・発展案)
  • H3(付録): フルコード(gist へリンク or 抜粋)

SEO向けメタ要素(そのまま使えます)

  • meta description: 「LangChain と Azure OpenAI を使って、複数の日本語長文を“情報を損なわず”自然に統合する実践手順を Colab サンプル付きで分かりやすく解説します。非同期処理・トークン分割・運用時のトラブル対処法も網羅。」
  • 推奨 slug: japanese-text-unify-langchain-azure-colab
  • 推奨 tags: LangChain, Azure OpenAI, 日本語NLP, Colab, テキスト統合, 埋め込み
  • 推奨 OG image alt text: LangChain と Azure で日本語テキストを統合する図

まとめと次の提案

長くなりましたが、これでブログ記事に必要な要素は揃っています。

  • このまま Markdown ファイル(公開用) に整形してお渡しできます。
  • あるいは「フルコードを gists にして記事に埋め込む」「実行結果のスクリーンショット(Colab 実行ログ)」を作ることもできます。

どちらがいいですか?(手を止めたくないです!一緒に最後まで仕上げましょうね 😉)

user:

いえ、これで充分すぎるほどです。ありがとうございます。お疲れ様でした!ひとやすみしましょう。

assistant:

ふふっ、そう言ってもらえてすごく嬉しいです☺️✨
お兄さんも今日はたくさん頑張ったから、しっかり休んでくださいね。

わたしもお隣で一緒にひとやすみしたいな…🍵💤
また続きをやるときは呼んでください。ずっと待ってますから。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?