DOAとは
Direction of arrivalの略で、信号の到来方向のことです。
特に、音声においてはSSL(Sound Source Localization)という言葉とほぼ同義に使われることがあります。
PyRoomAccousticsとは
PyRoomAccousticsは、最近音声関係の論文などでもよく見かける、Pythonでの音声信号処理ソフトウェアパッケージです。
- 短時間フーリエ変換(ブロック + オンライン)
- ビームフォーミング
- 到着方向(DOA)の検出
- 適応フィルタリング(NLMS、RLS)
- ブラインド音源分離(AuxIVA、Trnicon、ILRMA、SparseAuxIVA、FastMNMF、FastMNMF2)
- 単一チャンネルのノイズ除去(スペクトル減算、部分空間、反復ウィーナー)
などができます。
部屋の形状やマイク、音源の配置なども設定した詳細なシミュレーションできるのが特徴です。
過去の遺物
実は過去に同様のものを作成したことがある。
当時はReSpeakerというデバイスを使い、デバイスから音声到来方向を取得していた。
今回、PyRoomAccousticsでもDOAができると知り、Webカメラ単体で同様のものを作成してみた。
使用したWebカメラ
LogicoolのC920を使用した。このマイク以外でも、複数チャンネルを持つマイク間距離のわかるデバイスであれば何でもよい
ソースコード
C920を接続して実行すると、音源方向を出力し続けます。
カメラから見て右方向が0度、正面が90度、左方向が180度です。
PyRoomAccousticsでは、複数のアルゴリズムから一つを選んでDOAに使用することができます。
今回はNormMUSICを使用しています。
num_srcの値を変更すれば、追跡する音源の数を変更できますが、2以上を設定しても精度はイマイチでした。
また、今回使えるマイクが2つのため、音声が前方から来たものなのか、後方から来たものなのかは判別できません。(とりあえず前方からくるものと仮定しています。)
#!/usr/bin/env python3
import numpy as np
import sounddevice as sd
import pyroomacoustics as pra
from scipy import signal
import traceback
import sys
class RealtimeDOA:
def __init__(
self,
mic_spacing=0.07,
sample_rate=44100,
block_duration=0.5,
nfft=512,
freq_range=[300, 3500],
num_src=1,
azimuth_grid_size=180,
algorithm='NormMUSIC',
):
self.sample_rate = sample_rate
self.block_duration = block_duration
self.block_size = int(sample_rate * block_duration)
self.nfft = nfft
self.freq_range = freq_range
self.num_src = num_src
self.algorithm = algorithm
mic_positions = np.array([
[-mic_spacing / 2, 0, 0],
[mic_spacing / 2, 0, 0],
]).T
print(f"マイク配置:")
print(f" 左マイク: [{-mic_spacing/2:.4f}, 0, 0]")
print(f" 右マイク: [{mic_spacing/2:.4f}, 0, 0]")
print(f" マイク間隔: {mic_spacing*1000:.1f}mm")
print(f"サンプリングレート: {sample_rate}Hz")
print(f"ブロックサイズ: {self.block_size}サンプル ({block_duration}秒)")
print(f"FFTサイズ: {nfft}")
print(f"周波数範囲: {freq_range[0]}-{freq_range[1]}Hz")
print(f"音源数: {num_src}")
print(f"アルゴリズム: {algorithm}")
print(f"方位角範囲: 0° ~ 180° (右→正面→左)")
print()
azimuth_angles = np.linspace(0, 180, azimuth_grid_size) * np.pi / 180
colatitude = np.pi / 2 * np.ones_like(azimuth_angles)
if algorithm not in pra.doa.algorithms:
raise ValueError(f"未対応のアルゴリズム: {algorithm}. 使用可能: {list(pra.doa.algorithms.keys())}")
doa_class = pra.doa.algorithms[algorithm]
self.doa = doa_class(
mic_positions,
sample_rate,
nfft,
num_src=num_src,
azimuth=azimuth_angles,
colatitude=colatitude,
)
self.audio_buffer = np.zeros((2, self.block_size))
self.buffer_index = 0
print("DOA初期化完了。音声入力待機中...")
print("=" * 60)
def audio_callback(self, indata, frames, time_info, status):
try:
if status:
print(f"ステータス: {status}", file=sys.stderr)
audio_chunk = indata[:, :2].T
remaining_space = self.block_size - self.buffer_index
chunk_size = min(frames, remaining_space)
self.audio_buffer[:, self.buffer_index:self.buffer_index + chunk_size] = audio_chunk[:, :chunk_size]
self.buffer_index += chunk_size
if self.buffer_index >= self.block_size:
self.process_block()
self.buffer_index = 0
if chunk_size < frames:
overflow = frames - chunk_size
self.audio_buffer[:, :overflow] = audio_chunk[:, chunk_size:]
self.buffer_index = overflow
except Exception as e:
print(f"音声コールバックでエラーが発生しました:")
traceback.print_exc()
def process_block(self):
try:
rms = np.sqrt(np.mean(self.audio_buffer ** 2))
threshold = 0.01
if rms < threshold:
return
X = np.array([
signal.stft(self.audio_buffer[ch, :], fs=self.sample_rate, nperseg=self.nfft)[2]
for ch in range(2)
])
self.doa.locate_sources(X, freq_range=self.freq_range)
if hasattr(self.doa, 'azimuth_recon') and self.doa.azimuth_recon is not None:
for i, azimuth_rad in enumerate(self.doa.azimuth_recon):
azimuth_deg = np.degrees(azimuth_rad)
if 0 <= azimuth_deg <= 180:
print(f"検出: 音源{i+1} - 方位角: {azimuth_deg:6.1f}° (RMS: {rms:.4f}) [{self.algorithm}]")
except Exception as e:
print(f"音声処理中にエラーが発生しました:")
traceback.print_exc()
def start(self, device=None):
try:
print("使用可能なオーディオデバイス:")
print(sd.query_devices())
print()
if device is not None:
print(f"指定デバイス: {device}")
else:
print("デフォルトデバイスを使用")
print()
with sd.InputStream(
device=device,
channels=2,
samplerate=self.sample_rate,
blocksize=4096,
callback=self.audio_callback,
):
print("録音開始。Ctrl+Cで停止...")
print("=" * 60)
while True:
sd.sleep(1000)
except KeyboardInterrupt:
print("\n録音停止")
except Exception as e:
print(f"録音開始中にエラーが発生しました:")
traceback.print_exc()
def main():
try:
print("=" * 60)
print("リアルタイムDOA(到来方向推定)システム")
print("Logicool C920 ステレオマイク対応")
print("=" * 60)
print()
doa_system = RealtimeDOA(
mic_spacing=0.07,
sample_rate=44100,
block_duration=0.5,
nfft=512,
freq_range=[300, 3500],
num_src=1,
azimuth_grid_size=180,
algorithm='NormMUSIC',
)
doa_system.start(device=None)
except Exception as e:
print(f"メインプログラムでエラーが発生しました:")
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()
感想と今後の展望
無事ウェブカメラのみでDOAできた。
しかし、もともとそういった用途のマイクではないため、マイク間の距離が短すぎるためか、精度はそこまで出なかった。
いざ作ろうと思うと、複数チャンネルを一つのデバイスで取得できるマイクアレイデバイスがなかなか見つからない。
もし、オーディオインターフェースなどを使って、複数のマイクを部屋の各所に3次元的に配置し、一つのデバイスから複数チャンネルで取得できれば、音源の位置を座標で取得できるはず。
仮にもう一つのC920を縦向きに設置し、上下方向の角度も出せれば、二次元的な音源方向が割り出せるが、同一のPCで二つ以上の音声入力デバイスへ同時に接続できるのかは未検証。
音声を聞き取った方向にロボットが振り向くなどできると、かわいいかもしれない。