機械学習と特徴量
機械学習の実装にはPythonが使われることが非常に多いと思います。
その際、学習に用いる特徴量の抽出も Python 向けの便利なライブラリに大きく依存しがちです。
ここで問題になるのが、学習後に ONNX などでモデルをエクスポートし、別の環境で推論を行いたい場合です。推論時にモデルへ入力する特徴量は、学習時と同じ結果になっている必要があります。しかし特徴量の内容によっては、「同じ特徴量を抽出するつもり」で他言語のライブラリを使っても、計算の細かな違いによって値が再現できないことがあります。
私自身も実務の中でこの再現性に悩まされたことがありますが、最近ではAIコーディングツールの発展により、この特徴量処理を他言語で再現するための工数をかなり削減しやすくなってきました。
本記事では、その一例としてPythonのlibrosaとMFCCを題材に、特徴量計算を他環境で再現した実装を紹介します。
librosaとMFCC
Pythonの音響解析および信号処理のライブラリとしてlibrosaがあります。
https://librosa.org/
今回は音声の機械学習でよく使われるMFCC(メル周波数ケプストラム係数)の計算関数に着目してみます。
MFCCとは端的にいえば 「人の耳の聞こえ方に近い形で音の特徴を数値化したもの」 であり、音声認識などの分野でよく使われます。
https://librosa.org/doc/latest/generated/librosa.feature.mfcc.html#librosa.feature.mfcc
https://librosa.org/doc/latest/_modules/librosa/feature/spectral.html#mfcc
最も単純な利用方法は下記のようになります。
import librosa
y, sr_load = librosa.load('./hoge.wav', sr=16000)
mfccs = librosa.feature.mfcc(y, sr=sr_load)
librosa自体は優れたライブラリですが、その仕様に強く依存した特徴量計算を行うと、他言語への移植時に数値再現で苦労することがあります。
そこで今回はPython⇔C#の互換性があるMFCC計算を目指したいと思います。
MFCCの計算
まずはMFCCの計算方法についてみていきます。
大きくわけると下記の5段階から成ります。
- STFT(短時間フーリエ変換)
- パワースペクトログラム化
- メルフィルタバンク適用
- 対数圧縮(dB化)
- DCT(離散コサイン変換)→ MFCC
librosa版では内部的にはlibrosa.feature.melspectrogramを呼んでいます(事前に計算して、MFCC関数に渡すことも可能です)。
これらのステップに沿って実装していきます。
備考
本記事のPython実装およびC#実装は、librosa.feature.mfccの挙動を参考にしています。
またこの記事では分かりやすさ優先でFFT実装は素朴なものを使っています。
今回のような課題において、AIコーディングの相性がよく、強力な支援手段となります。実際に本記事の一部はAIコーディングによるものです。最終的な設計と検証は筆者が行っています。
実装
まずは元のlibrosaのMFCCと計算結果を可能な限り一致させることを狙ったPythonのコードを用意します。
これは必ずしも必要でないかもしれませんが、各処理の対応を確認しやすくするために実施します。
また今回は省略しますが、前述の各ステップ毎の再現確認を行う上でもわかりやすくて便利です。
import numpy as np
from librosa.util import frame
import scipy.fftpack
_HTK_MEL_FILTERS = {}
_HTK_DCT_MATRICES = {}
def _get_htk_mel_filterbank(sr, n_fft, n_mels):
key = f"{sr}_{n_fft}_{n_mels}"
if key not in _HTK_MEL_FILTERS:
base_freq = 2595.0
def hz_to_mel(freq):
return base_freq * np.log10(1.0 + freq / 700.0)
def mel_to_hz(mel):
return 700.0 * (10.0**(mel / base_freq) - 1.0)
f_max = sr / 2.0
mel_max = hz_to_mel(f_max)
mel_points = np.linspace(0, mel_max, n_mels + 2)
fft_freqs = np.fft.rfftfreq(n_fft, d=1.0/sr)
mel_f = mel_to_hz(mel_points)
fdiff = np.diff(mel_f)
ramps = np.subtract.outer(mel_f, fft_freqs)
filters = np.zeros((n_mels, len(fft_freqs)))
for i in range(n_mels):
lower = -ramps[i] / fdiff[i]
upper = ramps[i+2] / fdiff[i+1]
filters[i] = np.maximum(0, np.minimum(lower, upper))
enorm = 2.0 / (mel_f[2:n_mels+2] - mel_f[:n_mels])
filters *= enorm[:, np.newaxis]
_HTK_MEL_FILTERS[key] = filters
return _HTK_MEL_FILTERS[key]
def _get_htk_dct_matrix(n_mfcc, n_mels):
key = f"{n_mfcc}_{n_mels}"
if key not in _HTK_DCT_MATRICES:
full_dct_matrix = scipy.fftpack.dct(np.eye(n_mels), norm='ortho', axis=0)
_HTK_DCT_MATRICES[key] = full_dct_matrix[:n_mfcc]
return _HTK_DCT_MATRICES[key]
def mimic_mfcc(audio, sample_rate, n_mfcc=13, hl=0.01, fl=0.025,
n_mels=80):
y = audio.astype(np.float32)
sr = sample_rate
n_fft = int(fl * sr)
hop_length = int(hl * sr)
# 1. STFT (Short-Time Fourier Transform)
y_padded = np.pad(y, int(n_fft // 2), mode='reflect')
frames = frame(y_padded, frame_length=n_fft, hop_length=hop_length)
window = np.hanning(n_fft)
stft = np.fft.rfft(frames * window[:, np.newaxis], axis=0)
# 2. power spectrum
power_spectrum = np.abs(stft)**2
# 3. mel spectrogram - get filterbank from cache
filters = _get_htk_mel_filterbank(sr, n_fft, n_mels)
mel_spectrogram = np.dot(filters, power_spectrum)
# 4. power_to_db
ref = 1.0
amin = 1e-10
top_db = 80.0
log_spec = 10.0 * np.log10(np.maximum(amin, mel_spectrogram))
log_spec -= 10.0 * np.log10(np.maximum(amin, ref))
log_spec = np.maximum(log_spec, log_spec.max() - top_db)
# 5. DCT (discrete cosine transform) - get DCT matrix from cache
dct_matrix = _get_htk_dct_matrix(n_mfcc, n_mels)
mfccs = np.dot(dct_matrix, log_spec)
return mfccs
再現性確認(librosa vs. 再現実装)
では、librosaの関数と計算結果を比較していきます。
比較対象は下記です。
mfcc_librosa = librosa.feature.mfcc(
y=audio, sr=16000, n_mfcc=13, n_fft=400, hop_length=160, n_mels=80, htk=True
)
結果は下記の通りです。
相関はnp.corrcoefで算出しています。
5秒間の2本のサイン波を少しずつシフトさせながら足し合わせたテスト用信号100回分の試行に基づきます。
Mimic Mean: -32.97
Librosa Mean: -33.03
Correlation: 0.998776
完全一致ではありませんが、十分再現できていると思います。
C#実装
ここからは上記をベースにC#で実装を起こしていきます。
基本的にはNumpyとlibrosaのframe関数の再現を試みることになります。
public class MfccExtractor
{
// n_fft = int(fl * sr) - 400
private readonly int n_fft = 400;
// hop_length = int(hl * sr) - 160
private readonly int hop_length = 160;
// n_mels
private readonly int num_mel_filters = 80;
// n_mfcc
private readonly int n_mfcc = 13;
// sample_rate
private readonly int sample_rate = 16000;
private float[] hann_window;
private SparseFilter[] sparse_mel_filters;
private float[,] dct_matrix;
public MfccExtractor()
{
// 0. Precompute window, mel filterbank, DCT matrix
ApplyHanning(); // np.hanning(n_fft)
ComputeMelFilterbank(); // _get_htk_mel_filterbank(sr, n_fft, n_mels)
ComputeDCTMatrix(); // _get_htk_dct_matrix(n_mfcc, n_mels)
}
public float[,] ComputeMfcc(float[] audio)
{
// 1. STFT part: reflect-padding + framing + Hann window + rFFT
int padWidth = n_fft / 2;
float[] padded = PadReflect(audio, padWidth);
float[,] frames = FrameAudio(padded, n_fft, hop_length);
// 2. Power spectrum
float[,] powerSpectrum = ComputePowerSpectrum(frames);
// 3. Mel filterbank
float[,] melSpectrogram = ApplyMelFilters(powerSpectrum);
// 4. Power to dB
float[,] logMel = PowerToDb(melSpectrogram);
// 5. DCT -> MFCC
float[,] mfccs = ApplyDCT(logMel);
return mfccs;
}
// np.hanning(n_fft)
public void ApplyHanning()
{
hann_window = new float[n_fft];
// w[n] = 0.5 * (1 - cos(2*pi*n / (N-1)))
for (int i = 0; i < n_fft; i++)
{
hann_window[i] = 0.5f * (1.0f - (float)Math.Cos(2.0 * Math.PI * i / (n_fft - 1)));
}
}
public struct SparseFilter
{
public int count;
public int[] indices;
public float[] values;
}
// _get_htk_mel_filterbank(sr, n_fft, n_mels)
private void ComputeMelFilterbank()
{
// HTK-style mel filterbank
int fft_bins = n_fft / 2 + 1;
sparse_mel_filters = new SparseFilter[num_mel_filters];
float base_freq = 2595.0f;
// f_max = sr / 2.0
float f_max = sample_rate / 2.0f;
// mel_max = hz_to_mel(f_max)
float mel_max = base_freq * (float)Math.Log10(1.0 + f_max / 700.0f);
// mel_points = np.linspace(0, mel_max, n_mels + 2)
float[] mel_points = new float[num_mel_filters + 2];
for (int i = 0; i < num_mel_filters + 2; i++)
{
mel_points[i] = mel_max * i / (num_mel_filters + 1);
}
// mel_f = mel_to_hz(mel_points)
float[] mel_f = new float[num_mel_filters + 2];
for (int i = 0; i < num_mel_filters + 2; i++)
{
mel_f[i] = 700.0f * ((float)Math.Pow(10.0, mel_points[i] / base_freq) - 1.0f);
}
// fft_freqs = np.fft.rfftfreq(n_fft, d=1.0/sr)
float[] fft_freqs = new float[fft_bins];
for (int i = 0; i < fft_bins; i++)
{
fft_freqs[i] = i * sample_rate / (float)n_fft;
}
// fdiff = np.diff(mel_f)
float[] fdiff = new float[num_mel_filters + 1];
for (int i = 0; i < num_mel_filters + 1; i++)
{
fdiff[i] = mel_f[i + 1] - mel_f[i];
}
for (int mel = 0; mel < num_mel_filters; mel++)
{
float[] lower = new float[fft_bins];
float[] upper = new float[fft_bins];
for (int j = 0; j < fft_bins; j++)
{
lower[j] = -(mel_f[mel] - fft_freqs[j]) / fdiff[mel];
upper[j] = (mel_f[mel + 2] - fft_freqs[j]) / fdiff[mel + 1];
}
List<int> indices = new List<int>();
List<float> values = new List<float>();
// enorm = 2.0 / (mel_f[2:n_mels+2] - mel_f[:n_mels])
// filters *= enorm[:, np.newaxis]
float enorm = 2.0f / (mel_f[mel + 2] - mel_f[mel]);
for (int j = 0; j < fft_bins; j++)
{
float weight = Math.Max(0, Math.Min(lower[j], upper[j]));
weight *= enorm;
if (weight > 0)
{
indices.Add(j);
values.Add(weight);
}
}
sparse_mel_filters[mel].count = indices.Count;
sparse_mel_filters[mel].indices = indices.ToArray();
sparse_mel_filters[mel].values = values.ToArray();
}
}
// dct_matrix = scipy.fftpack.dct(np.eye(n_mels), norm='ortho', axis=0)[:n_mfcc]
private void ComputeDCTMatrix()
{
dct_matrix = new float[n_mfcc, num_mel_filters];
for (int k = 0; k < n_mfcc; k++)
{
for (int n = 0; n < num_mel_filters; n++)
{
// Base DCT-II formula
dct_matrix[k, n] = (float)Math.Cos(Math.PI * k * (n + 0.5) / num_mel_filters);
// Orthonormal scaling (norm='ortho')
if (k == 0)
dct_matrix[k, n] *= (float)Math.Sqrt(1.0 / num_mel_filters);
else
dct_matrix[k, n] *= (float)Math.Sqrt(2.0 / num_mel_filters);
}
}
}
// np.pad(y, pad_width, mode='reflect')
private float[] PadReflect(float[] signal, int padWidth)
{
int newLength = signal.Length + 2 * padWidth;
float[] padded = new float[newLength];
Array.Copy(signal, 0, padded, padWidth, signal.Length);
// Left reflect
for (int i = 0; i < padWidth; i++)
{
padded[padWidth - 1 - i] = signal[i + 1];
}
// Right reflect
for (int i = 0; i < padWidth; i++)
{
padded[padWidth + signal.Length + i] = signal[signal.Length - 2 - i];
}
return padded;
}
// librosa.util.frame(y_padded, frame_length=n_fft, hop_length=hop_length)
private float[,] FrameAudio(float[] audio, int frameLength, int hopLength)
{
int numFrames = 1 + (audio.Length - frameLength) / hopLength;
if (numFrames < 1) numFrames = 1;
float[,] frames = new float[frameLength, numFrames];
for (int frame = 0; frame < numFrames; frame++)
{
int start = frame * hopLength;
for (int i = 0; i < frameLength; i++)
{
if (start + i < audio.Length)
frames[i, frame] = audio[start + i];
else
frames[i, frame] = 0f;
}
}
return frames;
}
// window = np.hanning(n_fft)
// stft = np.fft.rfft(frames * window[:, np.newaxis], axis=0)
// power_spectrum = np.abs(stft)**2
private float[,] ComputePowerSpectrum(float[,] frames)
{
int frameLength = frames.GetLength(0);
int numFrames = frames.GetLength(1);
int fftSize = frameLength / 2 + 1;
float[,] powerSpectrum = new float[fftSize, numFrames];
for (int frame = 0; frame < numFrames; frame++)
{
float[] windowedFrame = new float[frameLength];
// frames * hann_window
for (int i = 0; i < frameLength; i++)
{
windowedFrame[i] = frames[i, frame] * hann_window[i];
}
float[] power = ComputeDFT(windowedFrame);
for (int i = 0; i < fftSize; i++)
{
powerSpectrum[i, frame] = power[i];
}
}
return powerSpectrum;
}
// np.fft.rfft(windowedFrame) , np.abs(...)**2
private float[] ComputeDFT(float[] signal)
{
int n = signal.Length;
int resultSize = n / 2 + 1;
float[] result = new float[resultSize];
Complex[] twiddles = new Complex[n];
for (int k = 0; k < n; k++)
{
double angle = -2.0 * Math.PI * k / n;
twiddles[k] = new Complex(Math.Cos(angle), Math.Sin(angle));
}
for (int k = 0; k < resultSize; k++)
{
Complex sum = Complex.Zero;
for (int n_idx = 0; n_idx < n; n_idx++)
{
int twiddle_idx = (k * n_idx) % n;
sum += signal[n_idx] * twiddles[twiddle_idx];
}
result[k] = (float)(sum.Real * sum.Real + sum.Imaginary * sum.Imaginary);
}
return result;
}
// mel_spectrogram = np.dot(filters, power_spectrum)
private float[,] ApplyMelFilters(float[,] powerSpectrum)
{
int fftSize = powerSpectrum.GetLength(0);
int numFrames = powerSpectrum.GetLength(1);
float[,] melSpectrogram = new float[num_mel_filters, numFrames];
for (int mel = 0; mel < num_mel_filters; mel++)
{
var filter = sparse_mel_filters[mel];
for (int frame = 0; frame < numFrames; frame++)
{
float sum = 0f;
for (int i = 0; i < filter.count; i++)
{
int freqIdx = filter.indices[i];
float filterVal = filter.values[i];
sum += filterVal * powerSpectrum[freqIdx, frame];
}
melSpectrogram[mel, frame] = sum;
}
}
return melSpectrogram;
}
// power_to_db with ref=1.0, amin=1e-10, top_db=80.0
private float[,] PowerToDb(float[,] melSpectrogram)
{
int melBins = melSpectrogram.GetLength(0);
int numFrames = melSpectrogram.GetLength(1);
float[,] logMel = new float[melBins, numFrames];
const float amin = 1e-10f;
const float top_db = 80.0f;
float max_value = float.MinValue;
for (int mel = 0; mel < melBins; mel++)
{
for (int frame = 0; frame < numFrames; frame++)
{
float value = Math.Max(amin, melSpectrogram[mel, frame]);
float logValue = 10.0f * (float)Math.Log10(value);
logMel[mel, frame] = logValue;
if (logValue > max_value)
max_value = logValue;
}
}
// log_spec = np.maximum(log_spec, log_spec.max() - top_db)
float floor_value = max_value - top_db;
for (int mel = 0; mel < melBins; mel++)
{
for (int frame = 0; frame < numFrames; frame++)
{
if (logMel[mel, frame] < floor_value)
logMel[mel, frame] = floor_value;
}
}
return logMel;
}
// mfccs = np.dot(dct_matrix, log_spec)
private float[,] ApplyDCT(float[,] logMel)
{
int melBins = logMel.GetLength(0);
int numFrames = logMel.GetLength(1);
float[,] mfccs = new float[n_mfcc, numFrames];
for (int mfcc = 0; mfcc < n_mfcc; mfcc++)
{
for (int frame = 0; frame < numFrames; frame++)
{
float sum = 0f;
for (int mel = 0; mel < melBins; mel++)
{
sum += dct_matrix[mfcc, mel] * logMel[mel, frame];
}
mfccs[mfcc, frame] = sum;
}
}
return mfccs;
}
}
再現性確認(Python vs. C#)
平均、標準偏差、最小値、最大値と相関係数をみてみます。
計算対象は同じ5秒の音声ファイルです。
Python MFCC:
Mean: -7.951052
Std: 14.636144
Min: -65.279686
Max: 15.754959
C# MFCC:
Mean: -7.951052
Std: 14.636150
Min: -65.279720
Max: 15.754960
Correlation: 0.999999999999128
ほぼ一致させられていることが確認できているかと思います。
おわりに
機械学習モデルをPython環境で開発すること自体はとても自然ですが、そのままライブラリに任せきりの特徴量設計をしてしまうと、ONNX化や別言語への展開の段階で同じ特徴量が再現できないという壁にぶつかることがあります。モデルの精度だけでなく、特徴量をどこまで再現可能な形で設計するかも、実務では重要な設計パラメータの一つだと思います。
今回紹介したMFCCとlibrosaの例は、その考え方を具体的に示したものです。本記事では詳細を割愛しましたが、複数の計算プロセスが関わる場合には、プロセスごとに段階的に再現性を検証していくことが必要になります。
AIコーディングツールの登場によって、「処理内容を理解したうえで、他言語で同等の実装を書く」という作業のハードルは確実に下がってきています。本記事が、そうしたツールを活用しつつも、特徴量設計と再現性にきちんと向き合うきっかけになれば幸いです。