はじめに
もう5年くらい前(2015年頃)のことなのですが、耳の不自由な知人と飯に行ったときに
(交通系ICカードの)Suicaのタッチ音が聞こえなくて、タッチできたか不安なときがある
という話を聞きました。
当時、ちょうど秋葉原でRaspberry Pi(ラズパイ)を買ってみた頃で、何か工作のネタが無いかなと思っていたタイミングだったので、
Suica(PASMOでもICOCAでも何でもいいけど)のタッチ音を拾ったらLEDを光らせるマシンとか作れないかな?
と思い立って、作ってみたのでした。
ということをふと思い出したはいいのですが、機材(配線)をすでに解体してしまったので、どう説明したものかなあという感じです。
ちなみに、この記事を書いた時点でそんな状況だったのに、さらに投稿まで2年間放置状態にしていました…。
機材

- Raspberry Pi 1 Model B+ : 当時買ったのはなんとバージョン1です。後でも書きますがパフォーマンス面で難儀しました。今となってはラズパイ4まで出ていますが…。
- 卓上マイク(MS-STM55): 昔購入した有り合わせのマイク。RPiにつないで音を入力します。実用を考えたらピンマイクみたいなものの方がいいかも。
- USBオーディオ変換器(BSHSAU01BK): 確か、RPiの3.5インチプラグにつなぐと雑音が乗ったりしてうまく録れなかったので、USB経由でつなぐことにしたんだったかな。
- USBモバイルバッテリー(Anker Astro Mini): 持ち運びを意識して、RPiの電源をモバイルバッテリーから取ってみました。1Aしか出せないバッテリーですが、ちゃんと動きました。
- 7セグLED(OSL40562-IR): 基盤に刺さっていて0を表示している部品。0~9の数字が表示できる奴です。なんでただのLEDじゃないのかというと、タッチ音の鳴った回数を表示させたかったから。
- 導線とかユニバーサル基盤とか : 簡単に抜き差しできるので実験に最適。
- LANケーブル : PCからSSHで入ってコマンドを入力するために使っています。検出プログラムをサービスとして立ち上げるように設定しておけば、PC無しで動くはずなのでLANケーブルも要らなくなると思います(試してないけど)。今となってはラズパイにもWi-Fiが搭載されていますので、これも時代の流れですね。
上の画像では、さらにUSBの電流電圧チェッカーを接続しています。これはスタンバイ時(検出プログラムが走っていない時)だったと思いますが、USB電源は約5Vなので、1.4Wぐらいで動いていることになります。
配線とか
さっきの基盤の画像をよく見ると抵抗がついていますが、これは7セグLEDの電流を制限するためのものです。
7セグLEDのデータシートを見ると、
- LEDに流す電流は20mA以下1
- LEDで2.0Vほど電圧降下する(DC Forward Voltage)
ということなので、たまたま手元にあった220Ωの抵抗を介して、RPiのGPIOにつなぎました。
GPIOに関しては、例えば以下のページなどを見ていただければ。ON/OFFをコマンドやプログラムで制御できるようになっています。
さて、RPi1のGPIOは3.3Vなので、220Ωの抵抗を使えばLEDには6mAくらい流れることになるでしょうか2。

