5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Raspberry Piでインターホンの音を検知してLINEに通知する (4)検知してLINEに通知する

Last updated at Posted at 2022-02-26

「インターホンの音を検知してLINE通知することで、どこにいてもイヤホンをしていても来客に気づきたい」と考えて作成したものをまとめています。
前回「(3)検知基準を検討する」では、インターホンの音をどのように検知すればいいかについて検討しました。
最終回として、実際に検知できるかどうかの確認と、検知したらLINE通知するところまでをまとめます。

LINEの通知が簡単に実装できて驚きました。

装置:Raspberry Pi 4
マイク:共立プロダクツ MI-305 [USBマイク]
プログラム言語:Python3 (PyAudioモジュールを使用。 PyAudio Documentation)

2022/03/10: 最後尾に最新版を載せました。

インターホンを検知する

前回までの内容をまとめると下記のプログラムになる。

Doorbell_detector.py
import pyaudio
import wave
import numpy as np
import time
from Pysound_Error_hider import noalsaerr # 第2回参照。ALSA Errorが出ない場合は不要。
import matplotlib.pyplot as plt

Check_every_time = True # 検知したときにFFTプロット。実際に運用するときはFalse。

RECORD_SECONDS = 1 # 第3回で使用した値に合わせる
threshold = 1.5e7 # 要調整
freq_indices = [ 610,  611,  612,  613,  615,  616, 1831, 1832, 1833, 1834, 1835, \
                 1836, 3056, 3057, 3058, 3059, 4277, 4278, 4280, 4281, 4282, 4283, \
                 4285 ] # 第3回で決めた値を入れる

input_device_index = 2 # 第1回で調べた値
CHUNK = 1024 * 8 # 第1~3回で使用した値に合わせる
FORMAT = pyaudio.paInt16
CHANNELS = 1 # モノラル入力
RATE = 44100 # 第1~3回で使用した値に合わせる
rng = int(RATE / CHUNK * RECORD_SECONDS)

def setup():
    with noalsaerr(): # 第2回参照。ALSA Errorが出ない場合はwith文は不要。
        p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    input_device_index = input_device_index,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK,
                    )
    return p, stream

def collect_data(stream, rng, CHUNK):
    frames = []
    for i in range(rng):
        data = stream.read(CHUNK, exception_on_overflow=False) # 第2回参照
        frames.append(data)
    d = np.frombuffer(b''.join(frames), dtype='int16')
    return d

def calc_FFTamp(frames, freq_indices):
    fft_data = np.abs(np.fft.fft(frames))
    amp = 0
    for i in freq_indices:
        amp += fft_data[i]
    return amp
    
def check_plot(d):
    fft_data = np.abs(np.fft.fft(d))    #FFTした信号の強度
    freqList = np.fft.fftfreq(d.shape[0], d=1.0/RATE)    #周波数(グラフの横軸)の取得
    plt.plot(freqList, fft_data)
    plt.xlim(0, 5000)    #0~5000Hzまでとりあえず表示する
    plt.show()

if __name__ == '__main__':
    p, stream = setup()
    print("Watching...")
    try:
        while True:
            d = collect_data(stream, rng, CHUNK)            
            amp = calc_FFTamp(d, freq_indices)
            if amp > threshold:
                print("Someone is at the door. (amp = {:.2e}/{:.1e})".format(amp,threshold))
                if Check_every_time:
                    check_plot(d)
                time.sleep(5)
                print("Keep watching...")
    except KeyboardInterrupt:
        print('You terminated the program.\nThe program ends.')
        stream.stop_stream()
        stream.close()
        p.terminate()

録音、FFT、インターホン音の周波数における強度の足し合わせを常に行い、
thresholdよりも大きければインターホンが鳴ったと判断する。
検出した強度と、thresholdの値がprintされる。
Check_every_timeがTrueだと、検知した際にFFTをプロットする。
(プロット画像を閉じるまで次に進まないので、運用するときはFalseにする。Trueにするのは、検知・誤検知の様子が見たいとき。)
検知後5秒間スリープして、再開。
という内容です。

