45
34

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Shazam的な音声フィンガープリントアルゴリズムを実装してみる

Last updated at Posted at 2017-09-16

Shazamみたいに、スマホで録音した音声から流れてる音楽の情報を素早く特定するために、**音声フィンガープリント(Audio Fingerprinting)と呼ばれる技術が使われています(Content based music retrievalの大枠にも入ります)。この検索技術は、スマホのローファイな音質や環境音などのノイズ、そして膨大な音楽データベースを考えると、とても一筋縄ではいかないように見えますが、実際にはかなり前に色んなサービスで実用化されていて、Siriにも搭載されましたね。
音声フィンガープリントには幾つか違うアプローチがあると思いますが、一番主流かつ分かりやすいのが「スペクトログラムのピークをハッシュ化して保存/検索する」という手法でしょう。界隈では
「Landmark-based fingerprinting」**と呼ばれてるそうです。今回はこの手法でフィンガープリント技術を実装してみたいと思います。
参考はこちらのブログ:
Audio Fingerprinting with Python and Numpy
およびこの論文:
Wang, Avery. "An Industrial Strength Audio Search Algorithm." Ismir. Vol. 2003. 2003.

LabROSAにはAUDFPRINTというオープンソースプロジェクトがあります:
AUDFPRINT - Audio fingerprint database creation + query

#スペクトログラム変換とピーク検出
まずは音声シグナルをSTFTした後、T-F表示のピーク地点を求めます。ここでは画像処理の要領で探します。scipyのmaximum filterをスペクトログラムにかけ、その結果と元のスペクトログラムの値が等しい地点がピークと見なされます(値が小さい地点は無条件で除外)。前処理に2Dローパスフィルターをかけておくのもいいかもしれません。

analyze.py
def find_peaks(y,size):
    """
    y:音声シグナル
    size:maximum filterのサイズ(ピークの密度を制御する)
    """
    sgram = np.abs(stft(y,n_fft=512,hop_length=256))
    sgram_max = ndi.maximum_filter(sgram,size=size,mode="constant")
    maxima = (sgram==sgram_max) & (sgram > 0.2)
    peaks_freq,peaks_time = np.where(maxima)
    return peaks_freq,peaks_time

numpy.whereを使ってピーク地点のリストを得ることができました。あるいはランドマークとも呼びます。

#ピークのペア化→ハッシュ化
単体のピークにはそのピークの周波数の情報しか持っておらず(クエリのタイミングは任意なのでピークの時間位置は無意味)、そのままハッシュにしても使えません。そこで、近くにあるピーク同士を結び付けて、2つのピークを一つのフィンガープリントとします。このピークのペアには、**(ピーク1の周波数、ピーク2の周波数、時間差)**の情報が含まれており、これらの値を連結することでより重複しにくいハッシュを作ることができます。
具体的には図のように、あるanchor pointから見て前方一定範囲内の点を結び付けています。

shazam1.png

以下のコードではペアを選んだあと、**(ピーク1の周波数、周波数差、時間差)**を20ビットの数値に詰め込んでハッシュ化しています。

analyze.py
F1_BITS = 8
DF_BITS = 6
DT_BITS = 6

B1_MASK = (1<<F1_BITS) - 1
B1_SHIFT = DF_BITS + DT_BITS
DF_MASK = (1<<DF_BITS) - 1
DF_SHIFT = DT_BITS
DT_MASK = (1<<DT_BITS) - 1

def peaks_to_landmarks(peaks_freq,peaks_time,target_dist=30,target_time=30,target_freq=30):
    peak_mat = np.zeros((np.max(peaks_freq)+target_freq,np.max(peaks_time)+target_dist+target_time),dtype=np.bool)
    peak_mat[peaks_freq,peaks_time] = True
    list_landmarks = []
    for pfreq,ptime in zip(peaks_freq,peaks_time):
        #(pfreq,ptime) -- current anchor
        target_mask = np.zeros(peak_mat.shape,dtype=np.bool)
        target_mask[pfreq-target_freq:pfreq+target_freq,ptime+target_dist:py+target_dist+target_time] = 1
        #target_mask:target peakを選ぶ範囲を指定する
        targets_freq,targets_time = np.where(peak_mat & target_mask)
        for pfreq_target,ptime_target in zip(targets_freq,targets_time):
            dtime = ptime_target-ptime
            #ハッシュを構築
            hsh = (((pfreq & B1_MASK)<<B1_SHIFT) | (((pfreq_target+target_freq-pfreq)&DF_MASK)<<DF_SHIFT)|(dtime&DT_MASK))
            list_landmarks.append((hsh,ptime))   #ハッシュとピーク1の時間を保存
        
        
    return list_landmarks

