こんにちは!データサイエンティストのじゃむです。
本日は、時系列データで使用する移動平均と移動分散を逐次的に求めるアルゴリズムの紹介です。
問題設定
長さTの時系列データX=(X_t)_{t=1,・・・,T}に対して、k項移動平均と移動分散を逐次的に求めることを考えます。
たとえば、X=[1,2,3,4,5]、k=2として、
移動平均 = [1.5, 2.5, 3.5, 4.5]
移動分散 = [0.25, 0.25, 0.25, 0.25]
をどうやったら効率よく算出できるかを考えます。
単純な方法
たとえば先ほどの例でX=[1,2,3,4,5]の2項移動平均を愚直に求めるとすると
(1+2)/2 = 1.5, (2+3)/2 =2.5 ・・・
同じように、2項移動分散を求めると、
((1-1.5)^2 + (2-1.5)^2)/2 = 0.25, ((2-2.5)^2 + (3-2.5)^2)/2 = 0.25, ・・・
となります。
一般的には、j番目のk項移動平均 $m_j$ と移動分散 $v_j$ は次のようにかけます。(j=0,1,2,・・・, T-1)
$$ m_j = \frac{1}{k}\sum_{i=j}^{j+k-1} X_i $$
$$ v_j = \frac{1}{k}\sum_{i=j}^{j+k-1} (X_i-m_i)^2 $$
このままでは、計算量が高々O(T*k)になるので、今回のアルゴリズムでは、毎回Σ計算をせずに計算量がO(T)となるように工夫します。
なにをするか
0番目のk項移動平均の計算は、
$$ m_0 = \frac{1}{k}\sum_{i=0}^{k-1} X_i $$
ですが、1番目の移動平均を計算するときに、0番目の移動平均を使うことを考えます。1番目の移動平均を計算するためには、$X_1$から$X_k$までの合計値が必要です。これは$X_0$から$X_{k-1}$までの合計値から、$X_k$を足して、$X_0$を引いたものと考えれば、 $X_1$ から $X_k$ までの合計値は $m_0k - X_0 + X_{k}$ と算出することができます。よってΣの計算をせずに各移動平均は、O(1)で求められ、$i$番目のk項移動平均は次のように逐次的に求められます。
$$m_i = m_{i-1} - \frac{X_{i-1} - X_{i+k-1}}{k}$$
次に、移動分散ですがこれは少々厄介で、次の分散公式が必要です。
$$\frac{1}{n}\sum_{i=1}^{n} (X[i]- m)^2 = \frac{1}{n}\sum_{i=1}^{n} X^2[i] - m^2 $$
ここで、$m$ は$X$ の平均です。これを使って、$v_j$ を書き換えると、
$$ v_j = \frac{1}{k}\sum_{i=j}^{j+k-1} (X[i]-m_i)^2 = \frac{1}{k}\sum_{i=j}^{j+k-1} X^2[i] -m^2_i$$
になります。移動平均と同じように$v_0$を求めると、上記の式から
$$ v_0 = \frac{1}{k}\sum_{i=0}^{k-1} X^2[i] -m^2_0$$
$v_1$を求めるには、$X_1$から$X_k$までの二乗和が必要ですが、移動平均と同じようにすれば、$X_0$から$X_{k-1}$までの二乗和から、$X_k$の二乗を足して、$X_0$の二乗を引けばよいです。よって、$X_1$から$X_k$までの二乗和は$k(v_0+m^2_0) + X^2_k - X^2_0$ になります。よって、分散公式から、$v_1$は、
$$ v_1 = (v_0+m^2_0) + \frac{X^2_k - X^2_0}{k} - m^2_{1} $$
と算出できます。よってΣの計算をせずに各移動分散は、O(1)で求められ、$i$番目のk項移動分散は次のように逐次的に求められます。
$$ v_i = (v_{i-1}+m^2_{i-1}) + \frac{X^2_{i+k-1} - X^2_{i-1}}{k} - m^2_{i} $$
ソースコード
pythonで実装すると、次のようになります。
import numpy as np
def calc_moving_kth_average_and_variance(x_array, k):
m_init = np.mean(x_array[:k]); mean_list = [m_init] #平均の初期値
v_init = np.var(x_array[:k]); var_list = [v_init] #分散の初期値
for i in range(len(x_array)-k): # 更新式
mean_list.append(mean_list[-1] + (-x_array[i] +x_array[i+k])/k)
var_list.append(var_list[-1]+mean_list[-2]**2 +(x_array[i+k]**2 - x_array[i]**2)/k -mean_list[-1]**2)
return mean_list, var_list