はじめに
動的時間伸縮法(DTW: Dynamic Time Warping)とは2つの波形データの類似度を測定する手法です。
以前の記事では、Pythonで fastdtw
というライブラリを用いて、DTW を簡易的に実装しました。
本記事では、ライブラリを使わずに DTW 距離を算出するプログラムをコーディングしていきます。
ライブラリを用いた DTW の実装方法については次の記事で説明しています。
なお、DTW のアルゴリズムについては次の記事で解説しています。
実装
疑似コード
次に日本語で処理内容を書いたPython風の疑似コードを示します。
以降に示すコードの意図が伝われば幸いです。
「Qの要素数(行) x Sの要素数(列)」の形状をとる累積距離行列を定義する
累積距離行列の1行目1列目のセルを求める
累積距離行列の1行目2列目にあるセルをすべて求める
# 累積距離行列の2行目以降を求める
for i in [2行目の行番号, ..., 最下行の行番号]:
1列目のセルを求める
2列目以降のセルをすべて求める
for j in [2列目の列番号, ..., 最右列の列番号]:
d_{i, j} = (q_i - s_j)の絶対値 + min(d_{i, j}の左上のセル, d_{i, j}の上のセル, d_{i, j}の左のセル)
# パスの探索
i = 最下行の行番号
j = 最右列の列番号
path = [(i, j)]
while 累積距離行列の1行目1列目のセルまで到達していない:
if 最も上の行まで探索した:
左に隣接しているセルを経路にする
elif 最も左の列まで探索した:
上に隣接しているセルを経路にする
# 最も上の行および最も左の列まで探索していない場合
else:
# 探索対象のセルで最小な箇所を取得する
d_min = min(探索中のセルに対して左上、上、左に隣接しているセル)
# 波形データの形状によって採用するセルの優先度を変える
if 波形Qの方が波形Sよりデータポイント数が多い:
if 探索対象のうち最小なセルが上にある:
上に隣接しているセルを経路にする
elif 探索対象のうち最小なセルが左上にある:
左上に隣接しているセルを経路にする
else:
左に隣接しているセルを経路にする
if 波形Sの方が波形Qよりデータポイント数が多い:
if 探索対象のうち最小なセルが左にある:
左に隣接しているセルを経路にする
elif 探索対象のうち最小なセルが左上にある:
左上に隣接しているセルを経路にする
else:
上に隣接しているセルを経路にする
if 波形Qと波形Sのデータポイント数が等しい:
if 探索対象のうち最小なセルが左上にある:
左上に隣接しているセルを経路にする
elif 探索対象のうち最小なセルが左にある:
左に隣接しているセルを経路にする
else:
上に隣接しているセルを経路にする
経路にしたセルの位置情報「(行番号, 列番号)」をパスのリストの先頭に追加する
コーディング
次に Python で実行できるように dtw()
関数としてコーディングしたプログラムを示します。
- 引数
- 第1引数
Q
: 波形データ - 第2引数
S
: もう片方の波形データ
- 第1引数
- 返り値
-
D[len(D) - 1][len(D[0]) - 1]
: DTW距離 -
path
: データポイントの対応関係(パス)
-
def dtw(Q, S):
# 累積距離行列の定義
D = []
for i in range(len(Q)):
D.append([])
for j in range(len(S)):
D[i].append(0)
# 累積距離行列の1行目を求める
D[0][0] = abs(Q[0] - S[0])
for j in range(1, len(S)):
D[0][j] = abs(Q[0] - S[j]) + D[0][j-1]
# 累積距離行列の2行目以降を求める
for i in range(1, len(Q)):
# 1列目のセル
D[i][0] = abs(Q[i] - S[0]) + D[i-1][0]
# 2列目以降のセル
for j in range(1, len(S)):
D[i][j] = abs(Q[i] - S[j]) + min(D[i-1][j-1], D[i-1][j], D[i][j-1])
# パスの探索
i = len(D) - 1
j = len(D[0]) - 1
path = [(i, j)]
while i != 0 and j != 0:
# 最も上の行まで探索した場合
if i == 0:
j -= 1
# 最も左の列まで探索した場合
elif j == 0:
i -= 1
# 最も上の行および最も左の列まで探索していない場合
else:
# 探索対象のセルで最小な箇所を取得する
d_min = min(D[i-1][j-1], D[i-1][j], D[i][j-1])
# 波形データの形状によって採用するセルの優先度を変える
if len(S) < len(Q):
if d_min == D[i-1][j]:
i -= 1
elif d_min == D[i-1][j-1]:
i -= 1
j -= 1
else:
j -= 1
elif len(S) > len(Q):
if d_min == D[i][j-1]:
j -= 1
elif d_min == D[i-1][j-1]:
i -= 1
j -= 1
else:
i -= 1
else:
if d_min == D[i-1][j-1]:
i -= 1
j -= 1
elif d_min == D[i][j-1]:
j -= 1
else:
i -= 1
# パスを追加する
path.insert(0, (i, j))
return D[len(D) - 1][len(D[0]) - 1], path
動作確認
波形データの用意
本記事では疑似的なデータとして、前回の記事 と同様の波形を用意します。
Q = [3, 4, 3, 3, 2, 3]
S = [1, 6, 4, 4, 1, 5]
DTW距離の計算
次のコードで DTW 距離を計算します。
dist, path = dtw(Q, S)
print(f"DTW距離: {dist}")
print(f"各点のパス:{path}")
次が出力されると成功です。
DTW距離: 9
各点のパス:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]
次のコードでデータポイント間の対応関係を確認できます。
from matplotlib import pyplot as plt
import japanize_matplotlib
# 2つの時系列データをプロットする
plt.plot(Q, label='Q')
plt.plot(S, label='S')
# データポイントの対応関係をプロットする
for index_Q, index_S in path:
plt.plot([index_Q, index_S], [Q[index_Q], S[index_S]], color='gray', linestyle='dotted', linewidth=1)
# プロット
plt.legend()
plt.title(f'波形データ(DTW距離={dist})')
plt.show()
次の画像がプロットされると成功です。
比較として、fastdtw
で DTW の処理をした結果を次に示します。
どちらも DTW 距離は等しいですが、データポイントのパスが異なります。
この原因として fastdtw
は計算量を抑えるために近似的な処理をしていることが考えられます。そのため、上記の結果からパスの適用が不自然になる可能性があることを確認できます。
本記事で実装した dtw()
関数の処理は入力された2つの波形データを総当たり的に計算していることで正しい結果を得られることができます。しかし、計算量が $O(nm)$ となり、各波形データのデータポイント数に大きく依存するので、注意が必要です。
参考