はじめに
本記事では、Pythonを用いてMP3ファイルの結合と音声区間検出による分割を行う方法を紹介します。
これらの処理は、音声データの編集、分析、活用において非常に役立ちます。具体的な例としては、以下のようなものが挙げられます。
- 複数の音声ファイルから1つの音声ファイルを作成する
- 音声データから無音区間を削除する
- 音声認識や音声合成の精度向上のための前処理
本記事では、PyDubとWebRTC VADというライブラリを用いて、上記の処理を実現するコードを紹介します。
今回のスクリプトは音声合成モデルを作成する際の学習データ作成に利用しています。音声合成モデルは以下の記事で紹介しています。
想定読者
- 音声データ処理に興味のある方
- Pythonの基本的な知識をお持ちの方
- MP3ファイルの結合や音声区間の分割処理をしたい方
実行環境
- Python 3.7以上
- pip: pydub, webrtcvad
- Google Colab、WSL2
コード
コードの説明
1. MP3ファイル結合(merge_speech.ipynb)
# 最初のファイルを読み込む
combined = AudioSegment.from_mp3(os.path.join(subfolder_path, mp3_files[0]))
# 残りのファイルを結合
for mp3_file in mp3_files[1:]:
audio = AudioSegment.from_mp3(os.path.join(subfolder_path, mp3_file))
combined += audio
combined.export(output_file, format='mp3')
pydubのAudioSegment機能を使い、結合したい順に音声ファイルを追加していき、一つのmp3ファイルとして出力する。
mp3ファイルが格納されているフォルダや結合したい順に合わせて、コードのソート部分を変更する。
今回はdata/XXX/01_〇〇.mp3とdata/XXX/02_〇〇.mp3のようなデータをまとめてXXX.mp3としてmerge_mp3に出力するようなコードを作成している。
# dataフォルダのパスを指定
data_folder = 'data'
# 結合後のファイルを保存するフォルダを指定
output_folder = 'merge_mp3'
os.makedirs(output_folder, exist_ok=True)
# dataフォルダ内のサブフォルダを取得
subfolders = [f for f in os.listdir(data_folder) if os.path.isdir(os.path.join(data_folder, f))]
for subfolder in subfolders:
subfolder_path = os.path.join(data_folder, subfolder)
# mp3ファイルを取得し、ソート
mp3_files = sorted([f for f in os.listdir(subfolder_path) if f.endswith('.mp3') and (f.startswith('01') or f.startswith('02'))])
2. 音声区間検出(split_speech.ipynb)
このコードは生成AIを使って、ベースを作成しております。
import collections
import contextlib
import sys
import wave
import pathlib
import webrtcvad
def read_audio(path):
"""MP3またはWAVファイルを読み込みます。
パスを受け取り、(PCMオーディオデータ, サンプルレート)を返します。
"""
import pydub
audio = pydub.AudioSegment.from_file(path)
# モノラルに変換
if audio.channels > 1:
audio = audio.set_channels(1)
# サンプルレートを確認
sample_rate = audio.frame_rate
assert sample_rate in (8000, 16000, 32000, 48000), f"サポートされていないサンプルレート: {sample_rate}"
# PCMデータを取得
pcm_data = audio.raw_data
return pcm_data, sample_rate
def read_wave(path):
"""WAVファイルを読み込みます。
パスを受け取り、(PCMオーディオデータ, サンプルレート)を返します。
"""
with contextlib.closing(wave.open(path, 'rb')) as wf:
num_channels = wf.getnchannels()
assert num_channels == 1
sample_width = wf.getsampwidth()
assert sample_width == 2
sample_rate = wf.getframerate()
assert sample_rate in (8000, 16000, 32000, 48000)
pcm_data = wf.readframes(wf.getnframes())
return pcm_data, sample_rate
def write_wave(path, audio, sample_rate):
"""WAVファイルを書き込みます。
パス、PCMオーディオデータ、サンプルレートを受け取ります。
"""
with contextlib.closing(wave.open(path, 'wb')) as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sample_rate)
wf.writeframes(audio)
class Frame(object):
"""オーディオデータの「フレーム」を表現するクラス"""
def __init__(self, bytes, timestamp, duration):
self.bytes = bytes
self.timestamp = timestamp
self.duration = duration
def frame_generator(frame_duration_ms, audio, sample_rate):
"""PCMオーディオデータからオーディオフレームを生成します。
フレーム長(ミリ秒)、PCMデータ、サンプルレートを受け取り、
指定された長さのフレームを生成します。
"""
n = int(sample_rate * (frame_duration_ms / 1000.0) * 2)
offset = 0
timestamp = 0.0
duration = (float(n) / sample_rate) / 2.0
while offset + n < len(audio):
yield Frame(audio[offset:offset + n], timestamp, duration)
timestamp += duration
offset += n
def vad_collector(sample_rate: int, frame_duration_ms: int,
padding_duration_ms: int, vad: webrtcvad.Vad, frames: list[Frame],
voice_trigger_on_thres: float=0.9, voice_trigger_off_thres: float=0.1) -> list[dict]:
"""音声非音声セグメント処理
Args:
sample_rate (int): 単位時間あたりのサンプル数[Hz]
frame_duration_ms (int): フレーム長
padding_duration_ms (int): ガード長
vad (webrtcvad.Vad): VADオブジェクト
frames (list[Frame]): フレーム分割された音声データ
voice_trigger_on_thres (float, optional): 音声セグメント開始と判断する閾値。デフォルト値は0.9
voice_trigger_off_thres (float, optional): 音声セグメント終了と判断する閾値。デフォルト値は0.1
Returns:
list[dict]: セグメント結果
"""
# ガードするフレーム数
num_padding_frames = int(padding_duration_ms / frame_duration_ms)
# フレームバッファ
frame_buffer = []
# 音声検出トリガーの状態
triggered = False
voiced_frames = []
vu_segments = []
for frame in frames:
is_speech = vad.is_speech(frame.bytes, sample_rate)
frame_buffer.append((frame, is_speech))
# 非音声セグメントの処理
if not triggered:
# 過去フレームの音声判定数を計算
num_voiced = len([f for f, speech in frame_buffer[-num_padding_frames:] if speech])
# 音声セグメント開始の判定
if num_voiced > voice_trigger_on_thres * num_padding_frames:
triggered = True
# 非音声セグメントの保存
audio_data = b''.join([f.bytes for f, _ in frame_buffer[:-num_padding_frames]])
vu_segments.append({"vad": 0, "audio_size": len(audio_data), "audio_data": audio_data})
# 音声フレームの保持
for f, _ in frame_buffer[-num_padding_frames:]:
voiced_frames.append(f)
frame_buffer = []
# 音声セグメントの処理
else:
voiced_frames.append(frame)
# 過去フレームの非音声判定数を計算
num_unvoiced = len([f for f, speech in frame_buffer[-num_padding_frames:] if not speech])
# 音声セグメント終了の判定
if num_unvoiced > (1 - voice_trigger_off_thres) * num_padding_frames:
triggered = False
# 音声セグメントの保存
audio_data = b''.join([f.bytes for f in voiced_frames])
vu_segments.append({"vad": 1, "audio_size": len(audio_data), "audio_data": audio_data})
voiced_frames = []
frame_buffer = []
# 最終セグメントの処理
if triggered:
audio_data = b''.join([f.bytes for f in voiced_frames])
vu_segments.append({"vad": 1, "audio_size": len(audio_data), "audio_data": audio_data})
else:
audio_data = b''.join([f.bytes for f, _ in frame_buffer])
vu_segments.append({"vad": 0, "audio_size": len(audio_data), "audio_data": audio_data})
return vu_segments
3. 音声区間の分割(split_speech.ipynb)
音声区間検出を使って、音声区間と非音声区間をセグメント分割し、保存するコード。
音声区間を明確に取得したい場合はvad_collectorのvoice_trigger_off_thresの閾値を高くするとよい
def split_segments(input_dir, filename, vad_aggressiveness: int=3, frame_duration_ms: int=30, padding_duration_ms: int=300):
"""音声区間の分割
Args:
input_dir (str): 音声ファイルのあるディレクトリ
filename (str): 音声ファイル名
vad_aggressiveness (int, optional): VADの積極性. Defaults to 3.
frame_duration_ms (int, optional): フレーム長. Defaults to 30.
padding_duration_ms (int, optional): ガード長. Defaults to 300.
"""
# ファイル名から拡張子を取り除く
basename = os.path.splitext(filename)[0]
# ファイルパスを取得
filepath = os.path.join(input_dir, filename)
# 音声データを読み込み
audio_data, sample_rate = read_audio(filepath)
# VADのインスタンスを作成
vad = webrtcvad.Vad(vad_aggressiveness)
# 音声データからフレームを生成
frames = frame_generator(frame_duration_ms, audio_data, sample_rate)
# 音声非音声セグメント処理
segments = vad_collector(
sample_rate, frame_duration_ms, padding_duration_ms,
vad, frames
)
# ファイルの保存
for i, segment in enumerate(segments):
if segment["vad"]:
# 音声セグメントの場合
output_filename = f"{basename}_vad_{i}.wav"
output_filepath = os.path.join(input_dir, output_filename)
write_wave(output_filepath, segment["audio_data"], sample_rate)
print(f"音声区間: {output_filepath}")
参考情報
- PyDub: https://pypi.org/project/pydub/
- WebRTC VAD: https://pypi.org/project/webrtcvad/
最後に
本記事では、Pythonを用いてMP3ファイルの結合と音声区間検出による分割を行う方法を紹介しました。これらの処理を用いることで、音声データの編集、活用が可能になります。
ぜひ、本記事を参考にしてみてください。