thresholdは、誤検知ない程度に大きく、検知漏れが無い程度に小さくする。実際に動かし、printされた値を見ながら調整。
プログラム開始時、第2回で対応しきれなかった警告が出るが、最初だけで、繰り返している間は出ない。
Input Overflowが出ないように、collect_dataの中のexception_on_overflow=Falseを設定。

実際に動かしてインターホンを鳴らすと、ちゃんと検知できた。その時にプロットされた図は下記。
658 Hzと、その3, 5, 7倍の周波数を狙っているが、こう見ると658 Hzだけで良いかもしれない。

また、ラズパイ本体をガチャガチャ動かすと誤検知が起きる。その時の図は下記。
インターホンの音よりも乱雑で、想定通り。誤検知が起こるのも分かる。
低周波側の方が信号強いので、検出が658 Hzだけだと不安かも?
ちなみに、上の図と同じ位置にピークがあるように見えるが、ちゃんとズレている。

誤検知はあるものの、それが起きるのは限定的なので、うまく設定できていそう。

LINE通知の準備

参考:ラズパイで気温と湿度を測定、LINEで通知を受け取る ~後編~
LINEへの通知には、まずアクセストークンを発行する。
LINE Notify」 (LINEアカウントへのログインが必要なので、念のためWeb検索からのアクセスが良いかも)

ログイン後、右上の自分のアカウント名をクリック→マイページ→アクセストークンの発行(開発者向け)で発行できる。
(「サービス提供者様へ」のところではない。)
予め、どのトークルームを使用するか指定する必要がある。
トークン発行時、LINE Notifyアカウントから、指定のトークルームにLINE Notifyを招待するように案内が来るので、従う。
発行されたトークンをコピーして、下記を実行してみる。

send_LINE.py
import requests
url = "https://notify-api.line.me/api/notify" 
token = "(トークンをここにコピー)"
headers = {"Authorization" : "Bearer "+ token} 
message =  "Hello from Raspberry Pi" 
payload = {"message" :  message} 
r = requests.post(url, headers = headers, params=payload) 

簡単でした。通知までのラグもほとんど無さそうです。

インターホンを検知してLINEに通知する

というわけで、LINE通知を組み込んだ下記が、完成したプログラム。

Doorbell_notifier.py
import pyaudio
import wave
import numpy as np
import time
from Pysound_Error_hider import noalsaerr # 第2回参照。ALSA Errorが出ない場合は不要。
import matplotlib.pyplot as plt
import requests

Check_every_time = False # 検知したときにFFTプロット。実際に運用するときはFalse。
LINE_token = "(トークンをここにコピー)"

RECORD_SECONDS = 1 # 第3回で使用した値に合わせる
threshold = 1.5e7 # 要調整
freq_indices = [ 610,  611,  612,  613,  615,  616, 1831, 1832, 1833, 1834, 1835, \
                 1836, 3056, 3057, 3058, 3059, 4277, 4278, 4280, 4281, 4282, 4283, \
                 4285 ] # 第3回で決めた値を入れる

input_device_index = 2 # 第1回で調べた値
CHUNK = 1024 * 8 # 第1~3回で使用した値に合わせる
FORMAT = pyaudio.paInt16
CHANNELS = 1 # モノラル入力
RATE = 44100 # 第1~3回で使用した値に合わせる
rng = int(RATE / CHUNK * RECORD_SECONDS)

def setup():
    with noalsaerr(): # 第2回参照。ALSA Errorが出ない場合はwith文は不要。
        p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    input_device_index = input_device_index,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK,
                    )
    return p, stream

def collect_data(stream, rng, CHUNK):
    frames = []
    for i in range(rng):
        data = stream.read(CHUNK, exception_on_overflow=False) # 第2回参照
        frames.append(data)
    d = np.frombuffer(b''.join(frames), dtype='int16')
    return d

