音声信号処理の勉強ノート - 4. 重畳加算法を実装してみた
重畳加算法(Overlap-Add, OLA)とは?
音声信号処理において、重畳加算法(Overlap-Add, OLA)は、信号をブロック単位で処理する際に発生する境界部分の不連続性を滑らかに繋げるために使われる手法です。ピッチシフトやタイムストレッチにおいてよく使用されます。
基本的なアイデアは次の通りです:
- 入力信号を一定間隔でオーバーラップするブロックに分割する。
- 各ブロックを窓関数(Window Function)を使ってスムーズに処理する。
- 処理済みブロックを適切な位置で足し合わせて元の信号を再構築する。
これにより、リアルタイムで連続的な音声処理を実現できます。
Rustで実装する重畳加算法
リングバッファ(RingBuffer)の実装
pub struct RingBuffer<const N: usize> {
buffer: [f32; N],
read_index: usize,
write_index: usize,
}
impl<const N: usize> RingBuffer<N> {
pub fn new(delay: usize) -> Self {
const { assert!(N > 0, "RingBuffer capacity must be greater than zero") };
Self {
buffer: [0.0; N],
read_index: 0,
write_index: delay % N,
}
}
#[inline]
pub fn push(&mut self, value: f32) {
self.buffer[self.write_index] = value;
self.write_index = (self.write_index + 1) % N;
}
#[inline]
pub fn pop(&mut self) -> f32 {
let value = self.buffer[self.read_index];
self.buffer[self.read_index] = 0.0;
self.read_index = (self.read_index + 1) % N;
value
}
#[inline]
pub fn peek_from_write_pos(&self, offset: usize) -> f32 {
let idx = (self.write_index + offset) % N;
self.buffer[idx]
}
#[inline]
pub fn peek_mut(&mut self, offset: usize) -> &mut f32 {
let idx = (self.read_index + offset) % N;
&mut self.buffer[idx]
}
#[inline]
pub fn advance_write_pos(&mut self, steps: usize) {
self.write_index = (self.write_index + steps) % N;
}
}
OverlapAdd処理の実装
pub trait BlockProcessor<const N: usize> {
fn process_block(&mut self, _block: &mut [f32; N]) {
// Default implementation does nothing
}
}
pub struct OverlapAdd<P, const BLOCK_SAMPLES: usize, const INTERVAL_SAMPLES: usize>
where
P: BlockProcessor<BLOCK_SAMPLES>,
{
pub processor: P, // Processor for each block
input_ring: RingBuffer<BLOCK_SAMPLES>, // Input history
output_ring: RingBuffer<BLOCK_SAMPLES>, // Accumulated overlap-add buffers
block: [f32; BLOCK_SAMPLES], // Current block
window: [f32; BLOCK_SAMPLES], // Window for cross-fade overlap-add
interval_counter: usize,
}
impl<P, const BLOCK_SAMPLES: usize, const INTERVAL_SAMPLES: usize>
OverlapAdd<P, BLOCK_SAMPLES, INTERVAL_SAMPLES>
where
P: BlockProcessor<BLOCK_SAMPLES>,
{
pub fn new(window: [f32; BLOCK_SAMPLES], processor: P) -> Self {
Self {
input_ring: RingBuffer::new(0),
output_ring: RingBuffer::new(INTERVAL_SAMPLES - 1),
block: [0.0; BLOCK_SAMPLES],
window,
interval_counter: 0,
processor,
}
}
pub fn process(&mut self, buffer: &mut [f32]) {
let mut interval_counter = self.interval_counter;
let num_samples = buffer.len();
#[allow(clippy::needless_range_loop)]
for index in 0..num_samples {
self.input_ring.push(buffer[index]);
interval_counter += 1;
if interval_counter >= INTERVAL_SAMPLES {
interval_counter = 0;
for i in 0..BLOCK_SAMPLES {
// We can't simply pop the inputs since we need to reuse them
self.block[i] = self.input_ring.peek_from_write_pos(i) * self.window[i];
}
self.processor.process_block(&mut self.block); // The actual audio processing should happen here
for i in 0..BLOCK_SAMPLES {
*self.output_ring.peek_mut(i) += self.block[i] * self.window[i];
}
self.output_ring.advance_write_pos(INTERVAL_SAMPLES);
}
buffer[index] = self.output_ring.pop();
}
self.interval_counter = interval_counter;
}
}
いくつかの説明
- リングバッファは固定サイズの配列を循環して利用するため、処理中にメモリの割り当てを避け、リアルタイム性を損なうリスクを下げることができます。
- 入力バッファの長さがブロックサイズとは限らないため、サンプルが読み込まれるごとにカウントし、一定数に達したら一括で処理を行います。こうすることで、ブロック単位の計算を維持しながら、様々な長さの入力を扱うことができます。
- リングバッファを使った現在の実装では、最後のサンプルをブロックに取り込んだ段階でブロック処理を実行します。その結果、ブロックサイズが
N
の場合、実質的にN−1
サンプル分のディレイが発生します。 - 現在のリングバッファの実装では、処理をサンプル単位で行っていますが、実際にはスライス単位で一括処理したほうが効率的です。適切にスライス処理に移行すると、CPUのキャッシュ効率が向上し、ベクトル命令(SIMD)の特性を受けやすくなり、処理効率が向上します。
テスト
#[cfg(test)]
mod tests {
use super::*;
const N: usize = 32;
const I: usize = N / 2;
struct NoOpProcessor;
impl BlockProcessor<N> for NoOpProcessor {}
#[test]
fn test_overlap_add_perfect_reconstruction() {
let delay = N - 1;
let sine_window =
std::array::from_fn(|i| (std::f32::consts::PI * (i as f32 + 0.5) / N as f32).sin());
let mut ola = OverlapAdd::<_, N, I>::new(sine_window, NoOpProcessor);
let len = N * 4;
let mut buffer = vec![0.0; len];
for i in 0..len {
buffer[i] = ((i as f32) * 0.01).sin();
}
let buffer_ref = buffer.clone();
ola.process(&mut buffer);
let epsilon = 1e-6;
for i in 0..delay {
assert!(
buffer[i].abs() < epsilon,
"Transient at sample {}: {}",
i,
buffer[i]
);
}
for i in delay..len {
let expected = buffer_ref[i - delay];
assert!(
(buffer[i] - expected).abs() < epsilon,
"Mismatch at sample {}: got {}, expected {}",
i,
buffer[i],
expected
);
}
}
}
いくつかの説明
- テストでは正弦波形の窓関数(Sine Window)を使っていますが、これを二度適用するとハン窓(Hann Window)になります(
N/I = 2
の場合のみテストが通ります)。オーバーラップ比 (N/I) に応じて窓関数を再スケールするなどの工夫を入れると、より柔軟な再構成が可能になります。 - テストコードでは、窓関数を適用する以外の特別な処理を一切行っていないため、処理後の信号は入力と同じに見えます。しかし実際のアプリケーションでは、ブロック処理の段階でピッチシフトやフィルタリングなどの処理を行い、目的の効果を得ることになります。