「Pythonで長い会議を見える化」シリーズ、先日の動向記事で得た知識をもとに、全面リライトしてみました。
以下のような修正を加えました。
- 特徴量として、フーリエ変換した周波数成分を使っていたが、人間が聴く音の特徴量として基本的に利用されているMFCC(メル周波数ケプストラム係数)を採用
- 無音有音のクラスタリングであるVAD(Voice Activity Detector)と、有音部の話者ダイアリゼーションを区別して、段階的に処理することに変更
- 全体的にクラスとメソッドに書き換えて、汎用性・再利用性を向上
本来、話者ダイアリゼーションには、i-vecterやx-vectorといった特徴量を使うのですが、複雑になるため、それらの基礎となるMFCCを使っています。結果として12次元のMFCCでも、十分な結果を得られることができました。
なお、MFCCについては、前回の記事でもリンクしています、「様々な音響特徴量 それぞれの使い方や意味を教えて下さい」がとてもわかりやすいです。
今回、コードが長くなってきましたので、最初に結果を示します。
以下がMFCCの生データです。MFCCは低次元ベクトルが強く出る傾向がありますので、各行の下端が赤っぽいだけで、全体的に青っぽいグラフになります。
以下がMFCCデータを、kmeansでクラスタリングしてVADした結果です。VADについてはほぼ完璧な精度になっています。なお、MFCC低次元ベクトルが強い正規化前のデータの方が、VADの精度が良いようです。このことは、次でも言及します。
以下がMFCCを正規化したデータです。青っぽさが消えて全体的に数値が分散しています。無音部は黄色っぽくなっており、高次元MFCCでは有音部よりも値が大きくなるという、面白い特徴があります。なお完全にパワー0の入力だとMFCCも0となり、このような特徴は現れません。すなわち、無音有音の区別には、低次元MFCCに着目すると良さそうです。実際に、低次元MFCCの数値が大きく出る正規化前のデータを使った方が、VADの精度がよくなります。
以下が正規化後のデータをもとに、有音部を5秒単位でSpectralClusteringにより5クラスに分類した、話者ダイアリゼーション結果です。結果をもとに短時間の凸凹はならしたものにしています。とても、正解データに近い分類ができています。「前々回」の最後のグラフと比較しても、正解データにかなり近づく方向に改善されていることがわかるでしょう。なお、有音だけに限定して話者ダイアリゼーションをしようとすると、低次元だけでなく高次元のMFCCも使う必要があるようです。実際に、低次元が強く出てしまう正規化前だと精度が少し悪くなります。
全体的に、これまで数万次元のフーリエ変換データで分析していたのに対して、12次元のMFCCで同等以上の結果が得られました。特徴量としてのMFCCが、非常に優秀であることがわかります。おかげで、処理時間がかなり短縮されました。
ただし、現状のコードだとクラス分類数の自動算出ができません。また、これ以上の多クラス分類を試みようとすると、会話時間が長い人について間違ったクラス分割が行われる傾向があります。そのあたりを、今後改善していきたいと思います。単純にコサイン類似度を使ったくらいでは改善せず、MFCCの限界かもしれません。MFCCは、無音、人間の声、楽器など、大きく音色が異なるものを区別するにはよいですが、人間の声同士を区別するのは、ある程度は可能ですがそれほど得意では無い、という感じです。
以下が今回のコードです。コサイン類似度版の変更箇所もコメントで埋め込んでいます。コサイン類似度の効果は感じられないため、封印しています。
mainの呼び出しサンプルを実行すると、今回の記事のグラフを順番に表示するようになっています。このように再利用しやすいコードになっています。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import librosa
import soundfile
from sklearn import preprocessing, cluster
# 全てのメソッドは呼び出しオブジェクトを変更しない非破壊型
# 返り値がSpeech型であるため、メソッドチェーンで呼び出し可能
class Speech(pd.DataFrame):
@property
def mfcc(self): # Mel-Frequency Cepstrum Coefficients
return self.loc[:, 'mfcc_1':'mfcc_12'].copy()
@mfcc.setter
def mfcc(self, v):
self.loc[:, 'mfcc_1':'mfcc_12'] = v
@property
def window(self):
return self.time.iloc[1] - self.time.iloc[0]
@property
def duration(self):
return self.time.iloc[-1] + self.window
def copy(self):
return Speech(pd.DataFrame(self).copy())
# wavファイルを読み込み、window秒間隔でmfcc化
@classmethod
def read(cls, filename, *, window=1):
# 入力はwavファイルであること(mp3等は入力前にツールで変換する)
# Pythonでmp3を直接扱いにはpydubとffmpegが必要
# librosaのloadはフレームレート変換が低速であるため使わない
x, fs = soundfile.read(filename, always_2d=True)
x = x.mean(axis=1) # 2CHの平均をとってモノラル化する
df = cls.convert_mfcc(x, fs, window)
return Speech(df)
# 音声rawデータをもとに、window秒間隔でmfcc化
@classmethod
def convert_mfcc(cls, x, fs, window=1):
n_fft = int(window * fs)
mfcc = librosa.feature.mfcc(x, sr=fs, n_mfcc=20, n_fft=n_fft,
hop_length=n_fft, center=False)
mfcc = mfcc[1:13] # 1〜12次元のみ抽出
t = np.arange(0, window * mfcc.shape[1], window)
assert len(t) == mfcc.shape[1]
df = pd.DataFrame(np.concatenate([[t], mfcc]).T)
df = df.add_prefix('mfcc_').rename(columns={'mfcc_0': 'time'})
df['label'] = 1
return Speech(df)
# mfccデータを正規化、正規化手段はオプション選択可能
def normalized(self, scaler=preprocessing.MinMaxScaler()):
res = self.copy()
res.mfcc = scaler.fit_transform(res.mfcc)
return res
# 有音無音のクラスタリング
def vad(self): # Voice Activity Detector
res = self.copy()
clustering = cluster.KMeans(n_clusters=2).fit(res.mfcc)
res['label'] = clustering.labels_
# mfccの低周波数側の大きさを比較して、無音側を判断し、クラスタ0にする
df_mean = res.groupby('label').mean()
sum_mean0, sum_mean1 = 0, 0
criteria = 1.1
for coefficient in res.mfcc.columns:
sum_mean0 += df_mean.loc[0, coefficient]
sum_mean1 += df_mean.loc[1, coefficient]
if sum_mean0 > sum_mean1 * criteria:
res.label = 1 - res.label
break
elif sum_mean1 > sum_mean0 * criteria:
break
return res
# ダイアリゼーション: window秒の幅のmfccデータを使って話者分離する
# 有音部がactive_rate以上の区間のみ対象
def diarized(self, n_clusters=7, window=5, active_rate=0.5):
res = self.copy()
# window幅で縦に平均値を取る
# 無音部分はいったん0として、後で割り戻すことで平均計算から外す
res['window'] = (res.index * res.window / window).astype(int)
res0 = res.copy()
res0.mfcc = res0.mfcc.mask(res0.label == 0, 0)
res_win = Speech(res0.groupby('window').mean())
res_win.time = res_win.time - res_win.time.loc[0]
# active_rateよりも有音比率が大きい部分だけ抽出
res_active = Speech(res_win[res_win.label >= active_rate])
res_active.mfcc = res_active.mfcc.div(res_active.label, axis=0)
# クラスタリング
res_active.label = cluster.SpectralClustering(n_clusters=n_clusters,
affinity='nearest_neighbors').fit_predict(res_active.mfcc) + 1
# クラスタリング(コサイン類似度版)
#res_active.label = cluster.SpectralClustering(n_clusters=n_clusters,
# affinity='precomputed').fit_predict(res_active.cos_sim()) + 1
# 得られた結果と無音部分をマージ
res_win.label = res_active.label
res_win.label = res_win.label.fillna(0).astype(int)
# 前後が同じクラスタで1フレームだけ別なら、補完して同じものとみなす
for i in range(1, len(res_win) - 1):
if (res_win.loc[i - 1, 'label'] == res_win.loc[i + 1, 'label']):
res_win.loc[i, 'label'] = res_win.loc[i - 1, 'label']
# 得られた結果をwindow集約前のデータにマージ
res = res.drop('label', axis=1).merge(res_win.reset_index()
[['label', 'window']]).drop('window', axis=1)
return Speech(res)
# mfccデータに対するコサイン類似度行列を計算
def cos_sim(self):
mfcc = self.copy().mfcc.to_numpy()
d = mfcc @ mfcc.T
norm = (mfcc * mfcc).sum(axis=1, keepdims=True) ** .5
return d / norm / norm.T
# グラフ表示、chart='hoge' または chart=['hoge', 'poi', ...] で形式指定
def show(self, *, width=300, chart=[]):
res = self.copy()
if type(chart) is str:
chart = [chart]
# グラフ描画を複数行にするための幅と行数を求める
plt_num = int(res.duration / width - 0.01) + 1
# 描画エリアの準備
fig, ax = plt.subplots(plt_num)
if plt_num == 1:
ax = (ax,) # グラフ1つの時はaxがスカラーなので調整
for i in range(plt_num):
# 描画範囲のデータフレームを取得(描画エリア内部+前後の行)
head = res[res.time <= width * i]
body = res[(width * i < res.time) &
(res.time < width * (i + 1))]
tail = res[width * (i + 1) <= res.time]
df = pd.concat([head.tail(1), body, tail.head(1)]).copy()
df_sep = pd.concat([df[df.label.diff() != 0], df.tail(1)])
# データを描画 C.shapeよりもlen(X),len(Y)は1つずつ大きくする
if 'mfcc' in chart:
X = df.time - width * i
Y = np.arange(13)
C = Speech(df[:-1]).mfcc.T
ax[i].pcolormesh(X, Y, C, shading='flat', cmap='jet')
# クラスタを描画
if 'label' in chart:
X = df_sep.time - width * i
Y = np.arange(2)
C = df_sep[:-1].label.to_numpy().reshape(1, -1)
vmin, vmax = 0, res.label.max()
ax[i].pcolormesh(X, Y, C, shading='flat', cmap='rainbow',
vmin=vmin, vmax=vmax)
# クラスタ境界を描画
if 'separator' in chart:
x = df_sep.time - width * i
ymax = ax[i].get_ylim()[1]
ax[i].vlines(x, 0, ymax, colors='black')
# 描画領域の整形
ax[i].set_xlim(0, width)
ax[i].tick_params(labelleft=False)
ax[i].set_ylabel(f'{width * i // 60}') # 分で表示
if i == 0: # 最初のグラフだけ表題をつけて、全体の表題に見せる
ax[i].set_title('Meeting spectrogram')
ax[i].set_ylabel(f'{width * i // 60}(m)')
if i == plt_num - 1: # 最後のグラフだけx軸ラベルをつける
ax[i].set_xlabel('Time(s)')
else:
ax[i].tick_params(labelbottom=False)
plt.show()
# 呼び出しサンプル
if __name__ == '__main__':
df = Speech.read('meetings/meeting1.wav')
df.show(chart='mfcc')
df = df.vad()
df.show(chart='label')
df = df.normalized()
df.show(chart='mfcc')
df = df.diarized(n_clusters=5)
df.show(chart=['label', 'separator'])