そもそもリピーティングとは
学校の英語の授業で、英語の先生が英語を喋って、その後に続いて生徒が喋るのがリピーティングなのですが、学校に通っていなくて家で勉強する時には
一文スピーキング
ブランク
また一文スピーキング
ブランク
:
という音源を作らないといけない。これを自動でできないかなぁ…と思って作ってみました。
Google Cloud Speech To Text はスピーチをテキストに変更するだけではなくて、時間オフセット(タイムスタンプ)を取得する事ができます。
JSONサンプル
"transcript": "Four score and twenty...(etc)...",
"confidence": 0.97186122,
"words": [
{
"startTime": "1.300s",
"endTime": "1.400s",
"word": "Four"
},
{
"startTime": "1.400s",
"endTime": "1.600s",
"word": "score"
},
これを利用して、手持ちのMP3の英語スピーチ音源を、一文ごとにカット。
そして10秒ブランクをつける。そして一つのMP3ファイルにする。
もう少し具体的にご説明をすると
- MP3ファイルをCloud Storageにアップロード(1MBを超えるとStoregeに入れないと解析できないため)
- Speech To Textで分析をする。
- 一文の音声の時間を計算し、一文の音声を取り出す。
- ブランクを10秒つける。
- 音源が終わるまで繰り返す。
準備
Google Cloudを用意する。
ここは割愛します。
ffmpegを用意する。
音声を操作するためのPythonライブラリです。
ここではMacで紹介します。
Homebrewをインストールする。
すでにインストールされているかを確認します。
brew --version
インストールされていない場合は以下を実行します。
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
そしてffmpegのインストール
brew install ffmpeg
ちゃんとインストールできたかを確認する。
ffmpeg -version
各ライブラリのインストール
requirements.txtは以下のようになります。
google-cloud-speech
google-cloud-storage
pydub
これらをインストール
python pip install requirements.txt
サービスアカウントを用意する。
Google Cloudコンソールから
「IAMと管理」→「サービスアカウント」を開き、「サービスアカウントを作成」をクリックします。
サービスアカウント名は何でもいいのですが、わかりやすいものにしてください。
例えば、sentence-cutter-speech2text
説明は必ず書き込んでください。時間が経つと絶対に忘れます。
例えば、文章ごとに分割するためようのサービスアカウント
権限は「Storage オブジェクトユーザ」を選んでください。
しかし、以下の権限しか使わないので、後々不要な権限は削除してください。
storage.objects.create
storage.objects.get
storage.objects.delete
作成後、サービスアカウントの「アクション」列にある3点メニューをクリックし、「キーを管理」→「キーを追加」→「新しいキーを作成」を選択します。
「JSON」を選択して「作成」をクリックすると、JSON形式の認証情報ファイルがダウンロードされます。このファイルは取り扱いに気をつけて、安全な場所に保管してください。パブリックな場所に置かないようにしてください。
バケットを用意する。
バケット名はグローバルでユニークであれば何でもいいのですが、できれば推測されにくい名前にしてあげてください。
Pythonのプログラムを用意する。
JSONのファイル名、MP3のファイル名、バケットのファイル名はご自身の物に変えてください。
実際にやってみたところ、一文の長さが切れているため、プラス0.5秒(500ミリ秒)しています。
import os
import re
import math
from pydub import AudioSegment
from google.cloud import speech_v1p1beta1 as speech
from google.cloud import storage
# --- 設定項目 ---
# 環境変数GOOGLE_APPLICATION_CREDENTIALSの設定 (ローカル実行用)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "" # JSONのファイル名のパスを記載してください
# 入力MP3ファイルのパス
INPUT_MP3_PATH = "" # ここに分割したいMP3ファイルのパスを指定してください
# 出力ファイル名とディレクトリ
# 単一のMP3ファイルとして出力するため、ファイル名を指定します
OUTPUT_MP3_FILENAME = "combined_sentences_with_blanks.mp3"
OUTPUT_DIR = "output_combined_audio" # 出力ファイルが保存されるディレクトリ
# Google Cloud Storage バケット名 (長い音声ファイルの場合に必要)
# ご自身のGCSバケット名に変更してください
GCS_BUCKET_NAME = "" # 例: "my-speech-to-text-audio-bucket-hogehoge-jfidsjf;ajg425^&*^"
# 音声ファイルの言語コード (例: 英語は "en-US")
# その他の言語コード: https://cloud.google.com/speech-to-text/docs/languages
LANGUAGE_CODE = "en-US"
# 音声認識モデル (より長い音声に適したモデル)
# "default" または "video", "latest_long" など
# https://cloud.cloud.google.com/speech-to-text/docs/multi-model-recognition
SPEECH_MODEL = "latest_long"
# 短い音声ファイルの最大秒数 (これより長いとGCS経由で処理)
MAX_INLINE_AUDIO_SECONDS = 58 # 約1分 (Google Cloud Speech-to-Textの制限)
# 各文の後に追加するブランクの長さ (ミリ秒)
BLANK_DURATION_MS = 10 * 1000 # 10秒
# --- 関数定義 ---
def upload_to_gcs(local_file_path, bucket_name):
"""
ローカルファイルをGoogle Cloud Storageにアップロードする。
"""
client = storage.Client()
bucket = client.bucket(bucket_name)
blob_name = os.path.basename(local_file_path)
blob = bucket.blob(blob_name)
print(f"GCSにファイルをアップロード中: {local_file_path} -> gs://{bucket_name}/{blob_name}")
blob.upload_from_filename(local_file_path)
print("アップロード完了。")
return f"gs://{bucket_name}/{blob_name}"
def delete_from_gcs(gcs_uri):
"""
Google Cloud Storageからファイルを削除する。
"""
try:
bucket_name = gcs_uri.split("//")[1].split("/")[0]
blob_name = "/".join(gcs_uri.split("//")[1].split("/")[1:])
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
if blob.exists():
blob.delete()
print(f"GCSから一時ファイルを削除しました: {gcs_uri}")
else:
print(f"GCSファイルは存在しませんでした: {gcs_uri}")
except Exception as e:
print(f"GCSファイル削除中にエラーが発生しました: {e}")
def get_transcription_with_timestamps(audio_path, language_code, model, gcs_bucket_name):
"""
Google Cloud Speech-to-Text APIを使用して文字起こしと単語のタイムスタンプを取得する。
"""
client = speech.SpeechClient()
# AudioSegmentを使って音声の長さを確認
audio_segment = AudioSegment.from_mp3(audio_path)
audio_duration_seconds = len(audio_segment) / 1000.0
print(f"音声ファイルの長さ: {audio_duration_seconds:.2f}秒")
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.MP3,
sample_rate_hertz=audio_segment.frame_rate, # MP3ファイルのサンプルレートを使用
language_code=language_code,
enable_word_time_offsets=True, # 単語のタイムスタンプを有効にする
enable_automatic_punctuation=True, # 自動句読点を有効にする (文の区切り検出に重要)
model=model,
)
gcs_uri = None
if audio_duration_seconds > MAX_INLINE_AUDIO_SECONDS:
# 長い音声ファイルはGCSにアップロード
gcs_uri = upload_to_gcs(audio_path, gcs_bucket_name)
audio = speech.RecognitionAudio(uri=gcs_uri)
print("長い音声ファイルのため、Long Running Operationを開始します。完了までお待ちください...")
operation = client.long_running_recognize(config=config, audio=audio)
response = operation.result(timeout=300) # タイムアウトは5分 (300秒)
print("Long Running Operationが完了しました。")
else:
# 短い音声ファイルは直接送信
with open(audio_path, "rb") as audio_file:
content = audio_file.read()
audio = speech.RecognitionAudio(content=content)
print("短い音声ファイルのため、直接APIを呼び出します。")
response = client.recognize(config=config, audio=audio)
print("API呼び出しが完了しました。")
# GCSから一時ファイルを削除
if gcs_uri:
delete_from_gcs(gcs_uri)
if not response.results:
print("文字起こしの結果が見つかりませんでした。")
return None, None
# 各結果(認識された音声セグメント)から単語とタイムスタンプを収集
# 結果は話者の区切りや文脈によって複数に分かれることがあります
all_words = []
full_transcript = ""
for result in response.results:
# 認識されたテキストと単語のリストを取得
alternative = result.alternatives[0]
full_transcript += alternative.transcript
for word_info in alternative.words:
all_words.append({
"word": word_info.word,
"start_time": word_info.start_time.total_seconds(),
"end_time": word_info.end_time.total_seconds()
})
return full_transcript, all_words
def split_audio_into_sentences(input_audio_path, output_directory, output_filename, all_words, blank_duration_ms):
"""
単語のタイムスタンプと句読点情報に基づいて音声を文ごとに分割し、
各文の後に無音を追加して結合し、単一のファイルとして保存する。
"""
if not all_words:
print("単語のタイムスタンプデータがないため、結合できません。")
return
# 出力ディレクトリが存在しない場合は作成
os.makedirs(output_directory, exist_ok=True)
audio = AudioSegment.from_mp3(input_audio_path)
# 結合される最終的なオーディオセグメントを初期化
combined_audio_output = AudioSegment.empty()
sentence_count = 0
current_sentence_words = []
current_sentence_start_time_ms = -1
for i, word_info in enumerate(all_words):
word = word_info["word"]
start_time_ms = math.floor(word_info["start_time"] * 1000)
end_time_ms = math.ceil(word_info["end_time"] * 1000)
# 最初の単語の場合、または新しい文の開始の場合
if not current_sentence_words:
current_sentence_start_time_ms = start_time_ms
current_sentence_words.append(word)
print(f" 検出された単語: {word} (開始: {word_info['start_time']:.2f}, 終了: {word_info['end_time']:.2f})")
# 単語が句読点 (. ? !) で終わるかをチェック
is_sentence_end_by_punctuation = word.endswith(('.', '?', '!'))
# 次の単語がある場合に、現在の単語と次の単語の間に一定以上の無音があるかをチェック
is_sentence_end_by_pause = False
if i + 1 < len(all_words):
next_word_start_time_ms = math.floor(all_words[i+1]["start_time"] * 1000)
if next_word_start_time_ms - end_time_ms > 500: # 500ミリ秒以上の無音を文の区切りとみなす
is_sentence_end_by_pause = True
elif i == len(all_words) - 1: # 最後の単語は常に文の終わり
is_sentence_end_by_punctuation = True
# 文の終わりが検出されたら、音声を切り出し、無音を追加して結合
if is_sentence_end_by_punctuation or is_sentence_end_by_pause:
sentence_count += 1
sentence_text = " ".join(current_sentence_words).strip()
if current_sentence_start_time_ms == -1 or end_time_ms == -1 or current_sentence_start_time_ms >= end_time_ms:
print(f" 警告: 文 '{sentence_text}' のタイムスタンプが無効です。スキップします。")
current_sentence_words = []
current_sentence_start_time_ms = -1
continue
# 終了時刻を0.5秒 (500ミリ秒) 延長する
extended_end_time_ms = min(end_time_ms + 500, len(audio)) # 音声の総時間を超えないように制限
print(f"\n--- 文 {sentence_count}: {sentence_text} ---")
print(f" 文のタイムスタンプ: 開始 {current_sentence_start_time_ms/1000:.2f}秒, 終了 {extended_end_time_ms/1000:.2f}秒 (元の終了: {end_time_ms/1000:.2f}秒)")
try:
sentence_audio_segment = audio[current_sentence_start_time_ms:extended_end_time_ms]
if len(sentence_audio_segment) > 0:
# 文のオーディオセグメントを結合に追加
combined_audio_output += sentence_audio_segment
print(f" 文のオーディオを追加しました。")
# 各文の後にブランクを追加 (最後の文には不要な場合があるが、ここでは一律追加)
if i < len(all_words) - 1: # 最後の文以外にブランクを追加
blank_segment = AudioSegment.silent(duration=blank_duration_ms, frame_rate=audio.frame_rate)
combined_audio_output += blank_segment
print(f" {blank_duration_ms/1000:.1f}秒のブランクを追加しました。")
else:
print(f" 警告: 文 '{sentence_text}' の音声セグメントが空のため、追加をスキップします。")
except Exception as e:
print(f" エラー: 文 '{sentence_text}' の処理中に問題が発生しました: {e}")
# 現在の文の単語リストをリセット
current_sentence_words = []
current_sentence_start_time_ms = -1
# 結合されたオーディオを単一のMP3ファイルとしてエクスポート
final_output_path = os.path.join(output_directory, output_filename)
if len(combined_audio_output) > 0:
try:
combined_audio_output.export(final_output_path, format="mp3")
print(f"\nすべての文とブランクが結合され、'{final_output_path}' として保存されました。")
print(f"総時間: {len(combined_audio_output) / 1000:.2f}秒")
except Exception as e:
print(f"\nエラー: 最終出力ファイルの保存中に問題が発生しました: {e}")
else:
print("\n警告: 結合されたオーディオが空のため、ファイルは保存されませんでした。")
print(f"\n合計 {sentence_count} 個の文が処理されました。")
# --- メイン処理 ---
if __name__ == "__main__":
if not os.path.exists(INPUT_MP3_PATH):
print(f"エラー: 入力ファイル '{INPUT_MP3_PATH}' が見つかりません。パスを確認してください。")
else:
print(f"入力ファイル: {INPUT_MP3_PATH}")
print("Google Cloud Speech-to-Text APIで文字起こしを実行中...")
full_transcript, all_words = get_transcription_with_timestamps(
INPUT_MP3_PATH, LANGUAGE_CODE, SPEECH_MODEL, GCS_BUCKET_NAME
)
if full_transcript and all_words:
print("\n--- 文字起こし結果 ---")
print(full_transcript)
print("\n--- 音声を結合して最終ファイルを作成中 ---")
split_audio_into_sentences(
INPUT_MP3_PATH, OUTPUT_DIR, OUTPUT_MP3_FILENAME, all_words, BLANK_DURATION_MS
)
else:
print("文字起こしが失敗したか、結果が空のため、音声の結合をスキップします。")
作ってみて
今まではプレイ中に一文終わった瞬間に自分で止めればいいのですが、毎回集中しながら聞くのもシンドく、これなら自分でリピーティング学習をする上で便利になりました。
と思っていたのですが、子どもたちにコレを勧めてて見たところ、「一文終わった瞬間に自分で止めればいい」と言われてしまいました。
こういう教材を見たことがないのですが、何故無いんだろう?と思っていたのですが、世間的にはニーズが少ないのかも…。