はじめに
今日もパターン認識の一つ技術として重要なDPマッチングについて、コードを書いてみたので紹介します。DPは dynamic programmingで日本語では「動的計画法」と訳されています。これは、ナップサック問題のように、部分問題に分割して最適化問題を解く解法として紹介されることもありますが、ここでは時系列のマッチングに特化して話します。同じ内容をDTW(dynamic time warping)と呼ぶこともあります。要は、時系列の最適なマッチングを求める問題です。
最適な時系列のマッチング、って何?と思うかもしれませんが、例えば二つのテキストファイルを比較するdiff コマンドのようなものです。一致する行もあれば、抜けている行、余分に挿入されている行、文字が書き換わっている行、などがあるます。これらを含めて最適な行の対応付けは、DPを用いて行えます。
どんな問題に使えるか?
動的計画法の立場からは、「最適性の原理」が成り立つ場合、ということになります。問題を部分問題に分解し、それぞれの部分問題の最適化が相互に影響しない場合に限ります。時系列なら、マルコフモデル、がその一例です。ある時刻の状態変化と別の時刻の状態変化が独立であり、観測確率も独立なら使えます。
DPマッチングによる文字列のマッチングの原理と実装
マッチングは最適経路探索問題
2つの時系列のマッチングは、経路探索の問題として解けます。
最適性の原理で逐次更新
今、時系列のマッチングについて、コストを削除、置換、挿入による不一致をそれぞれ1とするように設計します。これは、削除や挿入に関する遷移コストを1にする、シンボルの不一致のコストを1にする、ことにします。実装は、下記のようになります。
これを最適性の原理を用いて、逐次更新していきます。更新するときは、かならず、最も低いコスト(最適解)だけでなく、どのノードからのパスが最適であったかを保持します。
from numpy import ndarray, nan
from numpy import array, zeros, ones, argmin
from typing import List, Tuple
def local_cost(v1:str, v2:str) -> float:
"""
"""
if v1 == v2:
return 0.0
else:
return 1.0
def trancost(i1:Tuple[int,int], i2:Tuple[int,int]) -> float:
"""
遷移コスト
"""
if i1[0] == i2[0] -1 and i1[1] == i2[1] - 1:
return 0.0
else:
return 1.0
def dpmatching_forward(in_query:List[str], in_template:List[str]) -> (float, list):
"""
"""
_cum = ones((len(in_query), len(in_template))) * nan
b_pt = dict()
# i1=0
_cum[0,0] = 0.0 + local_cost(in_query[0], in_template[0])
for i2 in range(1, len(in_template)):
_cum[0, i2] = _cum[0, i2-1] + trancost((0, i2-1), (0, i2)) \
+ local_cost(in_query[0], in_template[i2])
b_pt[(0, i2)] = (0, i2-1)
# i1=1, ...
for i1 in range(1, len(in_query)):
_cum[i1,0] = _cum[i1-1, 0] + trancost((i1-1,0), (i1,0)) + local_cost(in_query[i1], in_template[0])
b_pt[i1, 0] = (i1-1, 0)
for i2 in range(1, len(in_template)):
bp_candidates = [(i1-1,i2-1), (i1,i2-1), (i1-1,i2)]
cost_candidates = [_cum[idx[0],idx[1]] + trancost(idx,(i1,i2)) \
+ local_cost(in_query[i1], in_template[i2]) for idx in bp_candidates]
_cum[i1,i2] = min(cost_candidates)
b_pt[(i1,i2)] = bp_candidates[argmin(cost_candidates)]
return _cum, b_pt
トレースバック
マッチングの最終状態は、最後のシンボル通しの対応なので、これが経路の終点になるはずです。ここから、先に求めたバックポインタを辿って戻ります。最適解となるための経路である有ることが保証されているので、開始点まで戻ることで最適経路が得られます。
def dpmatching_backtrace(b_pt:dict, end_pt:Tuple[int, int], start_pt:Tuple[int,int]) -> List[Tuple[int,int]]:
"""
"""
p = end_pt
b_path:List[Tuple[int,int]] = []
while True:
b_path.append(p)
if p == start_pt:
break
try:
_p = b_pt[p]
p = _p
except KeyError as e:
raise(e)
b_path.reverse()
return b_path
実行例
実際にこれを用いて、DPマッチングをしてみます。
2つのシンボル列
テンプレート:C E E F G (ドレミファソ)
入力:C E E F C G (どミミファドソ)
の最適なマッチングを考えます。
input_template = ["C", "D", "E", "F", "G"]
input_query = ["C", "E", "E", "F", "C", "G"]
cum, b_pt = dpmatching_forward(input_query, input_template)
end_pt = (cum.shape[0]-1, cum.shape[1]-1)
back_path = dpmatching_backtrace(b_pt, end_pt, (0,0))
for idx in back_path:
print(idx, input_query[idx[0]], input_template[idx[1]])
実行すると、下記の結果を得ます。
(0, 0) C C 0.0
(1, 1) E D 1.0
(2, 2) E E 1.0
(3, 3) F F 1.0
(4, 3) C F 3.0
(5, 4) G G 3.0
最初はC-C で一致していて、エラーは0です。次に、E-D でシンボルの不一致によりエラーが1になります。次に、E-E, F-Fと一致するので、エラーの個数は1のままです。つぎに、C-Fとなり、余計なシンボルが挿入され、さらにシンボルの不一致もありエラーが2つ上ます。最後はG-Gで一致しているので誤差は増えません。3つのエラーとするのが、最適なマッチングになります。
まとめ
とりあえず、解説がないまま、DPマッチングが実装できたっぽいので、報告しました。
今後、初めて読む人にも分かるように文章を追加してこのページを完成させようと思います。PCの電池も無くなりそう。涙。とりあえず、勢いをつけておかないと先に進めないので、投稿します。。。m(__)m
(2021/03/30)