ゼロ位相フィルタリング
ゼロ位相フィルタリングとは、フィルタリングを一度前から後ろへ行い、次に後ろから前へもう一度行うことで、ズレを打ち消します。これにより、下図のように信号の波の位置が元のまま、つまり「位相のずれゼロ」で、ノイズだけだ綺麗に取り除けます。
今回は、このゼロ位相フィルタリングをpythonではなく、Go言語で実装したいと思います。
手動でfiltfilt関数を実装
Go言語ではpythonでのScyPyのように、ゼロ位相フィルタリングのライブラリ等はないため、自作する必要があります。初めからGo言語で実装すると混乱しそうだったので、一度pythonでfiltfilt関数を自作してみました。
SciPyと自作filtfilt関数の比較コード
import numpy as np
import scipy.signal as signal
import matplotlib.pyplot as plt
# サンプリング設定
fs = 500 # サンプリング周波数 [Hz]
t = np.linspace(0, 1, fs, endpoint=False)
# 正弦波 + ノイズの信号
frequency = 5 # 正弦波の周波数 [Hz]
original_signal = np.sin(2 * np.pi * frequency * t)
noise = np.random.normal(0, 0.5, original_signal.shape)
observed_signal = original_signal + noise
# Butterworthローパスフィルタ(10Hz以下通過)
order = 4
cutoff = 10 # カットオフ周波数 [Hz]
b, a = signal.butter(order, cutoff / (0.5 * fs), btype='low')
# 自作のfiltfilt関数
def filtfilt_manual(b, a, x):
padlen = min(3 * max(len(a), len(b)), len(x) - 1)
x_padded = np.concatenate((
2 * x[0] - x[padlen:0:-1],
x,
2 * x[-1] - x[-2:-padlen-2:-1]
))
zi = signal.lfilter_zi(b, a) * x_padded[0]
y_fwd, zf = signal.lfilter(b, a, x_padded, zi=zi)
y_rev, _ = signal.lfilter(b, a, y_fwd[::-1], zi=zf)
return y_rev[::-1][padlen:-padlen]
# フィルタリング
filtered_scipy = signal.filtfilt(b, a, observed_signal)
filtered_manual = filtfilt_manual(b, a, observed_signal)
plt.figure(figsize=(12, 6))
plt.plot(t, observed_signal, label='ノイズ付き信号', alpha=0.5)
plt.plot(t, filtered_scipy, label='filtfilt (SciPy)', linewidth=2)
plt.plot(t, filtered_manual, '--', label='filtfilt (自作)', linewidth=2)
plt.xlabel('時間 [s]', fontsize=16)
plt.ylabel('振幅', fontsize=16)
plt.title('SciPyと自作のfiltfilt関数の比較', fontsize=18)
plt.legend(loc='upper right', fontsize=14)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.grid(True)
plt.tight_layout()
plt.show()
グラフの最後の方以外はほぼ同じような結果が得られています。
Go言語でfiltfilt関数を実装
pythonで自作したfiltfilt関数をGo言語で実装します。
コードまとめ
// 多項式係数を求める関数(Pythonの np.poly に相当)
func poly(roots []complex128) []float64 {
n := len(roots)
coeffs := make([]complex128, n+1)
coeffs[0] = 1
for _, root := range roots {
for j := n; j > 0; j-- {
coeffs[j] -= root * coeffs[j-1]
}
}
// 実数部分のみを返す
realCoeffs := make([]float64, n+1)
for i := range coeffs {
realCoeffs[i] = real(coeffs[i])
}
return realCoeffs
}
// バターワースフィルターの係数
func butterworthCoeff(fs, fc float64, n int) ([]float64, []float64) {
nyquist := fs / 2
normalCutoff := fc / nyquist
// アナログプロトタイプ極の計算
poles := make([]complex128, n)
for k := 1; k <= n; k++ {
angle := math.Pi * float64(2*k+n-1) / float64(2*n)
poles[k-1] = cmplx.Exp(complex(0, angle))
}
// s領域での極を算出
warped := 2 * fs * math.Tan(math.Pi*normalCutoff/2)
for i := range poles {
poles[i] *= complex(warped, 0)
}
// 双一次変換でz領域の極に変換
digitalPoles := make([]complex128, n)
for i, p := range poles {
digitalPoles[i] = (complex(2*fs, 0) + p) / (complex(2*fs, 0) - p)
}
// 分母の係数計算(デジタル極)
a := poly(digitalPoles)
// ゼロ点は全て z = -1
zeros := make([]complex128, n)
for i := range zeros {
zeros[i] = -1
}
b := poly(zeros)
// ゲイン調整(z=1でのゲインを1にする)
sumA := 0.0
sumB := 0.0
for _, coeff := range a {
sumA += coeff
}
for _, coeff := range b {
sumB += coeff
}
gain := sumA / sumB
for i := range b {
b[i] *= gain
}
// fmt.Print("a", a)
// fmt.Print("b", b)
return b, a
}
// lfilterは、初期状態ziを用いてフィルタ処理を行う
func lfilter(b, a, x, zi []float64) (y, zf []float64) {
n := len(x)
order := len(a) - 1
y = make([]float64, n)
z := make([]float64, len(zi))
copy(z, zi)
for i := 0; i < n; i++ {
y[i] = b[0]*x[i] + z[0]
for j := 0; j < order-1; j++ {
z[j] = b[j+1]*x[i] - a[j+1]*y[i] + z[j+1]
}
if order > 0 {
z[order-1] = b[order]*x[i] - a[order]*y[i]
}
}
zf = make([]float64, len(z))
copy(zf, z)
return y, zf
}
// lfilterZiは、フィルタの初期状態を計算する
func lfilterZi(b, a []float64) []float64 {
// 係数正規化(a[0] == 1 が前提)
if a[0] != 1 {
a0 := a[0]
for i := range a {
a[i] /= a0
}
for i := range b {
b[i] /= a0
}
}
m := len(a) - 1 // フィルタの次数
if m <= 0 {
return []float64{}
}
// companion 行列(MATLABスタイル)を m×m 行列 C として構築する
// 1行目: C[0,j] = -a[j+1] (j=0,...,m-1)
// 2行目以降: 下位単位行列(シフト)
C := make([][]float64, m)
for i := 0; i < m; i++ {
C[i] = make([]float64, m)
}
for j := 0; j < m; j++ {
C[0][j] = -a[j+1]
}
for i := 1; i < m; i++ {
C[i][i-1] = 1
}
// companion 行列の転置 A = Cᵀ を構築する
A := make([][]float64, m)
for i := 0; i < m; i++ {
A[i] = make([]float64, m)
for j := 0; j < m; j++ {
A[i][j] = C[j][i]
}
}
// (I - A) を構築する
IminusA := make([][]float64, m)
for i := 0; i < m; i++ {
IminusA[i] = make([]float64, m)
for j := 0; j < m; j++ {
if i == j {
IminusA[i][j] = 1 - A[i][j]
} else {
IminusA[i][j] = -A[i][j]
}
}
}
// 右辺ベクトル Bvec = b[1:] - a[1:]*b[0] を作成
Bvec := make([]float64, m)
for i := 0; i < m; i++ {
Bvec[i] = b[i+1] - a[i+1]*b[0]
}
// 連立方程式 (I - A)*zi = Bvec を解く
zi := solveLinearSystem(IminusA, Bvec)
// fmt.Println("signal.lfilter_zi", zi)
return zi
}
// solveLinearSystemは、LU分解を用いて小規模な線形系を解く
func solveLinearSystem(A [][]float64, B []float64) []float64 {
n := len(B)
aData := make([]float64, n*n)
bData := make([]float64, n)
// 2D スライス -> 1D スライス(Gonum の行列形式に適応)
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
aData[i*n+j] = A[i][j]
}
bData[i] = B[i]
}
A_mat := mat.NewDense(n, n, aData)
B_vec := mat.NewVecDense(n, bData)
x := mat.NewVecDense(n, nil)
// LU 分解を用いた線形方程式の解法
err := x.SolveVec(A_mat, B_vec)
if err != nil {
fmt.Println("Error solving system:", err)
return nil
}
return x.RawVector().Data
}
// reverseSlice は float64 スライスを反転して返します
func reverseSlice(x []float64) []float64 {
n := len(x)
y := make([]float64, n)
for i, v := range x {
y[n-1-i] = v
}
return y
}
// minは整数の小さい方を返す
func min(a, b int) int {
if a < b {
return a
}
return b
}
// maxは整数の大きい方を返す
func max(a, b int) int {
if a > b {
return a
}
return b
}
// filtfiltManual は、x の奇数対称パディング、
// 初期状態計算、順方向フィルタ、逆方向フィルタを適用し、
// パディング部分を除去してフィルタ済み信号を返します。
func filtfiltManual(b, a, x []float64) []float64 {
// パディング長の決定
padLen := min(3*max(len(a), len(b)), len(x)-1)
// 奇数対称パディングの作成
// 左側パディング:2*x[0] - x[padLen:0:-1]
// → Python では x[padLen:0:-1] は [x[padLen], x[padLen-1], …, x[1]] となる
left := make([]float64, padLen)
for i := 0; i < padLen; i++ {
left[i] = 2*x[0] - x[padLen-i]
}
// 右側パディング:2*x[-1] - x[-2:-padLen-2:-1]
// → x[-2:-padLen-2:-1] は [x[n-2], x[n-3], …, x[n-padLen-1]] (n = len(x))
right := make([]float64, padLen)
n := len(x)
for i := 0; i < padLen; i++ {
right[i] = 2*x[n-1] - x[n-2-i]
}
// パディングした信号 xPadded を作成
xPadded := append(left, x...)
xPadded = append(xPadded, right...)
// 初期状態の計算
// ここでは以前作成した lfilterZi(b, a) を使い、さらに xPadded[0] を掛ける
zi := lfilterZi(b, a)
for i := range zi {
zi[i] *= xPadded[0]
}
// fmt.Println("zi:", zi)
// 順方向フィルタ適用
// lfilter 関数は、与えた初期状態 zi を使ってフィルタ処理を行い、
// 出力 y と最終状態 zf を返すものとする
yFwd, zf := lfilter(b, a, xPadded, zi)
// 逆方向フィルタ適用
// まず順方向の出力 yFwd を反転し、初期状態として先の最終状態 zf を使いフィルタ処理
yFwdRev := reverseSlice(yFwd)
yRev, _ := lfilter(b, a, yFwdRev, zf)
yRev = reverseSlice(yRev)
// 最後に、パディング部分(先頭 padLen および末尾 padLen)を除去して返す
result := yRev[padLen : len(yRev)-padLen]
return result
}
poly関数
多項式の根から多項式の係数を求める関数です。極やゼロ点の情報から多項式係数を得るために使います。
- 入力:多項式の根(複素数の配列)
- 出力:多項式の係数(実数の配列)
func poly(roots []complex128) []float64 {
n := len(roots)
coeffs := make([]complex128, n+1)
coeffs[0] = 1
for _, root := range roots {
for j := n; j > 0; j-- {
coeffs[j] -= root * coeffs[j-1]
}
}
// 実数部分のみを返す
realCoeffs := make([]float64, n+1)
for i := range coeffs {
realCoeffs[i] = real(coeffs[i])
}
return realCoeffs
}
butterworthCoeff 関数
バターワースフィルターの係数を計算します。デジタル信号処理で良く使うバターワースフィルターを設計するために使います。
- 入力:
- サンプリング周波数(fs)
- カットオフ周波数 (fc)
- フィルターの次数 (n)
- 出力
- フィルター係数(分子 b, 分母 a)
// バターワースフィルターの係数
func butterworthCoeff(fs, fc float64, n int) ([]float64, []float64) {
nyquist := fs / 2
normalCutoff := fc / nyquist
// アナログプロトタイプ極の計算
poles := make([]complex128, n)
for k := 1; k <= n; k++ {
angle := math.Pi * float64(2*k+n-1) / float64(2*n)
poles[k-1] = cmplx.Exp(complex(0, angle))
}
// s領域での極を算出
warped := 2 * fs * math.Tan(math.Pi*normalCutoff/2)
for i := range poles {
poles[i] *= complex(warped, 0)
}
// 双一次変換でz領域の極に変換
digitalPoles := make([]complex128, n)
for i, p := range poles {
digitalPoles[i] = (complex(2*fs, 0) + p) / (complex(2*fs, 0) - p)
}
// 分母の係数計算(デジタル極)
a := poly(digitalPoles)
// ゼロ点は全て z = -1
zeros := make([]complex128, n)
for i := range zeros {
zeros[i] = -1
}
b := poly(zeros)
// ゲイン調整(z=1でのゲインを1にする)
sumA := 0.0
sumB := 0.0
for _, coeff := range a {
sumA += coeff
}
for _, coeff := range b {
sumB += coeff
}
gain := sumA / sumB
for i := range b {
b[i] *= gain
}
return b, a
}
lfilter 関数
デジタル信号に対して線形フィルター処理を行います。一般的なフィルター処理に使います。
- 入力
- フィルターの分子係数(a)と分母係数(b)
- 入力信号(x)
- 初期状態(zi)
- 出力
- フィルター処理された信号(y)
- フィルター処理後の最終状態(zf)
// lfilterは、初期状態ziを用いてフィルタ処理を行う
func lfilter(b, a, x, zi []float64) (y, zf []float64) {
n := len(x)
order := len(a) - 1
y = make([]float64, n)
z := make([]float64, len(zi))
copy(z, zi)
for i := 0; i < n; i++ {
y[i] = b[0]*x[i] + z[0]
for j := 0; j < order-1; j++ {
z[j] = b[j+1]*x[i] - a[j+1]*y[i] + z[j+1]
}
if order > 0 {
z[order-1] = b[order]*x[i] - a[order]*y[i]
}
}
zf = make([]float64, len(z))
copy(zf, z)
return y, zf
}
lfilterZi 関数
フィルターを適用する際に安定した初期状態を計算します。フィルターを適用する際、開始時の過渡応答を抑えるために必要です。
- 入力
- フィルター係数(b, a)
- 出力
- 安定な初期状態(zi)
// lfilterZiは、フィルタの初期状態を計算する
func lfilterZi(b, a []float64) []float64 {
// 係数正規化(a[0] == 1 が前提)
if a[0] != 1 {
a0 := a[0]
for i := range a {
a[i] /= a0
}
for i := range b {
b[i] /= a0
}
}
m := len(a) - 1 // フィルタの次数
if m <= 0 {
return []float64{}
}
// companion 行列(MATLABスタイル)を m×m 行列 C として構築する
// 1行目: C[0,j] = -a[j+1] (j=0,...,m-1)
// 2行目以降: 下位単位行列(シフト)
C := make([][]float64, m)
for i := 0; i < m; i++ {
C[i] = make([]float64, m)
}
for j := 0; j < m; j++ {
C[0][j] = -a[j+1]
}
for i := 1; i < m; i++ {
C[i][i-1] = 1
}
// companion 行列の転置 A = Cᵀ を構築する
A := make([][]float64, m)
for i := 0; i < m; i++ {
A[i] = make([]float64, m)
for j := 0; j < m; j++ {
A[i][j] = C[j][i]
}
}
// (I - A) を構築する
IminusA := make([][]float64, m)
for i := 0; i < m; i++ {
IminusA[i] = make([]float64, m)
for j := 0; j < m; j++ {
if i == j {
IminusA[i][j] = 1 - A[i][j]
} else {
IminusA[i][j] = -A[i][j]
}
}
}
// 右辺ベクトル Bvec = b[1:] - a[1:]*b[0] を作成
Bvec := make([]float64, m)
for i := 0; i < m; i++ {
Bvec[i] = b[i+1] - a[i+1]*b[0]
}
// 連立方程式 (I - A)*zi = Bvec を解く
zi := solveLinearSystem(IminusA, Bvec)
// fmt.Println("signal.lfilter_zi", zi)
return zi
}
solveLinearSystem 関数
連立一次方程式を解きます。lfilterZi内で初期状態を求めるために利用します。
- 入力
- 係数行列(A)
- 定数ベクトル(B)
- 出力
- 方程式の解(ベクトル)
// solveLinearSystemは、LU分解を用いて小規模な線形系を解く
func solveLinearSystem(A [][]float64, B []float64) []float64 {
n := len(B)
aData := make([]float64, n*n)
bData := make([]float64, n)
// 2D スライス -> 1D スライス(Gonum の行列形式に適応)
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
aData[i*n+j] = A[i][j]
}
bData[i] = B[i]
}
A_mat := mat.NewDense(n, n, aData)
B_vec := mat.NewVecDense(n, bData)
x := mat.NewVecDense(n, nil)
// LU 分解を用いた線形方程式の解法
err := x.SolveVec(A_mat, B_vec)
if err != nil {
fmt.Println("Error solving system:", err)
return nil
}
return x.RawVector().Data
}
reverseSlice, min, max 関数
小さな補助関数群です。
- reverseSlice:スライスを逆順にする
- min, max:二つの整数の大小を返す
// reverseSlice は float64 スライスを反転して返します
func reverseSlice(x []float64) []float64 {
n := len(x)
y := make([]float64, n)
for i, v := range x {
y[n-1-i] = v
}
return y
}
// minは整数の小さい方を返す
func min(a, b int) int {
if a < b {
return a
}
return b
}
// maxは整数の大きい方を返す
func max(a, b int) int {
if a > b {
return a
}
return b
}
filtfiltManual 関数
入力信号をゼロ位相フィルター処理します。
// filtfiltManual は、x の奇数対称パディング、
// 初期状態計算、順方向フィルタ、逆方向フィルタを適用し、
// パディング部分を除去してフィルタ済み信号を返します。
func filtfiltManual(b, a, x []float64) []float64 {
// パディング長の決定
padLen := min(3*max(len(a), len(b)), len(x)-1)
// 奇数対称パディングの作成
// 左側パディング:2*x[0] - x[padLen:0:-1]
// → Python では x[padLen:0:-1] は [x[padLen], x[padLen-1], …, x[1]] となる
left := make([]float64, padLen)
for i := 0; i < padLen; i++ {
left[i] = 2*x[0] - x[padLen-i]
}
// 右側パディング:2*x[-1] - x[-2:-padLen-2:-1]
// → x[-2:-padLen-2:-1] は [x[n-2], x[n-3], …, x[n-padLen-1]] (n = len(x))
right := make([]float64, padLen)
n := len(x)
for i := 0; i < padLen; i++ {
right[i] = 2*x[n-1] - x[n-2-i]
}
// パディングした信号 xPadded を作成
xPadded := append(left, x...)
xPadded = append(xPadded, right...)
// 初期状態の計算
// ここでは以前作成した lfilterZi(b, a) を使い、さらに xPadded[0] を掛ける
zi := lfilterZi(b, a)
for i := range zi {
zi[i] *= xPadded[0]
}
// fmt.Println("zi:", zi)
// 順方向フィルタ適用
// lfilter 関数は、与えた初期状態 zi を使ってフィルタ処理を行い、
// 出力 y と最終状態 zf を返すものとする
yFwd, zf := lfilter(b, a, xPadded, zi)
// 逆方向フィルタ適用
// まず順方向の出力 yFwd を反転し、初期状態として先の最終状態 zf を使いフィルタ処理
yFwdRev := reverseSlice(yFwd)
yRev, _ := lfilter(b, a, yFwdRev, zf)
yRev = reverseSlice(yRev)
// 最後に、パディング部分(先頭 padLen および末尾 padLen)を除去して返す
result := yRev[padLen : len(yRev)-padLen]
return result
}
まとめ
Go言語で実装してみると、改めてpythonライブラリの偉大さを感じました。どう考えてもゼロ位相フィルタリングをGo言語で実装するのは面倒なので、皆さんはpythonで実装しましょう。