def calc_FFTamp(frames, freq_indices):
    fft_data = np.abs(np.fft.fft(frames))
    amp = 0
    for i in freq_indices:
        amp += fft_data[i]
    return amp
    
def check_plot(d):
    fft_data = np.abs(np.fft.fft(d))    #FFTした信号の強度
    freqList = np.fft.fftfreq(d.shape[0], d=1.0/RATE)    #周波数(グラフの横軸)の取得
    plt.plot(freqList, fft_data)
    plt.xlim(0, 5000)    #0~5000Hzまでとりあえず表示する
    plt.show()

def send_LINE(token, amp, threshold):
    url = "https://notify-api.line.me/api/notify" 
    token = token
    headers = {"Authorization" : "Bearer "+ token} 
    message =  "インターホンが鳴っているかも(検出強度{:.2e}/{:.1e})".format(amp,threshold) 
    payload = {"message" :  message} 
    r = requests.post(url, headers = headers, params=payload)

if __name__ == '__main__':
    p, stream = setup()
    print("Watching...")
    try:
        while True:
            d = collect_data(stream, rng, CHUNK)            
            amp = calc_FFTamp(d, freq_indices)
            if amp > threshold:
                print("Someone is at the door. (amp = {:.2e}/{:.1e})".format(amp,threshold))
                send_LINE(LINE_token, amp, threshold)
                if Check_every_time:
                    check_plot(d)
                time.sleep(5)
                print("Keep watching...")
    except KeyboardInterrupt:
        print('You terminated the program.\nThe program ends.')
        stream.stop_stream()
        stream.close()
        p.terminate()

print文と同様に、検知時にampの値とthresholdの値を送るようにした。
thresholdの調整の参考に。

まとめ

第1-4回を通して、Raspberry Piを使って録音する方法や、インターホン音の解析、検知してLINEに通知するところまでをまとめました。
現在1週間程度動かしていますが、自分が認識している範囲では全部検知できており、不在時にも何度か通知が来ました(そして、配達の不在票が入っていました)。上述のようにラズベリーパイ本体をガチャガチャすると誤検知がありますが、会話、テレビ、ゲームなどで誤検知したことは無く、快適に運用できています。thresholdの値はラフに設定するだけで良さそうです。
今後も、日常の中の不便に気づき、解決していけたらなあと思います。

その他の記事:
Raspberry Piでインターホンの音を検知してLINEに通知する (1)インターホンの音を録音する
Raspberry Piでインターホンの音を検知してLINEに通知する (2)PyAudio録音時の警告・エラーに対処する
Raspberry Piでインターホンの音を検知してLINEに通知する (3)検知基準を検討する

改善のためのアイデア

とりあえず形にはなっていると思いますが、まだまだ改善の余地があるのは間違いないですね。
現在気になっていること、できれば良いなあと思っていることをまとめておきます。

  • 検出精度
    もちろん検出精度は限りなく上げたい。
    指定の周波数の強度だけ見ているが、他の周波数との比で見るとかはどうか? ←2022/03/10実装
  • 常時FFTで良いか?
    FFTにはそこそこの計算コストがかかっていると思う。消費電力や、ラズパイのオーバーヒートが気になる。特に夏場、稼働し続けられるか心配。一定の音量が聞こえた際にFFTするようにしようかな。
  • 時報みたいなもの
    Raspberry Piがちゃんと稼働しているか心配なので、毎日決まった時間に「動いているよ~」とLINEに通知するとか。最終的には煩わしいだけ?
  • 時間制限
    例えば深夜は通知しないようにする、深夜の通知は朝にまとめて送る、など。そもそも深夜に来るのは重要な案件かもしれないが。
  • カメラと連動
    インターホンを検知したら、誰が来たかを撮影してLINEに送ることもできそう。インターホンに画面があれば良いが、玄関にカメラを別途設置するのもあり?共用玄関だと難しいか。

参考

ラズパイで気温と湿度を測定、LINEで通知を受け取る ~後編~

最新版

