画像処理、とりわけリアルタイムでの処理は FPGA での適用が期待される分野だと思う。GPU では画像の領域分割による高速化が得意であるが、FPGA では次から次へとリアルタイムにやってくる画素を、一部蓄積しながらパイプライン処理をすることで一定のスループットを守るという事が得意となる。
画像処理のお題でよくみかけるソベル・フィルターを Polyphony でも実現してみよう。なお、このプログラムは特定のblob(a24ee8)でのみ動作確認がされている。完全なサポートは次期バージョン(0.3.4)以降となることをお断りしておく。
カーネルは Tuple によって表現している。
K:Tuple[int4] = (-1, 0, 1,
-2, 0, 2,
-1, 0, 1)
フィルタは関数によって実現している。
def filter3x3(r0, r1, r2, r3, r4, r5, r6, r7, r8):
a0 = r0 * K[0]
a1 = r1 * K[1]
a2 = r2 * K[2]
a3 = r3 * K[3]
a4 = r4 * K[4]
a5 = r5 * K[5]
a6 = r6 * K[6]
a7 = r7 * K[7]
a8 = r8 * K[8]
s = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + a8
return normalize(s)
コンパイラが解釈する表現として難しかった点に言及しよう。
#...一部抜粋...
if phase == 0:
line0 = buf0
line1 = buf1
line2 = buf2
elif phase == 1:
line0 = buf1
line1 = buf2
line2 = buf0
else:
line0 = buf2
line1 = buf0
line2 = buf1
3x3 のフィルタを実現するには、複数のラインを処理する必要があり、そのため、ラインバッファを必要とする。必要なバッファは3ラインであり、それを使いまわす。上記のコードでバッファの使いまわしが可能だ。
このコードの問題は、時間によって line0, line1, line2 が別のバッファにアサインされ、プログラムの意図としてはそれぞれが別のバッファを指す。しかし、明示的にそれを書かれているわけではないので、コンパイラはその関連性を解析しなくてはならない。
厄介なことにループの中で使いまわされているために循環参照しているように見える。SSA を単純に作ることが出来ない。Polyphony は現時点でこの表現に一部対応しているにとどまる。将来的には明示的に表現されたものに対しての対応となるか、あるいは解析して限定的ではあるがいまよりは幅広くコンパイル可能にすることになるだろう。
ソース全体はテストベンチも含めてやや大きめではある。以下、簡易版のソースを掲げる。パイプラインでも動作する完全版は別の機会に紹介したい。
from polyphony import module
from polyphony import is_worker_running
from polyphony.io import Queue
from polyphony.typing import int4, int12, List, Tuple
from polyphony import pipelined
def max(a:int12, b:int12) -> int12:
return a if a > b else b
def min(a:int12, b:int12) -> int12:
return a if a < b else b
def normalize(v:int12):
return min(max(128 + v, 0), 255)
K:Tuple[int4] = (-1, 0, 1,
-2, 0, 2,
-1, 0, 1)
def filter3x3(r0, r1, r2, r3, r4, r5, r6, r7, r8):
a0 = r0 * K[0]
a1 = r1 * K[1]
a2 = r2 * K[2]
a3 = r3 * K[3]
a4 = r4 * K[4]
a5 = r5 * K[5]
a6 = r6 * K[6]
a7 = r7 * K[7]
a8 = r8 * K[8]
s = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + a8
return normalize(s)
@module
class PipelinedStreamFilter:
def __init__(self, width, height, blank_size):
self.inq = Queue(int12, 'in', maxsize=8)
self.outq = Queue(int12, 'out', maxsize=8)
self.append_worker(self.worker, width, height, blank_size)
def worker(self, width, height, blank_size):
buf0:List[int12] = [255] * (width + blank_size)
buf1:List[int12] = [255] * (width + blank_size)
buf2:List[int12] = [255] * (width + blank_size)
line0 = buf0
line1 = buf1
line2 = buf2
phase = 0
y = blank_size
r0:int12 = 255
r1:int12 = 255
r2:int12 = 255
r3:int12 = 255
r4:int12 = 255
r5:int12 = 255
r6:int12 = 255
r7:int12 = 255
r8:int12 = 255
while is_worker_running():
for x in pipelined(range(blank_size, width + blank_size)):
d2 = self.inq.rd()
line2[x] = d2
d1 = line1[x]
d0 = line0[x]
r0, r1, r2 = r1, r2, d0
r3, r4, r5 = r4, r5, d1
r6, r7, r8 = r7, r8, d2
out = filter3x3(r0, r1, r2,
r3, r4, r5,
r6, r7, r8)
print('out', out)
self.outq.wr(out)
phase = (phase + 1) % 3
if phase == 0:
line0 = buf0
line1 = buf1
line2 = buf2
elif phase == 1:
line0 = buf1
line1 = buf2
line2 = buf0
else:
line0 = buf2
line1 = buf0
line2 = buf1
r0 = r1 = r2 = 255
r3 = r4 = r5 = 255
r6 = r7 = r8 = 255
y += 1
if y == height + blank_size:
# TODO:
break
if __name__ == '__main__':
filter = PipelinedStreamFilter(512, 512, 2)
from polyphony import testbench
from polyphony import module
from polyphony.io import Port
from polyphony.typing import int12, List, Tuple
from polyphony import rule, pipelined
from sobel_filter import PipelinedStreamFilter as StreamFilter
# NOTE: env.internal_ram_threshold_size = 0 is required for this test.
TEST_DATA:List[int12] = [
141, 140, 137, 128, 126, 121, 119, 116, 113,
110, 114, 117, 114, 117, 124, 128, 132, 137,
141, 143, 143, 148, 152, 146, 148, 145, 143,
142, 147, 145, 147, 147, 144, 145, 149, 150,
]
EXPECTED_DATA:List[int12] = [
14, 13, 124, 116, 117, 121, 121, 123, 122,
0, 0, 127, 104, 106, 124, 125, 126, 125,
0, 0, 140, 121, 126, 139, 139, 138, 135,
0, 0, 142, 138, 148, 131, 129, 139, 132
]
W = 9
H = 4
@module
class FilterTester:
def __init__(self):
self.start = Port(bool, 'in', protocol='ready_valid')
self.finish = Port(bool, 'out', protocol='ready_valid')
self.sobel = StreamFilter(W, H, 2)
self.append_worker(self.test_source)
self.append_worker(self.test_sink)
def test_source(self):
self.start()
for y in range(H):
for x in pipelined(range(W)):
idx = y * W + x
p = TEST_DATA[idx]
print('in', p)
self.sobel.inq.wr(p)
def test_sink(self):
output:List[int12] = [None] * (W * H)
for y in range(H):
for x in pipelined(range(W)):
idx = y * W + x
out = self.sobel.outq.rd()
print('out', out)
output[idx] = out
for i in range(len(output)):
if output[i] != EXPECTED_DATA[i]:
print('error', i, output[i], EXPECTED_DATA[i])
self.finish(True)
@testbench
def test_stream(tester):
tester.start.wr(True)
tester.finish.rd()
if __name__ == '__main__':
tester = FilterTester()
test_stream(tester)