はじめに
入力信号に対し、あるフィルタサイズでmax/minをとっていくような処理が必要なケースがある。一番のユースケースは、画像処理におけるdilation(膨張)やerosion(収縮)処理である。個人的には、昔、ある幅を持ったnon-maximum suppression処理を行うために実装したことがある。
この処理は、ナイーブに実装すると信号長×フィルタサイズのオーダーの計算量が必要となるが、信号長のオーダーの計算量に抑えることができる「van Herk/Gil-Wermanアルゴリズム」というものが存在し、なるほどーという感じなので紹介する。
なお、長い信号に対してあるフィルタサイズのmin/maxをずっとかけていった際に、計算量がフィルタサイズに依存しないようにできるということなので、1回のmin/maxが定数オーダーということではない。
ちなみにこのアルゴリズムは、文献1 2でほぼ同時に発表されているらしく、"priority dispute" があるらしい。
van Herk/Gil-Wermanアルゴリズム
http://www.leptonica.com/grayscale-morphology.html
上記サイトを参考にvan Herk/Gil-Wermanアルゴリズムを説明する。図も上記より引用しているが、添字が間違っている箇所を修正した。
本アルゴリズムへの入力は、
$I$: 入力信号
$L$: フィルタサイズ(奇数)
となる。出力は入力信号と同じ長さのmin/maxフィルタがかけられた信号$I'$である。
下記では、maxのケースでの説明を行う。

このアルゴリズムでは、入力信号はフィルタサイズ$L$毎に処理が行われる。ここでは上図のように一番最初の処理単位で説明を行う。
この$L$毎の処理では、まずウィンドウの中心から前後$L-1$までmaxをとっていった長さ$2L-1$の配列$A$を準備する。このため、前後に$(L-1)/2$のマージンが必要となる。
すなわち、ウィンドウの中心は$A[L-1] = I[L-1]$であり、
$A[k] = max(I[k], A[k+1])$($k$ = $L-2, L-3, ..., 0$)
$A[k] = max(A[k-1], I[k])$($k$ = $L, L+1, ..., 2L-2$)
と逐次的に計算できる。

この配列$A$ができれば出力$I'$を求めるのは簡単で、$I'[J] = max(A[J - (L-1)/2], A[J + (L-1)/2])$となる。これは、ウィンドウ幅$L$内のmaxを、$I[L-1]$の前後に分割して行った後、統合していることになる。
実装
Pythonで実装する意義はないが、アルゴリズムの確認まで。
import numpy as np
import time
def main():
L = 7
sample_cnt = L * 50000 - 1
I = np.random.random(sample_cnt)
A = np.zeros(L * 2 - 1, dtype=np.float)
N = (sample_cnt - L + 1) // L
result1 = np.zeros(sample_cnt, dtype=np.float)
result2 = np.zeros(sample_cnt, dtype=np.float)
# van Herk/Gil-Wermanアルゴリズム
start_time = time.time()
for i in range(N):
center = (i + 1) * L - 1
start = center - L // 2
A[L - 1] = I[center]
for j in range(1, L):
A[L - 1 - j] = max(A[L - j], I[center - j])
A[L - 1 + j] = max(A[L - 2 + j], I[center + j])
for j in range(L):
result1[start + j] = max(A[j], A[j + L - 1])
print("{} [s] elapsed".format(time.time() - start_time))
# naive実装
start_time = time.time()
for i in range(L // 2, sample_cnt - L // 2):
result2[i] = I[i - L // 2: i + L // 2 + 1].max()
print("{} [s] elapsed".format(time.time() - start_time))
if __name__ == '__main__':
main()
0.6796669960021973 [s] elapsed
0.9065239429473877 [s] elapsed
ナイーブ実装よりは早い。
オチ↓
from scipy.ndimage import grey_dilation
# 略
start_time = time.time()
result3 = grey_dilation(I, size=(L,))
print("{} [s] elapsed".format(time.time() - start_time))
とすると、
0.009512186050415039 [s] elapsed
となる。