隠れマルコフモデル・ビタービアルゴリズムをPythonを使って実装してみた
- 続・わかりやすいパターン認識―教師なし学習入門―の隠れマルコフモデル・ビタービアルゴリズムをPythonを使って実装してみました
隠れマルコフモデル・ビタービアルゴリズム
本の流れに沿ってコードを書いたので、少々汚いです!
c = 3 # 状態数
s = [ "w1", "w2", "w3" ] # 状態
m = 2 # 出力数
v = [ "v1", "v2" ] # 出力記号
n = 3 # 観測回数
# それぞれの状態の初期確率
p = [ 1/3, 1/3, 1/3 ]
# 状態遷移確率
A = [ [ 0.1, 0.7, 0.2 ],
[ 0.2, 0.1, 0.7 ],
[ 0.7, 0.2, 0.1 ] ]
# 状態毎のでのそれぞれの観測確率
B = [ [ 0.9, 0.1 ],
[ 0.6, 0.4 ],
[ 0.1, 0.9 ] ]
# p1をp[0], p2をp[1], p3をp[2]
# v1をv[0], v2をv[1]
# x1をx[0], x2をx[1], x3をx[2]
# として以下表現する
# 観測結果
x = [ 0, 1, 0 ] # v1, v2, v1という結果
class HMM:
psi = []
PSI = []
t = 1
def __init__(self, s, v, p, A, B):
self.outputs = v
self.states = s
self.state_probabilities = p
self.transition_probabilities = A
self.emission_probabilities = B
def viterbi_with_result(self, observations):
self.observations = observations
# Step1 初期化
# 1回目なのでt = 1なので
# x1 = v1, x2 = v1, x3 = v1
self.psi.append({})
self.PSI.append({})
for index, state in enumerate(self.states):
sp = self.state_probabilities[index]
ep = self.emission_probabilities[index][observations[0]]
self.psi[0][index] = (round(sp * ep, 4), index)
self.PSI[0][index] = (0, None)
self.print_process()
# Step2 再帰的計算
def calc_max_probability(current_state_index):
results = []
prev_t = self.t - 1
for prev_state_index, state in enumerate(self.states):
a = self.transition_probabilities[prev_state_index][current_state_index]
prev_probability = self.psi[prev_t - 1][prev_state_index][0]
results.append((prev_probability * a, prev_state_index))
return max(results)
for observation in observations[1:]:
self.t = self.t + 1
self.psi.append({})
self.PSI.append({})
for index, state in enumerate(self.states):
prob, max_state_index = calc_max_probability(index)
self.PSI[self.t - 1][index] = (prob, max_state_index)
b = self.emission_probabilities[index][observations[self.t - 1]]
self.psi[self.t - 1][index] = (round(prob * b, 4), index)
self.print_process()
# STEP3
print("================================")
print("結果")
max_prob = max(self.psi[-1].values())
last_i = max_prob[1]
results = [None for t in range(self.t)]
results[-1] = last_i
for index in range(self.t)[1:]:
t = self.t - index
if t < 0: continue
results[t - 1] = self.PSI[t][results[t]][1]
print("■ 最も確率の高い遷移")
print(" -> ".join(map(lambda x: self.states[x], results)))
print("■ 上記の遷移確率")
print(max_prob[0])
def print_process(self):
index = self.t - 1
current_psi = self.psi[index]
print("================================")
print(str(self.t) + "回目 結果: " + self.outputs[self.observations[self.t-1]])
print("ψ" + str(self.t) + "(1): " + self.states[current_psi[0][1]] + ": " + str(current_psi[0][0]))
print("ψ" + str(self.t) + "(2): " + self.states[current_psi[1][1]] + ": " + str(current_psi[1][0]))
print("ψ" + str(self.t) + "(3): " + self.states[current_psi[2][1]] + ": " + str(current_psi[2][0]))
print("Ψ")
print(max(self.PSI[index].values()))
if __name__=="__main__":
hmm = HMM(s, v, p, A, B)
hmm.viterbi_with_result(x)