ある(点灯している)1セグメントに注目すると、こんな感じの回路になっているはずです。
実際は7つのセグメントが、理科の豆電球の実験よろしく並列につながっているイメージですね。もっとも、実際に+側が3.3Vなのは点灯しているセグメントだけで、消灯しているセグメントは+側が0Vになっている点に注意が必要です。消灯しているセグメントは、+側も-側も0Vですから、電流は流れません。
プログラム
あとはRaspbian上のPythonで動くプログラムを書きます。
大きく分けて
- マイクから音を取り込むところ
- タッチ音を検出するところ
- LEDを光らせるところ
あたりを作る必要があります。
マイクからの録音
USBオーディオ変換器を介して接続したマイクはALSAデバイスとして扱えるので、alsaaudioモジュールを使って書いています。
デバッグ用にwavファイルからの読み込みもやりたかったので、両対応な感じで書いています。
Raspberry Piの注意点として、下手なプログラムを書くと重くてどうにもならなくなる、というのがありました。PCだと一瞬で終わる処理も、RPiにとっては(特に今回のようにリアルタイム動作させたい時は)一苦労です。
FFTは決まったフレーム数(2の冪乗)ずつ処理しないといけない一方、マイク入力は常に決まったフレーム数ずつ読めるとは限りません。
というわけで、リングバッファを作って読み込みデータを結合していき、FFT用に一定の長さで切り出すといった処理が入っています。ここを何も考えずにリストの結合とスライスとかで書いてしまうと、重すぎて確か実時間で動かなかったはず…。
# -*- coding: utf-8 -*-
import numpy
import alsaaudio
import wave
import collections
FrameArrayTuple = collections.namedtuple(
"FrameArrayTuple",
["array", "nframes_read"])
# Based on https://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
class RingBuffer:
"A 1D ring buffer using numpy arrays"
def __init__(self, length):
pow2 = int(numpy.ceil(numpy.log2(length)))
self.length = 2 ** pow2
self.data = numpy.zeros(self.length, dtype='float64')
self.index_top = 0
self.index_bottom = 0
def extend(self, x):
"adds array x to ring buffer"
if x.size > 0:
x_index = (self.index_bottom + numpy.arange(x.size)) & (self.length-1)
self.data[x_index] = x
self.index_bottom = x_index[-1] + 1
def get(self, n=None):
"Returns the first-in-first-out data in the ring buffer"
idx = (self.index_top + numpy.arange(min(n, self.count()) if n is not None else self.count())) & (self.length-1)
self.index_top = idx[-1] + 1
return self.data[idx]
def count(self):
c = (self.index_bottom - self.index_top + self.length) & (self.length-1)
return c
class PCMInputStream:
def __init__(self, maxNumFrame):
# 単独でインスタンス化できないが、継承先から呼び出すこと
self.maxNumFrame = maxNumFrame
self.op_array = self.getFramesToArrayOperator()
def readFrames(self):
raise NotImplementedError("readFrames() must be implemented")
def readFrameArray(self):
frames = self.readFrames()
return self.op_array(frames)
def getNumChannels(self):
raise NotImplementedError("getNumChannels() must be implemented")
def getFrameRate(self):
raise NotImplementedError("getFrameRate() must be implemented")
def getSampleWidthInBytes(self):
raise NotImplementedError("getSampleWidthInBytes() must be implemented")
def getFramesToArrayOperator(self):
sw = self.getSampleWidthInBytes()
if sw == 1:
fmt = "uint8"
shift_amp = 128.0
max_amp = 128.0
elif sw == 2:
fmt = "int16"
shift_amp = 0.0
max_amp = 32768.0
else:
raise ValueError("getSampleWidthInBytes() must be return 1 or 2")
return
return (lambda frames: (numpy.frombuffer(frames, dtype=fmt) - shift_amp) / max_amp)
def getFrameArrayIterator(self):
# 常に numMaxFrame ずつ返すように工夫する。
nframes_read = 0
num_channels = self.getNumChannels()
arr_buffer = RingBuffer(self.maxNumFrame * 10)
l = -1
while l != 0:
if arr_buffer.count() >= self.maxNumFrame * num_channels:
nframes_read += self.maxNumFrame
# チャネルに分ける
arr_channels = arr_buffer.get(self.maxNumFrame * num_channels).reshape(self.maxNumFrame, num_channels).T
yield FrameArrayTuple(arr_channels, nframes_read)
else:
arr = self.readFrameArray()
l = arr.shape[0]
assert l % num_channels == 0
# 読み込んだ内容を結合
arr_buffer.extend(arr)
# 最後のデータを返す
arr = arr_buffer.get()
nframes_read += arr.shape[0] / num_channels
arr_channels = arr.reshape(arr.shape[0] / num_channels, num_channels).T
yield FrameArrayTuple(arr_channels, nframes_read)
def close(self):
pass
class PCMInputStreamFromWave(PCMInputStream):
def __init__(self, filename, maxNumFrame):
self.wv = wave.open(filename, "r")
self.ch = self.wv.getnchannels()
self.rate = self.wv.getframerate()
self.sw = self.wv.getsampwidth()
self.maxNumFrame = maxNumFrame
PCMInputStream.__init__(self, maxNumFrame)
def readFrames(self):
return self.wv.readframes(self.maxNumFrame)
def getNumChannels(self):
return self.ch
def getFrameRate(self):
return self.rate
def getSampleWidthInBytes(self):
return self.sw
def close(self):
self.wv.close()
class PCMInputStreamFromMic(PCMInputStream):
def __init__(self, rate, sampleWidth, maxNumFrame):
self.ch = 1
self.rate = rate
self.sw = sampleWidth
self.maxNumFrame = maxNumFrame
# 録音デバイス初期化
self.pcm = alsaaudio.PCM(alsaaudio.PCM_CAPTURE)
self.pcm.setchannels(self.ch)
self.pcm.setrate(self.rate)
# 処理高速化のため、2回分まとめて読み込む
print self.pcm.setperiodsize(self.maxNumFrame * 4)
if self.sw == 1:
self.pcm.setformat(alsaaudio.PCM_FORMAT_U8)
elif self.sw == 2:
self.pcm.setformat(alsaaudio.PCM_FORMAT_S16_LE)
else:
raise ValueError("sampleWidth must be 1 or 2")
PCMInputStream.__init__(self, maxNumFrame)
def readFrames(self):
length, frames = self.pcm.read()
return frames
def getNumChannels(self):
return self.ch
def getFrameRate(self):
return self.rate
def getSampleWidthInBytes(self):
return self.sw
タッチ音を検出する
これも割と難儀だった思い出。
基本的には、音声にFFTで周波数解析して、与えられた(タッチ音の)周波数に他の周波数より大きなパワーがあれば「タッチされた」と判定していた…はずです。
但し、PCでプログラムを書く感覚で安易にforループを使っていると、例えば周波数ビンの成分から特定周波数のパワーを計算する時に重すぎて実時間で動かなくなってしまう(下手するとFFT自体の処理よりも重い)という事態に見舞われ、組み込みモジュールやNumPy, SciPyで書けるところは書き直すとか、いろいろやった記憶があります。
# -*- coding: utf-8 -*-
import numpy
import scipy.fftpack
import time
import bisect
import collections
DetectionHistoryTuple = collections.namedtuple(
"DetectionHistoryTuple",
["cond_energy", "energy_peak", "freq_center_detected"])
# 検知状態を表す定数
DETECTION_ON = "on"
DETECTION_OFF = "off"
class SuicaDetection:
# エネルギー計算の周波数の半径 [Hz]
FREQ_TOLERANCE = 50
# 履歴の保存数
NUM_HIST_SAVED = 3
# 立ち上がり判定時のエネルギー比率閾値
THRES_ENERGY_RATIO = 0.25
# float64 の最小値
EPS_FLOAT64 = numpy.finfo(numpy.float64).eps
def freq_filter_vector(self, freq_center, freq_tolerance):
freq_delta = self.freq_axis[1] - self.freq_axis[0]
# freq_center +/- freq_tolerance に含まれるエネルギーを
# 計算するための重みベクトルを返す。
energy_weight = numpy.array(
[(lambda freq_min, freq_max:
(1.0 if freq_min <= f and f + freq_delta <= freq_max
else (freq_max - f) / freq_delta if freq_min <= f <= freq_max <= f + freq_delta
else (f + freq_delta - freq_min) / freq_delta if f <= freq_min <= f + freq_delta <= freq_max
else (freq_tolerance * 2 / freq_delta) if f <= freq_min and freq_max <= f + freq_delta
else 0.0))
(freq_center - freq_tolerance, freq_center + freq_tolerance)
for f in self.freq_axis])
return energy_weight
def __init__(self, center_freqs, freq_axis):
self.ts = time.time()
self.hist = []
self.time_axis = []
self.nframes_read = 0
self.detected_freq = None
self.init_energy = None
self.freq_axis = freq_axis
# 検出したい中心周波数に対するエネルギー重み
self.center_freqs = center_freqs
self.energy_weight_array = numpy.array([
self.freq_filter_vector(center_freq, self.FREQ_TOLERANCE)
for center_freq in center_freqs
]).T
def input_array(self, arr):
assert len(arr.shape) == 1
num_frames = arr.shape[0]
self.detect(arr)
status = None
if self.detected_freq:
# 音が鳴っている時は、直前3回でその周波数帯の音が-5dB落ちたら終了
if all((t.energy_peak is None) or (t.energy_peak - self.init_energy) < -5 for t in self.hist[-3:]):
self.detected_freq = None
status = DETECTION_OFF
else:
# 音が鳴っていない時は、直前3回のうち2回エネルギー条件を満たせばOK
if len([t for t in self.hist[-3:] if t.cond_energy]) >= 2:
self.detected_freq = self.hist[-1].freq_center_detected
self.init_energy = self.hist[-1].energy_peak
status = DETECTION_ON
self.nframes_read += num_frames
return (self.nframes_read, status)
def detect(self, arr):
#print "start", time.time()
assert len(arr.shape) == 1
num_frames = arr.shape[0]
# FFT
f = scipy.fftpack.fft(arr)
e = numpy.square(numpy.absolute(f))
# エラー防止
e = numpy.maximum(e, self.EPS_FLOAT64)
# 総エネルギー(の定数倍)
energy_total = e.sum()
# 周波数ごとのエネルギー
# +/-の周波数のエネルギーをまとめる(2倍する)
energy_axis = 2 * e[0:num_frames/2]
log_energy_axis = 10 * numpy.log10(energy_axis)
# 指定周波数近辺のエネルギー比率を計算
energy_weighted = energy_axis.dot(self.energy_weight_array)
energy_ratio_max, freq_center_detected = \
max(zip(list(energy_weighted / energy_total), self.center_freqs),
key=lambda t: t[0])
# エネルギー条件
# 一番強い周波数に着目
cond_energy = (energy_ratio_max >= self.THRES_ENERGY_RATIO)
# +/- 100Hz以内の最大パワーを基準にする
idx_low = bisect.bisect_left(self.freq_axis, freq_center_detected - 100)
idx_high = bisect.bisect_right(self.freq_axis, freq_center_detected + 100)
energy_peak = log_energy_axis[idx_low:idx_high+1].max()
# 履歴に追加
self.hist.append(DetectionHistoryTuple(cond_energy=cond_energy,
energy_peak=energy_peak,
freq_center_detected=freq_center_detected))
# 古い履歴の削除
if len(self.hist) > self.NUM_HIST_SAVED:
self.hist.pop(0)
LEDを光らせる
光らせたい数字に合わせて、対応するGPIOのON/OFFを制御します。
どのGPIOを使うかは呼び出し側プログラムで設定します。
このON/OFF制御にはRPi.GPIOモジュールを使いました。
rootで実行しないとGPIOを制御できないみたいなのでそこは要注意。
# -*- coding: utf-8 -*-
import sys
import RPi.GPIO as GPIO
import time
class GPIO7seg:
sevenseg_on = [[0, 2, 3, 5, 6, 7, 8, 9],
[0, 1, 2, 3, 4, 7, 8, 9],
[0, 1, 3, 4, 5, 6, 7, 8, 9],
[0, 2, 3, 5, 6, 8, 9],
[0, 2, 6, 8],
[0, 4, 5, 6, 8, 9],
[2, 3, 4, 5, 6, 8, 9]]
def __init__(self, id_pin_seg):
self.id_pin_seg = id_pin_seg
GPIO.setmode(GPIO.BCM)
for i in id_pin_seg:
GPIO.setup(i, GPIO.OUT)
def digit(self, n):
for i in xrange(7):
GPIO.output(self.id_pin_seg[i], n not in self.sevenseg_on[i])
メイン部分
メインプログラムからさっきの各モジュールを呼びつつ、判定結果に応じてLEDを光らせます。
- 前のタッチ音が来てから1秒以内のタッチ音は数字をカウントアップ
- 前のタッチ音から1~2秒の間は、タッチ音の回数の内部カウンタは0に戻っているが、表示カウンタはリセットされていない状態
- 1回鳴った1.5秒後とかにもう1回鳴ったら、カウンタは2にはならずに(内部的にはリセットされて)引き続き1が出るようにします。最初は、1秒経った時点ですぐに表示を0に戻していましたが、見た目の動きが忙しすぎるなと思ったので、この部分を入れています。
- 前のタッチ音が来てから2秒経つと、表示されるカウンタが0に戻る
例えば「ピッ」と鳴ったら1が表示され、「ピピッ」と鳴ったら2が表示される、という感じです。
改札機通過の際には1回と2回で意味が違うので、それを分かるようにしてみようというプログラムです。
7セグLEDを光らせるためのGPIOですが、今回は15~21番ピンをそれぞれ7セグLEDのA~G(データシート参照)のセグメントに対応させています。
このメインプログラムを、(GPIO制御のため)rootで実行すると、うまくいけばマイクが拾ったタッチ音に反応して数字が変わるはず。
# !/usr/bin/env python2
# -*- coding: utf-8 -*-
import sys
import numpy
import itertools
import pcmmod
import suicadetection
import sevenseg
# ================================
def main():
# 信号長(サンプル数)
MAX_NUM_FRAMES = 512
# 検出したい周波数
FREQS_CENTER = [2550, 2700, 3000]
# 窓関数
ham_window = numpy.hamming(MAX_NUM_FRAMES)
# WAVE読み込み or マイク入力
if len(sys.argv) == 1:
# マイク
rate = 44100
sw = 2
sound = pcmmod.PCMInputStreamFromMic(rate, sw, MAX_NUM_FRAMES)
else:
# WAVE
sound = pcmmod.PCMInputStreamFromWave(sys.argv[1], MAX_NUM_FRAMES)
rate = sound.getFrameRate()
# 周波数軸
freq_axis = numpy.fft.fftfreq(MAX_NUM_FRAMES, 1.0/rate)[0:MAX_NUM_FRAMES/2]
sd = suicadetection.SuicaDetection(FREQS_CENTER, freq_axis)
# カウンタ (7SEG LED)
counter_ring = 0
id_pin_seg = [15, 16, 17, 18, 19, 20, 21]
gpio7 = sevenseg.GPIO7seg(id_pin_seg)
gpio7.digit(counter_ring)
# 最後にカウントアップした時間を見る
counted_last = None
# 波形を読み込む
# フルで読み込めた部分まで使う
for arr, nframes_read in itertools.takewhile(lambda t: t.array.shape[1] == MAX_NUM_FRAMES,
sound.getFrameArrayIterator()):
# 判定に使う: 2chならLを取る
time_frames, status = sd.input_array(arr[0] * ham_window)
if status == suicadetection.DETECTION_ON:
print float(time_frames) / rate, "ON"
# 0.1秒経たないとカウントしないようにする
counted_last = time_frames
pending = True
elif status == suicadetection.DETECTION_OFF:
print float(time_frames) / rate, "OFF"
print
pending = False
if counted_last is not None:
if time_frames > counted_last + rate * 2.0:
# 2秒経ったらLEDをリセットする
gpio7.digit(counter_ring)
counted_last = None
elif time_frames > counted_last + rate * 1.0:
# 1秒経ったらコンボ判定しない(内部だけカウンタをリセット)
counter_ring = 0
elif pending and time_frames > counted_last + rate * 0.1:
counter_ring += 1
gpio7.digit(counter_ring)
pending = False
if __name__ == "__main__":
main()
まとめ
タッチ音を判定するところで結構いろいろ調整したと思います。
単純にパワーだけで見てしまうと、うるさい駅構内とかで反応しまくってしまうし。
あとはタッチ音の周波数、実は意外と統一されていないらしいということが分かりました。
改札機の場合とコンビニのレジの場合を比べても、実はタッチ音の高さ(周波数)が違うので、どちらでも検出できるようにしようなどと考えていると、だんだんややこしいことに…。
ちなみに、タッチ音の周波数などそれまで気にしたことがなく、このプログラムを書くときに初めて具体的な数字をAudacityで調べてみました。
冒頭の知人に「タッチ音は2550Hzとか2700Hzとか鳴ってるみたい」と報告したら、「それで聞こえてないんだな」と納得していました。
バリアフリーだとかユニバーサルデザインだとか言われるようになって久しいですが、
目に見える段差とかだけ見てないですか?音とかも周波数帯域ちゃんと考えて設計してますか?
っていう気づきはあったと思います。
もっとも、低い音にしてしまうと、今度は雑踏に紛れて聞こえにくくなるのかもしれません。難しいですね。