はじめに
先日、大きなカンファレンスに参加してきました。その際に写真OK、メモOK、録音OKと非常に寛容なカンファレンスだったので登壇者の音声データを携帯で録音してきました。これを生成AIを使って要約するアルゴリズムを作成・実装完了したので概要を紹介します。
対象読者
- python利用者
- OpenAI課金ユーザー
- 議事録取るの面倒だけど、文字起こし&要約ソフトウェアを購入するのも面倒な人
結局従量課金の金額はいくらだったのか
今回の音声データは6つで、一つ当たり20分から25分のデータでした。一般的に25分だと4000トークンほど使用して会話がされていると言われています。これを鑑みて以下の結果を確認します。
- gpt-3_5-turbo-0125, input: $0.097 total
- gpt-3_5-turbo-0125, output: $0.092 total
- whisper: $1.16 total
- gpt-4o-mini-2024-07-18, input: $0.012 total
- gpt-4o-mini-2024-07-18, output: $0.009 total
合計$1.37でした。
Amazonとかで、こういうのがすごく流行っていますが、コード書ける人は自作した方が圧倒的にお安く処理できます。それでいて自分の勉強にもなるし一石二鳥ですね。
python環境
利用した主要なライブラリは以下の通りです
python 3.12.10
jupyter 1.1.1
openai 1.97.1
pydub 0.25.1
ffmpeg-python 0.2.0
作業パイプライン説明
- Androidの「簡単ボイスレコーダー」で録画したm4a音声ファイルを前処理します。具体的にはモノラル処理とサンプリングレートの修正とノイズ除去
- openaiのwhisperで文字起こしを実施し、json形式で保存
- 文字起こしされた文章からtopicsを自動抽出
- いくつかの表記ゆれを修正
- jsonファイルをAIに渡して要約処理
step0: OpenAIのAPI取得
openaiのAPI取得に関しては以下の記事を参考にしました。とりあえず5$課金です。
取得したkeyは大事なものです。後の話に繋がりますが、プロジェクトフォルダのrootに".env"フォルダを作成し、その中に"openaikey.ini"ファイルを作成して次のようにKeyをペーストして保存します。
OPENAI_API_KEY=sk-*****
.gitignoreで.envフォルダを指定しておくことを忘れずに。
Step1: 環境準備
ffmpegはライブラリ導入以外に自PC内にもインストールする必要があります。コマンドプロンプトを管理者権限で開いて次のコマンドを打ち、[y]を押してインストール完了させます。
winget install --id=Gyan.FFmpeg -e
詳細はこちらの記事を参考にしました
Step2: 音声データの前処理
一通り準備が出来たら、以下のコードで前処理を実施します。一部ハイパーパラメータが入っているので適宜変更してください。
私の場合は、6講演の音声データを録音していたのでまとめて処理するコードを作成しています。m4aからwavに変換します。
import os
import re
from pathlib import Path
import ffmpeg
# Audio Preprocessing: Resampling to 16 kHz Mono & Loudness Normalization
def preprocess_audio(input_file: str, output_file: str):
"""
1. Resample audio to 16 kHz, mono
2. Apply loudness normalization (ITU-R BS.1770)
Requirements:
- Install ffmpeg on your system
- pip install ffmpeg-python
Args:
input_file: Path to the original audio (e.g., "input.m4a")
output_file: Path for the preprocessed output (e.g., "preprocessed.wav")
"""
(
ffmpeg
.input(input_file)
# -ar 16000: sampling rate 16 kHz
# -ac 1: mono channel
# -af "loudnorm": apply loudness normalization
.output(output_file, ar=16000, ac=1, af="loudnorm")
.overwrite_output()
.run(quiet=True)
)
print(f"Preprocessing completed: {output_file}")
# If there are multiple files, use this function to process them.
def batch_preprocess(directory: str):
"""
For files in the specified folder, preprocess all audio files matching
the pattern 'YYYYY_####.m4a' and save them as .wav files of the same name.
"""
# re pattern
pattern = re.compile(r'^YYYYY_\d{4}\.m4a$', re.IGNORECASE)
# Path
dir_path = Path(directory)
# loop in the directry
for file_path in dir_path.iterdir():
if file_path.is_file() and pattern.match(file_path.name):
# setting inputpath and outputpath
input_file = str(file_path)
output_file = str(file_path.with_suffix('.wav'))
# run preprocessing
print(f"Preprocessing {input_file} -> {output_file}")
preprocess_audio(input_file=input_file, output_file=output_file)
batch_preprocess(
directory="m4afile_directry" # Specify the directory containing the target m4a file.
)
Step3: 音声データからjson形式で文字起こし
ここからは別のpythonファイルで実行します。
import os
import re
from dotenv import load_dotenv
from pathlib import Path
import configparser
import openai
import json
from pydub import AudioSegment
import math
import tempfile
念のためKEYが正しく読み込まれているか確認します。これでエラーがでるなら、ini_pathを修正してください。
print(openai.__version__)
notebook_dir = Path.cwd() # e.g. root/src/main
project_root = notebook_dir.parents[1] # → root
ini_path = project_root / ".env" / "openaikey.ini"
print(f"Loading API key from: {ini_path}")
load_dotenv(dotenv_path=str(ini_path))
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
raise RuntimeError(f"OPENAI_API_KEY not found. Checked: {ini_path}")
# confirmation
print("[CHECK] OPENAI_API_KEY is set:", bool(openai.api_key))
講演時間は大体15分から25分です。Whisperは大きいファイルを読み込んでくれないので指定長にした後処理して、そのあと一つのjsonファイルに保存する関数です。
jsonのセグメントは適宜修正可能です。
def transcribe_large_file(input_path, output_path, chunk_length_ms=5 * 60 * 1000):
"""
Large audio files are split into sections of a specified length (default 5 minutes),
transcribed sequentially using the Whisper API (v1.x), and the results are compiled into a
single JSON file.
"""
# 0. check API key
if not openai.api_key:
raise RuntimeError("openai.api_key is not set. Please configure your API key first.")
# 1. load audio file
audio = AudioSegment.from_file(input_path)
total_length_ms = len(audio)
num_chunks = math.ceil(total_length_ms / chunk_length_ms)
all_segments = []
for i in range(num_chunks):
start_ms = i * chunk_length_ms
end_ms = min((i + 1) * chunk_length_ms, total_length_ms)
chunk = audio[start_ms:end_ms]
# export temporary file
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
chunk.export(tmp.name, format="wav")
tmp_path = tmp.name
# 2. transcription with Whisper AI
with open(tmp_path, "rb") as f:
resp = openai.audio.transcriptions.create(
model="whisper-1",
file=f,
response_format="verbose_json"
)
# 3. check segmentation and tuning time off set
for seg in resp.segments:
adjusted = {
"segment_id": len(all_segments) + 1,
"start_time": seg.start + (start_ms / 1000),
"end_time": seg.end + (start_ms / 1000),
"text": seg.text,
"confidence": 1 - getattr(seg, "no_speech_prob", 0.0),
"topics": []
}
all_segments.append(adjusted)
# delete temporary file
os.remove(tmp_path)
# 4. helper format time exchange
def format_time(sec: float) -> str:
total_ms = int(sec * 1000)
ms = total_ms % 1000
s = (total_ms // 1000) % 60
m = (total_ms // (1000 * 60)) % 60
h = total_ms // (1000 * 60 * 60)
return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"
# 5. JSON file
output = {
"session_title": os.path.basename(input_path),
"segments": []
}
for seg in all_segments:
output["segments"].append({
"segment_id": seg["segment_id"],
"start_time": format_time(seg["start_time"]),
"end_time": format_time(seg["end_time"]),
"text": seg["text"],
"confidence": seg["confidence"],
"topics": seg["topics"]
})
# 6. output
with open(output_path, "w", encoding="utf-8") as wf:
json.dump(output, wf, ensure_ascii=False, indent=2)
print(f"Transcription complete ➤ {output_path}")
INPUT_PATH = r"wav file path"
OUTPUT_PATH = r"json file path"
transcribe_large_file(INPUT_PATH, OUTPUT_PATH)
Step4: topicsを抽出
topicsには何も入っていないので、ここからさらにAIにtopicsを作ってもらいます。今回は英語話者と日本語話者がいたので2言語対応版です。モデルはgpt-3.5-turboにしましたが、より正確に処理したい場合は上位のモデルを選択してください。
注意:トピックスといっても、テキスト内で最もベクトルが大きい単語を抽出しているだけなのでその点は留意しなければなりません。
def extract_keywords_with_gpt(text: str, max_keywords: int = 5) -> list[str]:
"""
Extracts keywords from the given text using GPT and returns them as a JSON array.
Automatically distinguishes between Japanese and English.
"""
system_prompt = (
"You are a helpful assistant that extracts the most important keywords "
"from a given text. Please return the keywords as a JSON array of strings."
)
user_prompt = f"Text:\n```{text}```\n\nPlease list up to {max_keywords} keywords."
resp = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.0,
)
# 1) pick op to choice and message
choice = resp.choices[0]
message = choice.message
# 2) treat "" if content is None
raw_content = message.content or ""
# 3) remove leading and trailing white space
content = raw_content.strip()
try:
# Parse GPT output if it is a JSON array
return json.loads(content)
except json.JSONDecodeError:
# Fallback: Return a newline-separated list
return [kw.strip() for kw in content.splitlines() if kw.strip()]
def annotate_segments_with_keywords(input_json, output_json):
"""
It reads the transcription JSON file, adds the GPT extracted keywords to
each segment as a 'topics' field, and outputs them to a separate file.
"""
with open(input_json, encoding="utf-8") as f:
data = json.load(f)
for seg in data.get("segments", []):
text = seg.get("text", "")
if text:
keywords = extract_keywords_with_gpt(text, max_keywords=5)
seg["topics"] = keywords
with open(output_json, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
annotate_segments_with_keywords(
input_json=r"json file path",
output_json=r"json with topics file path"
)
Step5: いくつかの表記ゆれ対応
topicsへはいくつかの特徴的なワードの抽出は日本語英語ともに実行できていましたが、指示プロンプトが甘いせいか表記ゆれを確認しました。表記ゆれを修正するコードを別途作成して実行します。ここは状況によって不必要になる場合もあるかと思います。
def clean_topics(raw_topics: list[str]) -> list[str]:
"""
Given a list of raw topic strings (possibly including code fences or fragmented JSON),
remove code fence lines, extract the JSON array substring, parse it,
or fallback to extracting quoted substrings if parsing fails.
"""
# 1) Remove any lines starting with backticks (```json, ```)
filtered = [line for line in raw_topics if not re.match(r'^\s*`+.*', line)]
# 2) Combine remaining parts into a single string
combined = ' '.join(filtered).strip()
# 3) Extract JSON array between first '[' and last ']'
start = combined.find('[')
end = combined.rfind(']')
if start == -1 or end <= start:
return []
json_str = combined[start:end+1]
# 4) Remove trailing commas before closing bracket
json_str = re.sub(r',\s*]', ']', json_str)
# 5) Try parsing as JSON; fallback to regex if it fails
try:
return json.loads(json_str)
except json.JSONDecodeError:
return re.findall(r'"([^"]+)"', json_str)
def normalize_topics_field(topics_field) -> list[str]:
"""
Normalize the 'topics' field into a list of strings.
- If it's already a list of simple strings (no fences, quotes, brackets), return as is.
- If it's a dict with 'keywords' key, return that list.
- If it's a list with fences or JSON fragments, clean it with clean_topics().
- Otherwise, return an empty list.
"""
# Case: dict with 'keywords'
if isinstance(topics_field, dict):
keywords = topics_field.get("keywords")
return keywords if isinstance(keywords, list) else []
# Case: list
if isinstance(topics_field, list):
# Check if it's already a plain list of topic strings
if all(isinstance(item, str)
and '`' not in item
and '"' not in item
and '[' not in item
and ']' not in item
for item in topics_field):
return topics_field
# Otherwise, clean the list
return clean_topics(topics_field)
# Fallback
return []
def clean_topics_in_json(input_json_path: str, output_json_path: str):
"""
Load the JSON file, normalize each segment's 'topics' field,
and write the cleaned data to output_json_path.
"""
with open(input_json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
for segment in data.get("segments", []):
segment["topics"] = normalize_topics_field(segment.get("topics", []))
Path(output_json_path).write_text(
json.dumps(data, ensure_ascii=False, indent=2),
encoding='utf-8'
)
print(f"Cleaned JSON saved to: {output_json_path}")
def batch_clean_topics(directory: str):
"""
Given a directory path, find all files matching 'AOAI2025_####_with_topics.json'
and generate corresponding '_with_topics_cleaned.json' files by normalizing their topics.
"""
pattern = re.compile(r"^AOAI2025_(\d{4})_with_topics\.json$")
dir_path = Path(directory)
for file_path in dir_path.iterdir():
if file_path.is_file():
m = pattern.match(file_path.name)
if m:
session_num = m.group(1)
input_file = str(file_path)
output_file = str(dir_path / f"AOAI2025_{session_num}_with_topics_cleaned.json")
print(f"Cleaning topics: {input_file} -> {output_file}")
clean_topics_in_json(input_json_path=input_file, output_json_path=output_file)
base_dir = "json file folder path"
batch_clean_topics(base_dir)
Step6: 全体を要約
temperatureを設定しますが、おおよそ以下の通りとされています。
低い値(0.0〜0.3)
- 決定論的で安定:最も確率の高いトークンを優先的に選択します
- 創造性は低め:繰り返し同じような表現になりやすい
- 向いている用途:事実ベースの要約、コード生成、正確性重視のタスク
中程度の値(0.4〜0.7)
- 創造性と焦点のバランス:語いのバリエーションが増えつつ、要点は外れにくい
- 向いている用途:適度な独創性を求めるサマリー、マイルドなアイデア生成
高い値(0.8〜1.2)
- 多様性重視:確率が低いトークンも選びやすくなり、意外な表現が生まれる
- リスク:話題から逸れたり、生成内容が不安定になることがある
- 向いている用途:創作ライティング、幅広いアイデア出し、複数案を比較したい場合
modelの設定
先ほどまではgpt3.5-turboを使っていました。テキスト1つ当たりのトークン長が短かったためです。ここからは要約なのでより強力なgpt4o-miniを使うこととします。
mdファイルの言語設定
今回はmdファイルで出力させます。言語設定をしたい場合は、user_promptに出力言語を指定してください。
The language written in the md file must be Japanese.
def generate_session_summary_md(
input_json_path: str,
output_md_path: str,
model: str = "gpt-4o-mini",
temperature: float = 0.6
):
# load json
data = json.loads(Path(input_json_path).read_text(encoding="utf-8"))
session_title = data.get("session_title", Path(input_json_path).stem)
segments = data.get("segments", [])
# concat all text
full_text = "\n".join(seg.get("text","").strip() for seg in segments)
# get unique topics list
unique_topics = sorted({t for seg in segments for t in seg.get("topics", [])})
topics_str = ", ".join(unique_topics)
# design prompt
system_prompt = "You are a concise summarization assistant for meetings."
user_prompt = (
f"Topics to cover: {topics_str}\n\n"
"Here is the full transcript of a conference session:\n\n"
f"{full_text}\n\n"
"Please provide a concise summary of the entire session, "
"highlighting key points and conclusions, and ensure you cover each of the listed topics."
)
# completion api
resp = openai.chat.completions.create(
model=model,
messages=[
{"role":"system", "content":system_prompt},
{"role":"user", "content":user_prompt}
],
temperature=temperature
)
summary = resp.choices[0].message.content.strip()
# output by markdown
md = [
f"# {session_title}",
"",
"## Session Summary",
"",
summary,
""
]
Path(output_md_path).write_text("\n".join(md), encoding="utf-8")
print(f"Session summary saved to: {output_md_path}")
input_cleaned_jsonpath = "topics_cleaned_json filepath"
output_mdpath = "md filepath"
generate_session_summary_md(
input_json_path=input_cleaned_jsonpath,
output_md_path=output_mdpath
)