このハッシュ値に**(曲のID,ピーク1の時間位置)**を結び付けてハッシュテーブルに登録します。pythonのdictionaryオブジェクトを使っています。

#hash_table: python dictionaryオブジェクト
for landmark in list_landmarks:
    hsh = landmark[0]
    starttime = landmark[1]
    if hash_table.has_key(hsh):
        hash_table[hsh].append((audio_id,starttime))
    else:
        hash_table[hsh] = [(audio_id,starttime)]

全てのハッシュの登録が済んだらpickleモジュールで保存しましょう。

#クエリ
コード進行認識用のデータセットとして使っている、計497曲をデータベース化しました。えらく時間がかかりましたが、pickleされたハッシュテーブルのサイズは約250MBでした。試しに、データセットからBeatlesの**「Back In the U.S.S.R」**を再生し、ポケットレコーダーで十秒ほどの録音を作成しました。室内で録ったものなのでかなり理想的な録音ですが、これで試してみます。
録音された音声を、上と同じようにSTFTしてピークを検出してフィンガープリント(ハッシュ値)を見つけました。さてここからどうやって曲にたどり着けばいいのか。
ハッシュテーブルとクエリをマネージするDatabaseクラスを書きました。

Database.py
DIFF_RANGE = 2000000

class Database:
    def __init__(self):
        with open("hashtable.pkl","rb") as f:
            self.hash_table = pickle.load(f)
        with open("titlelist.pkl","rb") as f:
            self.title_list = pickle.load(f)
    #方法1
	def query_histogram(self,y,best=5):
		list_landmarks = analyze.extract_landmarks(y)
		id_candidates = self._find_candidate_id(list_landmarks)
		candidate_landmark_histogram = np.zeros(id_candidates.size,dtype=np.int32)
		for landmark in list_landmarks:
			hsh = landmark[0]
			if self.hash_table.has_key(hsh):
				hit_ids = [item[0] for item in self.hash_table[hsh]]
				for i in hit_ids:
					candidate_landmark_histogram[np.where(id_candidates==i)[0]] += 1
			else:
				continue
		
		idxsort = np.argsort(candidate_landmark_histogram)[::-1]
		title_sort = self.title_list[id_candidates[idxsort]]
		fingerprint_histogram_sort = candidate_landmark_histogram[idxsort]
		return title_sort[:best],fingerprint_histogram_sort[:best]

    #方法2
    def query_diff_histogram(self,y,best=5):
        list_landmarks = analyze.extract_landmarks(y)
        id_candidates = self._find_candidate_id(list_landmarks)
        diff_histogram = np.zeros((id_candidates.size,DIFF_RANGE),dtype=np.int32)
        diff_start_offset = np.zeros(id_candidates.size,dtype=np.int32)
        
        for landmark in list_landmarks:
            hsh_query = landmark[0]
            offset_query = landmark[1]
            if self.hash_table.has_key(hsh_query):
                hits = self.hash_table[hsh_query]
                for hit in hits:
                    id_hit = hit[0]
                    offset_hit = hit[1]
                    diff = offset_hit - offset_query
                    if diff >= 0:
                        id_hist = np.where(id_candidates==id_hit)[0]
                        diff_histogram[id_hist,diff] += 1
                        diff_start_offset[id_hist] = min([diff_start_offset[id_hist],offset_query])
            else:
                continue
            
        candidate_match_score = np.max(diff_histogram,axis=1)
        candidate_max_offset = np.argmax(diff_histogram,axis=1)
        idxsort = np.argsort(candidate_match_score)[::-1]
        title_sort = self.title_list[id_candidates[idxsort]]
        match_score_sort = candidate_match_score[idxsort]
        best_match_offset = (candidate_max_offset[idxsort[0]] - diff_start_offset[idxsort[0]]) * 256/11025.0
        return title_sort[:best],match_score_sort[:best],best_match_offset
        
                    
                    
    def _find_candidate_id(self,landmarks):
        candidate = set()
        for landmark in landmarks:
            hsh = landmark[0]
            if self.hash_table.has_key(hsh):
                id_hits = [item[0] for item in self.hash_table[hsh]]
                candidate = candidate.union(id_hits)
            else:
                continue
            
        return np.array(list(candidate),dtype=np.int32)