指定の周波数での強度(amp)について、
「絶対値が大きい」&「他の周波数での強度(amp2)と比べて大きい」
で検出すると誤検知が激減したので最新版を載せておく。

freq_indices2で、各ピークの2倍の周波数を指定している。
ピークが1f, 3f, 5f, ...だったので、2f, 6f, 10f, ...を見ることになり、ここでの強度は出ないはず。
amp/amp2がthreshold2よりも大きいという条件を加えた。

Doorbell_notifier_20220310.py
import pyaudio
import wave
import numpy as np
import time
from Pysound_Error_hider import noalsaerr # 第2回参照。ALSA Errorが出ない場合は不要。
import matplotlib.pyplot as plt
import requests

Check_every_time = False # 検知したときにFFTプロット。実際に運用するときはFalse。
LINE_token = "(トークンをここにコピー)"

RECORD_SECONDS = 1 # 第3回で使用した値に合わせる
threshold = 1.5e7 # 要調整
threshold2 = 5
freq_indices = [ 610,  611,  612,  613,  615,  616, 1831, 1832, 1833, 1834, 1835, \
                 1836, 3056, 3057, 3058, 3059, 4277, 4278, 4280, 4281, 4282, 4283, \
                 4285 ] # 第3回で決めた値を入れる
freq_indices2 = [ f*2 for f in freq_indices ]

input_device_index = 2 # 第1回で調べた値
CHUNK = 1024 * 8 # 第1~3回で使用した値に合わせる
FORMAT = pyaudio.paInt16
CHANNELS = 1 # モノラル入力
RATE = 44100 # 第1~3回で使用した値に合わせる
rng = int(RATE / CHUNK * RECORD_SECONDS)

def setup():
    with noalsaerr(): # 第2回参照。ALSA Errorが出ない場合はwith文は不要。
        p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    input_device_index = input_device_index,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK,
                    )
    return p, stream

def collect_data(stream, rng, CHUNK):
    frames = []
    for i in range(rng):
        data = stream.read(CHUNK, exception_on_overflow=False) # 第2回参照
        frames.append(data)
    d = np.frombuffer(b''.join(frames), dtype='int16')
    return d

def calc_FFTamp(frames, freq_indices, freq_indices2):
    fft_data = np.abs(np.fft.fft(frames))
    amp, amp2 = 0, 0
    for i in freq_indices:
        amp += fft_data[i]
    for i in freq_indices2:
        amp2 += fft_data[i]
    return amp, amp2
    
def check_plot(d):
    fft_data = np.abs(np.fft.fft(d))    #FFTした信号の強度
    freqList = np.fft.fftfreq(d.shape[0], d=1.0/RATE)    #周波数(グラフの横軸)の取得
    plt.plot(freqList, fft_data)
    plt.xlim(0, 5000)    #0~5000Hzまでとりあえず表示する
    plt.show()

def send_LINE(token, amp, amp2, threshold, threshold2):
    url = "https://notify-api.line.me/api/notify" 
    token = token
    headers = {"Authorization" : "Bearer "+ token} 
    message =  "が鳴ってるよ\n強度 {:.2e} --- 基準 {:.1e}\n比率 {:.2e} --- 基準 {:.1e}".format(amp,threshold,amp/amp2,threshold2)
    payload = {"message" :  message} 
    r = requests.post(url, headers = headers, params=payload)

if __name__ == '__main__':
    p, stream = setup()
    print("Watching...")
    try:
        while True:
            d = collect_data(stream, rng, CHUNK)            
            amp, amp2 = calc_FFTamp(d, freq_indices, freq_indices2)
            if (amp > threshold)&(amp/amp2 > threshold2):
                print("Someone is at the door.")
                send_LINE(LINE_token, amp, amp2, threshold, threshold2)
                if Check_every_time:
                    check_plot(d)
                time.sleep(5)
                print("Keep watching...")
    except KeyboardInterrupt:
        print('You terminated the program.\nThe program ends.')
        stream.stop_stream()
        stream.close()
        p.terminate()
5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?