「インターホンの音を検知してLINE通知することで、どこにいてもイヤホンをしていても来客に気づきたい」と考えて作成したものをまとめています。
前回「(3)検知基準を検討する」では、インターホンの音をどのように検知すればいいかについて検討しました。
最終回として、実際に検知できるかどうかの確認と、検知したらLINE通知するところまでをまとめます。
LINEの通知が簡単に実装できて驚きました。
装置:Raspberry Pi 4
マイク:共立プロダクツ MI-305 [USBマイク]
プログラム言語:Python3 (PyAudioモジュールを使用。 PyAudio Documentation)
2022/03/10: 最後尾に最新版を載せました。
インターホンを検知する
前回までの内容をまとめると下記のプログラムになる。
import pyaudio
import wave
import numpy as np
import time
from Pysound_Error_hider import noalsaerr # 第2回参照。ALSA Errorが出ない場合は不要。
import matplotlib.pyplot as plt
Check_every_time = True # 検知したときにFFTプロット。実際に運用するときはFalse。
RECORD_SECONDS = 1 # 第3回で使用した値に合わせる
threshold = 1.5e7 # 要調整
freq_indices = [ 610, 611, 612, 613, 615, 616, 1831, 1832, 1833, 1834, 1835, \
1836, 3056, 3057, 3058, 3059, 4277, 4278, 4280, 4281, 4282, 4283, \
4285 ] # 第3回で決めた値を入れる
input_device_index = 2 # 第1回で調べた値
CHUNK = 1024 * 8 # 第1~3回で使用した値に合わせる
FORMAT = pyaudio.paInt16
CHANNELS = 1 # モノラル入力
RATE = 44100 # 第1~3回で使用した値に合わせる
rng = int(RATE / CHUNK * RECORD_SECONDS)
def setup():
with noalsaerr(): # 第2回参照。ALSA Errorが出ない場合はwith文は不要。
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
channels=CHANNELS,
input_device_index = input_device_index,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
return p, stream
def collect_data(stream, rng, CHUNK):
frames = []
for i in range(rng):
data = stream.read(CHUNK, exception_on_overflow=False) # 第2回参照
frames.append(data)
d = np.frombuffer(b''.join(frames), dtype='int16')
return d
def calc_FFTamp(frames, freq_indices):
fft_data = np.abs(np.fft.fft(frames))
amp = 0
for i in freq_indices:
amp += fft_data[i]
return amp
def check_plot(d):
fft_data = np.abs(np.fft.fft(d)) #FFTした信号の強度
freqList = np.fft.fftfreq(d.shape[0], d=1.0/RATE) #周波数(グラフの横軸)の取得
plt.plot(freqList, fft_data)
plt.xlim(0, 5000) #0~5000Hzまでとりあえず表示する
plt.show()
if __name__ == '__main__':
p, stream = setup()
print("Watching...")
try:
while True:
d = collect_data(stream, rng, CHUNK)
amp = calc_FFTamp(d, freq_indices)
if amp > threshold:
print("Someone is at the door. (amp = {:.2e}/{:.1e})".format(amp,threshold))
if Check_every_time:
check_plot(d)
time.sleep(5)
print("Keep watching...")
except KeyboardInterrupt:
print('You terminated the program.\nThe program ends.')
stream.stop_stream()
stream.close()
p.terminate()
録音、FFT、インターホン音の周波数における強度の足し合わせを常に行い、
thresholdよりも大きければインターホンが鳴ったと判断する。
検出した強度と、thresholdの値がprintされる。
Check_every_timeがTrueだと、検知した際にFFTをプロットする。
(プロット画像を閉じるまで次に進まないので、運用するときはFalseにする。Trueにするのは、検知・誤検知の様子が見たいとき。)
検知後5秒間スリープして、再開。
という内容です。
thresholdは、誤検知ない程度に大きく、検知漏れが無い程度に小さくする。実際に動かし、printされた値を見ながら調整。
プログラム開始時、第2回で対応しきれなかった警告が出るが、最初だけで、繰り返している間は出ない。
Input Overflowが出ないように、collect_dataの中のexception_on_overflow=Falseを設定。
実際に動かしてインターホンを鳴らすと、ちゃんと検知できた。その時にプロットされた図は下記。
658 Hzと、その3, 5, 7倍の周波数を狙っているが、こう見ると658 Hzだけで良いかもしれない。
また、ラズパイ本体をガチャガチャ動かすと誤検知が起きる。その時の図は下記。
インターホンの音よりも乱雑で、想定通り。誤検知が起こるのも分かる。
低周波側の方が信号強いので、検出が658 Hzだけだと不安かも?
ちなみに、上の図と同じ位置にピークがあるように見えるが、ちゃんとズレている。
誤検知はあるものの、それが起きるのは限定的なので、うまく設定できていそう。
LINE通知の準備
参考:ラズパイで気温と湿度を測定、LINEで通知を受け取る ~後編~
LINEへの通知には、まずアクセストークンを発行する。
「LINE Notify」 (LINEアカウントへのログインが必要なので、念のためWeb検索からのアクセスが良いかも)
ログイン後、右上の自分のアカウント名をクリック→マイページ→アクセストークンの発行(開発者向け)で発行できる。
(「サービス提供者様へ」のところではない。)
予め、どのトークルームを使用するか指定する必要がある。
トークン発行時、LINE Notifyアカウントから、指定のトークルームにLINE Notifyを招待するように案内が来るので、従う。
発行されたトークンをコピーして、下記を実行してみる。
import requests
url = "https://notify-api.line.me/api/notify"
token = "(トークンをここにコピー)"
headers = {"Authorization" : "Bearer "+ token}
message = "Hello from Raspberry Pi"
payload = {"message" : message}
r = requests.post(url, headers = headers, params=payload)
簡単でした。通知までのラグもほとんど無さそうです。
インターホンを検知してLINEに通知する
というわけで、LINE通知を組み込んだ下記が、完成したプログラム。
import pyaudio
import wave
import numpy as np
import time
from Pysound_Error_hider import noalsaerr # 第2回参照。ALSA Errorが出ない場合は不要。
import matplotlib.pyplot as plt
import requests
Check_every_time = False # 検知したときにFFTプロット。実際に運用するときはFalse。
LINE_token = "(トークンをここにコピー)"
RECORD_SECONDS = 1 # 第3回で使用した値に合わせる
threshold = 1.5e7 # 要調整
freq_indices = [ 610, 611, 612, 613, 615, 616, 1831, 1832, 1833, 1834, 1835, \
1836, 3056, 3057, 3058, 3059, 4277, 4278, 4280, 4281, 4282, 4283, \
4285 ] # 第3回で決めた値を入れる
input_device_index = 2 # 第1回で調べた値
CHUNK = 1024 * 8 # 第1~3回で使用した値に合わせる
FORMAT = pyaudio.paInt16
CHANNELS = 1 # モノラル入力
RATE = 44100 # 第1~3回で使用した値に合わせる
rng = int(RATE / CHUNK * RECORD_SECONDS)
def setup():
with noalsaerr(): # 第2回参照。ALSA Errorが出ない場合はwith文は不要。
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
channels=CHANNELS,
input_device_index = input_device_index,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
return p, stream
def collect_data(stream, rng, CHUNK):
frames = []
for i in range(rng):
data = stream.read(CHUNK, exception_on_overflow=False) # 第2回参照
frames.append(data)
d = np.frombuffer(b''.join(frames), dtype='int16')
return d
def calc_FFTamp(frames, freq_indices):
fft_data = np.abs(np.fft.fft(frames))
amp = 0
for i in freq_indices:
amp += fft_data[i]
return amp
def check_plot(d):
fft_data = np.abs(np.fft.fft(d)) #FFTした信号の強度
freqList = np.fft.fftfreq(d.shape[0], d=1.0/RATE) #周波数(グラフの横軸)の取得
plt.plot(freqList, fft_data)
plt.xlim(0, 5000) #0~5000Hzまでとりあえず表示する
plt.show()
def send_LINE(token, amp, threshold):
url = "https://notify-api.line.me/api/notify"
token = token
headers = {"Authorization" : "Bearer "+ token}
message = "インターホンが鳴っているかも(検出強度{:.2e}/{:.1e})".format(amp,threshold)
payload = {"message" : message}
r = requests.post(url, headers = headers, params=payload)
if __name__ == '__main__':
p, stream = setup()
print("Watching...")
try:
while True:
d = collect_data(stream, rng, CHUNK)
amp = calc_FFTamp(d, freq_indices)
if amp > threshold:
print("Someone is at the door. (amp = {:.2e}/{:.1e})".format(amp,threshold))
send_LINE(LINE_token, amp, threshold)
if Check_every_time:
check_plot(d)
time.sleep(5)
print("Keep watching...")
except KeyboardInterrupt:
print('You terminated the program.\nThe program ends.')
stream.stop_stream()
stream.close()
p.terminate()
print文と同様に、検知時にampの値とthresholdの値を送るようにした。
thresholdの調整の参考に。
まとめ
第1-4回を通して、Raspberry Piを使って録音する方法や、インターホン音の解析、検知してLINEに通知するところまでをまとめました。
現在1週間程度動かしていますが、自分が認識している範囲では全部検知できており、不在時にも何度か通知が来ました(そして、配達の不在票が入っていました)。上述のようにラズベリーパイ本体をガチャガチャすると誤検知がありますが、会話、テレビ、ゲームなどで誤検知したことは無く、快適に運用できています。thresholdの値はラフに設定するだけで良さそうです。
今後も、日常の中の不便に気づき、解決していけたらなあと思います。
その他の記事:
Raspberry Piでインターホンの音を検知してLINEに通知する (1)インターホンの音を録音する
Raspberry Piでインターホンの音を検知してLINEに通知する (2)PyAudio録音時の警告・エラーに対処する
Raspberry Piでインターホンの音を検知してLINEに通知する (3)検知基準を検討する
改善のためのアイデア
とりあえず形にはなっていると思いますが、まだまだ改善の余地があるのは間違いないですね。
現在気になっていること、できれば良いなあと思っていることをまとめておきます。
- 検出精度
もちろん検出精度は限りなく上げたい。
指定の周波数の強度だけ見ているが、他の周波数との比で見るとかはどうか?←2022/03/10実装 - 常時FFTで良いか?
FFTにはそこそこの計算コストがかかっていると思う。消費電力や、ラズパイのオーバーヒートが気になる。特に夏場、稼働し続けられるか心配。一定の音量が聞こえた際にFFTするようにしようかな。 - 時報みたいなもの
Raspberry Piがちゃんと稼働しているか心配なので、毎日決まった時間に「動いているよ~」とLINEに通知するとか。最終的には煩わしいだけ? - 時間制限
例えば深夜は通知しないようにする、深夜の通知は朝にまとめて送る、など。そもそも深夜に来るのは重要な案件かもしれないが。 - カメラと連動
インターホンを検知したら、誰が来たかを撮影してLINEに送ることもできそう。インターホンに画面があれば良いが、玄関にカメラを別途設置するのもあり?共用玄関だと難しいか。
参考
ラズパイで気温と湿度を測定、LINEで通知を受け取る ~後編~
最新版
指定の周波数での強度(amp)について、
「絶対値が大きい」&「他の周波数での強度(amp2)と比べて大きい」
で検出すると誤検知が激減したので最新版を載せておく。
freq_indices2で、各ピークの2倍の周波数を指定している。
ピークが1f, 3f, 5f, ...だったので、2f, 6f, 10f, ...を見ることになり、ここでの強度は出ないはず。
amp/amp2がthreshold2よりも大きいという条件を加えた。
import pyaudio
import wave
import numpy as np
import time
from Pysound_Error_hider import noalsaerr # 第2回参照。ALSA Errorが出ない場合は不要。
import matplotlib.pyplot as plt
import requests
Check_every_time = False # 検知したときにFFTプロット。実際に運用するときはFalse。
LINE_token = "(トークンをここにコピー)"
RECORD_SECONDS = 1 # 第3回で使用した値に合わせる
threshold = 1.5e7 # 要調整
threshold2 = 5
freq_indices = [ 610, 611, 612, 613, 615, 616, 1831, 1832, 1833, 1834, 1835, \
1836, 3056, 3057, 3058, 3059, 4277, 4278, 4280, 4281, 4282, 4283, \
4285 ] # 第3回で決めた値を入れる
freq_indices2 = [ f*2 for f in freq_indices ]
input_device_index = 2 # 第1回で調べた値
CHUNK = 1024 * 8 # 第1~3回で使用した値に合わせる
FORMAT = pyaudio.paInt16
CHANNELS = 1 # モノラル入力
RATE = 44100 # 第1~3回で使用した値に合わせる
rng = int(RATE / CHUNK * RECORD_SECONDS)
def setup():
with noalsaerr(): # 第2回参照。ALSA Errorが出ない場合はwith文は不要。
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
channels=CHANNELS,
input_device_index = input_device_index,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
)
return p, stream
def collect_data(stream, rng, CHUNK):
frames = []
for i in range(rng):
data = stream.read(CHUNK, exception_on_overflow=False) # 第2回参照
frames.append(data)
d = np.frombuffer(b''.join(frames), dtype='int16')
return d
def calc_FFTamp(frames, freq_indices, freq_indices2):
fft_data = np.abs(np.fft.fft(frames))
amp, amp2 = 0, 0
for i in freq_indices:
amp += fft_data[i]
for i in freq_indices2:
amp2 += fft_data[i]
return amp, amp2
def check_plot(d):
fft_data = np.abs(np.fft.fft(d)) #FFTした信号の強度
freqList = np.fft.fftfreq(d.shape[0], d=1.0/RATE) #周波数(グラフの横軸)の取得
plt.plot(freqList, fft_data)
plt.xlim(0, 5000) #0~5000Hzまでとりあえず表示する
plt.show()
def send_LINE(token, amp, amp2, threshold, threshold2):
url = "https://notify-api.line.me/api/notify"
token = token
headers = {"Authorization" : "Bearer "+ token}
message = "が鳴ってるよ\n強度 {:.2e} --- 基準 {:.1e}\n比率 {:.2e} --- 基準 {:.1e}".format(amp,threshold,amp/amp2,threshold2)
payload = {"message" : message}
r = requests.post(url, headers = headers, params=payload)
if __name__ == '__main__':
p, stream = setup()
print("Watching...")
try:
while True:
d = collect_data(stream, rng, CHUNK)
amp, amp2 = calc_FFTamp(d, freq_indices, freq_indices2)
if (amp > threshold)&(amp/amp2 > threshold2):
print("Someone is at the door.")
send_LINE(LINE_token, amp, amp2, threshold, threshold2)
if Check_every_time:
check_plot(d)
time.sleep(5)
print("Keep watching...")
except KeyboardInterrupt:
print('You terminated the program.\nThe program ends.')
stream.stop_stream()
stream.close()
p.terminate()