LoginSignup
14
22

More than 3 years have passed since last update.

python の numpy で移動平均(running average)と移動標準偏差を簡単に計算したい

Last updated at Posted at 2017-12-29

背景

時系列データの移動平均(running average)や移動標準偏差を計算したい場合で、元のデータと全く同じデータ数で欲しかったり、平均からの差や比などもう少し細かな作業をしたい場合に、python の numpy だけでシンプルに書く方法の紹介です。コードをみればわかるひとは、google Colab のページ を参照ください。

時系列データに単純な移動平均だけをかけるだけなら、もっと楽な方法もあります。

Savitzky-Golay フィルタの使い方の例は、

from scipy.signal import savgol_filter
出力 = savgol_filter(入力データ, ウィンドウ数, フィルタ次数, mode="wrap")

となり、とても簡単。ここでは、データが少し癖があったり、微調整を自分で加えたい人向けに、コードの中身が見える形を心がけた例を示します。

コードの説明

入力データ

入力データは、

14_23_short_filt.qdp
264945396.378 479.531707317 97.8839998677 1.000976563
264945396.428 439.570731707 93.716794889 1.000976563
264945396.478 579.434146341 107.598219756 1.000976563
264945396.528 539.473170732 103.821660114 1.000976563
264945396.578 702.745098039 118.785601919 0.99609375
....

のように、

1 2 3 4
絶対時刻(1) 装置の出力(2) 装置の誤差(3) 不感時間の割合(4)

の書かれている4行のテキストファイルを想定する。

を使った場合はこの形式が多いであろう。

サンプルデータは下記から入手可能である。

サンプルのテキストをプロットすると、

のように、数万点の時系列データが含まれている。

(このデータは、前半で特徴的なブラックホールからのX線の準周期的な時間変動が見えます。パッと見るだけだと、何か構造があることには気がつかないですよね。)

このような場合に、ある時間幅での移動平均や、標準偏差を計算すると、隠された変動に気がつきやすいので、それを簡単にやる方法の紹介です。

使い方

./filterdata.py ファイル名 フィルターをかけたい時間幅 

という使い方である。

標準出力に下記の8つの情報が出される。

1 2 3 4 5 6 7 8
絶対時刻(1) 装置の出力(2) 装置の誤差(3) 不感時間の割合(4) 相対時刻(5) 標準偏差(6) 移動平均(7) (2)-(7)

結果の例

python ./filterdata.py 14_23_short.qdp 5 > 14_23_short_filt.qdp

として実行した。

gnuplot> plot "14_23_short_filt.qdp" u 5:2 w l, "" u 5:7 w l ti "mean", "" u 5:($6)*3 w l ti "std"

でプロットすると、平滑化のご利益がよく分かる。

5秒で平均を求めることで、揺らぎが抑制されて、平均値の変動の様子がキレイにみてとれる。標準偏差も時間が経つにつれて徐々に減っている。よく見ると、平均値と標準偏差に時間的な相関がある。(ブラックホールの N-sigma relation と呼ばれる現象で、明るくなると変動も大きくなることが知られている。)

サンプルコード

get_detrend_spline という関数に、フィルターの時間幅(dt), 相対的な時間の配列(rtime), 装置の出力(det), 装置のエラー(dete)を入れると、フィルターが計算される。オプションの order で polyfit の次元、usemean で単純な平均値を使うことに変更することもできる。

filterdata.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Example of filtering time-series data 
"""

import numpy as np
import sys 

def get_detrend_spline(dt, rtime, det, dete, order = 2, usemean = False):
    """ [input] 
        dt: time duration for filtering, 
        rtime: time (relative)
        det : detector output 
        dete : detector output error
        order : order used for polyfit 
        usemean : polynominla is used (Default). When True, np.mean() is used. 

        [output] all are filterred on a timescale of dt. 
        npstdlong : standard deviation 
        npavelong : mean  
        detrend_sub : det - mean
        detrend_sube : dete (nothing is done)
        detrend_ratio : det / mean
        detrend_ratioe : dete / mean
    """

    dtlong = dt
    rtime = np.array(rtime)
    det = np.array(det)
    dete = np.array(dete)

    stdlong = []
    avelong = []

    for i, delta in enumerate(rtime):

        if delta > 0 :
            tminlong = delta - dtlong
            tmaxlong = delta + dtlong

            localdetlong = det[np.where( (rtime >= tminlong) & (rtime < tmaxlong) )]
            localrtime = rtime[np.where( (rtime >= tminlong) & (rtime < tmaxlong) )]

            if np.alen(localdetlong) > 0:
                coefficients = np.polyfit(localrtime, localdetlong, order)
                polynomial = np.poly1d(coefficients)
                tmpstdlong = np.std(localdetlong)

                if usemean:
                    tmpavelong = np.mean(localdetlong)                    
                else:
                    tmpavelong = polynomial(delta)

            else:
#                print("zero found.  t=" + str(delta) +  " t1=" + str(tminlong) + " t2=" + str(tmaxlong) + " r=" + str(det[I]))
                tmpstdlong = 0.
                tmpavelong = 0.

            stdlong.append(tmpstdlong)
            avelong.append(tmpavelong)
        else:
            stdlong.append(0.)
            avelong.append(0.)

    npstdlong = np.array(stdlong)
    npavelong = np.array(avelong)

    detrend_sub = det - npavelong
    detrend_sube = dete
    detrend_ratio = det / npavelong
    detrend_ratioe = dete / npavelong

    print( " len(rtime) = " + str(len(rtime)) ) 
    print( " len(npavelong) = " + str(len(npavelong)) ) 
    return (npstdlong, npavelong, detrend_sub, detrend_sube, detrend_ratio, detrend_ratioe) 


argvs = sys.argv
argc = len(argvs) 

if (argc != 3):
    print ('\n[ERROR] Usage: # python %s inputfile dt \n' % argvs[0] )
    quit()

fname=argvs[1]
dt=float(argvs[2])


file=open(fname,"r")

time = []
rate = []
ratee = []
frac = []

for i, one in enumerate(file):
    one = one.split()
    time.append(float(one[0]))
    rate.append(float(one[1]))
    ratee.append(float(one[2]))
    frac.append(float(one[3]))

time  = np.array(time)
rate  = np.array(rate)
ratee = np.array(ratee)
frac  = np.array(frac)

rtime = []
for t in time:
    rtime.append(t - time[0])

rtime = np.array(rtime)

npstdlong, npavelong, detrend_sub, detrend_sube, detrend_ratio, detrend_ratioe = get_detrend_spline(dt, rtime, rate, ratee, order = 2, usemean = False)

for x1, x2, x3, x4, rt, std, ave, sub in zip(time, rate, ratee, frac, rtime, npstdlong, npavelong, detrend_sub):
    print(x1, x2, x3, x4, rt, std, ave, sub)

入力ファイルのフォーマットや、出力したい値などはケースバイケースでしょうから、適宜変更ください。

プロットして確認

import matplotlib.pyplot as plt
plt.errorbar(time,y=rate,yerr=ratee, label="raw data")
plt.errorbar(time,y=npavelong, label="running average for dt = " + str(dt), fmt="r.", alpha=0.8)
plt.legend()

などで、プロットしてデータを確認しよう。

スクリーンショット 2020-11-23 18.24.26.png

詳細は、google Colab のページを参照ください。

参考資料

14
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
22