analyze.extract_landmarks(y)はピーク検出とハッシュ化を統合したものです。

analyze.py
def extract_landmarks(y,peak_dist=20):
    peaks_x,peaks_y = find_peaks(y,peak_dist)
    landmarks = peaks_to_landmarks(peaks_x,peaks_y)
    return landmarks

###1.ヒット数を数える
Database.query_histogramに該当します。データベースに登録されているハッシュにはそれぞれ曲のIDが(一つor複数)紐付けられています。録音に含まれていたハッシュを全部調べて、曲にヒットした回数を数え、一番ヒット数が多かった曲が正解です。これは一番単純な方法ですが果たして性能はどうでしょう。ヒット数上位5曲を表示してみました。
plot_hist.png

クエリの所要時間は0.5秒前後でした(ハッシュテーブルロードの時間はノーカン)。Back In The U.S.S.Rの原曲と一番多くハッシュがマッチしました。一応正解ですが、ほかの曲との差はあまり顕著ではありません。もう少し質が悪い録音だとすぐ不正解を出してしまいそうです。
ほかに問題なのは、この方法では録音は曲のどの部分にマッチしたのかを調べることができません。応用によっては別に必要でもないかもしれませんが、どうせなら知りたいですよね。

###2.「一列に並んでいる」フィンガープリントを数える
正解の曲とその他二曲を選び、クエリ音声とヒットしたフィンガープリントの**(元音声での時間位置、録音での時間位置)**の数値を2D 平面にプロットしてみます。
plot_offset.png

不正解の曲でもハッシュのヒット数がそこそこある場合がありますが、いずれも点の分布がバラバラ。一方正解のマッチ(一番上のプロット)では斜め一直線に点が並んでいる部分がありますね。これは一定時間内にフィンガープリントが持続的にヒットしていることをあらわしています。この為に、ハッシュを登録するときに原曲での時間位置を保存するんですね。
というわけで、この「一列に並んでいる」フィンガープリントが多い方が正解と判断します。斜め一列に並んでいる、すなわちx,yの差が定数(直線x-y=b上にある)ということなので、曲ごとに(x-y)のヒストグラムを求め、その最大値をスコアとみなします。Database.query_diff_histogramに該当します。
その結果がこちら。
plot_diff.png

わかりやすく差がつきましたね。クエリにかかった時間は約1.8秒。実装の効率にかなーり改善の余地がありそうですが、原理の実証としてはまずOKでしょう。計算量はN(クエリ音声のハッシュ数)なのでデータベースが更に大きくなってもたいして悪化しないはずです。
ソースコードのbest_match_offsetはクエリ音声の原曲での位置を求めています。これでどこにマッチしたかの問題も解決しました。

#更なる改善策
この手法は、**「原曲とクエリは再生速度が同一で、変調も無い」**という前提の上に成り立っていることに、注意が必要です。再生速度が変わったり、全体的に変調すると、ランドマーク間の相対時間距離や、(線形周波数域ゆえ)周波数距離が伸縮するので、ハッシュでマッチさせることができなくなります。
実用面ではあまり大きな問題にはなりませんが、解決策も提案されています。
*周波数の問題:FFTの代わりにCQT(Constant-Q Transform)を使う。対数周波数域でなら、変調が起きても周波数の相対距離は変わらない(変わりにくい)はず。FFTを対数周波数域に変換したものでも良さそうです。
*再生速度の問題:三つ以上のランドマークを結び付けて、それらの時間距離の「比率」をハッシュに保存する。比率であれば、再生速度が変化しても変わらない(変わりにくい)はず。しかし整数ではなく実数を扱うことになるため、ハッシュそのもののロバスト性はかなり下がりそうです。

#まとめ
簡単な音声フィンガープリントのアルゴリズムを紹介しました。audfprintもlandmark-basedですが、ピーク検出もクエリもまた少し違う手法で実装されているようです。ソースコードを読んでみると良いでしょう。理解できたら、また別の記事にしてみたいです。

45
34
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
45
34

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?