2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Polyphony にまつわる 25 の話題Advent Calendar 2017

Day 17

[新機能]:3x3 ソベル画像フィルタ(簡易版)

Last updated at Posted at 2017-12-16

画像処理、とりわけリアルタイムでの処理は FPGA での適用が期待される分野だと思う。GPU では画像の領域分割による高速化が得意であるが、FPGA では次から次へとリアルタイムにやってくる画素を、一部蓄積しながらパイプライン処理をすることで一定のスループットを守るという事が得意となる。

画像処理のお題でよくみかけるソベル・フィルターを Polyphony でも実現してみよう。なお、このプログラムは特定のblob(a24ee8)でのみ動作確認がされている。完全なサポートは次期バージョン(0.3.4)以降となることをお断りしておく。

カーネルは Tuple によって表現している。

K:Tuple[int4] = (-1, 0, 1,
                 -2, 0, 2,
                 -1, 0, 1)

フィルタは関数によって実現している。

def filter3x3(r0, r1, r2, r3, r4, r5, r6, r7, r8):
    a0 = r0 * K[0]
    a1 = r1 * K[1]
    a2 = r2 * K[2]
    a3 = r3 * K[3]
    a4 = r4 * K[4]
    a5 = r5 * K[5]
    a6 = r6 * K[6]
    a7 = r7 * K[7]
    a8 = r8 * K[8]
    s = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + a8
    return normalize(s)

コンパイラが解釈する表現として難しかった点に言及しよう。

            #...一部抜粋...

            if phase == 0:
                line0 = buf0
                line1 = buf1
                line2 = buf2
            elif phase == 1:
                line0 = buf1
                line1 = buf2
                line2 = buf0
            else:
                line0 = buf2
                line1 = buf0
                line2 = buf1

3x3 のフィルタを実現するには、複数のラインを処理する必要があり、そのため、ラインバッファを必要とする。必要なバッファは3ラインであり、それを使いまわす。上記のコードでバッファの使いまわしが可能だ。

このコードの問題は、時間によって line0, line1, line2 が別のバッファにアサインされ、プログラムの意図としてはそれぞれが別のバッファを指す。しかし、明示的にそれを書かれているわけではないので、コンパイラはその関連性を解析しなくてはならない。

厄介なことにループの中で使いまわされているために循環参照しているように見える。SSA を単純に作ることが出来ない。Polyphony は現時点でこの表現に一部対応しているにとどまる。将来的には明示的に表現されたものに対しての対応となるか、あるいは解析して限定的ではあるがいまよりは幅広くコンパイル可能にすることになるだろう。

ソース全体はテストベンチも含めてやや大きめではある。以下、簡易版のソースを掲げる。パイプラインでも動作する完全版は別の機会に紹介したい。

from polyphony import module
from polyphony import is_worker_running
from polyphony.io import Queue
from polyphony.typing import int4, int12, List, Tuple
from polyphony import pipelined


def max(a:int12, b:int12) -> int12:
    return a if a > b else b


def min(a:int12, b:int12) -> int12:
    return a if a < b else b


def normalize(v:int12):
    return min(max(128 + v, 0), 255)


K:Tuple[int4] = (-1, 0, 1,
                 -2, 0, 2,
                 -1, 0, 1)


def filter3x3(r0, r1, r2, r3, r4, r5, r6, r7, r8):
    a0 = r0 * K[0]
    a1 = r1 * K[1]
    a2 = r2 * K[2]
    a3 = r3 * K[3]
    a4 = r4 * K[4]
    a5 = r5 * K[5]
    a6 = r6 * K[6]
    a7 = r7 * K[7]
    a8 = r8 * K[8]
    s = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + a8
    return normalize(s)


@module
class PipelinedStreamFilter:
    def __init__(self, width, height, blank_size):
        self.inq = Queue(int12, 'in', maxsize=8)
        self.outq = Queue(int12, 'out', maxsize=8)
        self.append_worker(self.worker, width, height, blank_size)

    def worker(self, width, height, blank_size):
        buf0:List[int12] = [255] * (width + blank_size)
        buf1:List[int12] = [255] * (width + blank_size)
        buf2:List[int12] = [255] * (width + blank_size)
        line0 = buf0
        line1 = buf1
        line2 = buf2
        phase = 0
        y = blank_size
        r0:int12 = 255
        r1:int12 = 255
        r2:int12 = 255
        r3:int12 = 255
        r4:int12 = 255
        r5:int12 = 255
        r6:int12 = 255
        r7:int12 = 255
        r8:int12 = 255
        while is_worker_running():
            for x in pipelined(range(blank_size, width + blank_size)):
                d2 = self.inq.rd()
                line2[x] = d2
                d1 = line1[x]
                d0 = line0[x]
                r0, r1, r2 = r1, r2, d0
                r3, r4, r5 = r4, r5, d1
                r6, r7, r8 = r7, r8, d2
                out = filter3x3(r0, r1, r2,
                                r3, r4, r5,
                                r6, r7, r8)
                print('out', out)
                self.outq.wr(out)
            phase = (phase + 1) % 3
            if phase == 0:
                line0 = buf0
                line1 = buf1
                line2 = buf2
            elif phase == 1:
                line0 = buf1
                line1 = buf2
                line2 = buf0
            else:
                line0 = buf2
                line1 = buf0
                line2 = buf1
            r0 = r1 = r2 = 255
            r3 = r4 = r5 = 255
            r6 = r7 = r8 = 255
            y += 1
            if y == height + blank_size:
                # TODO:
                break


if __name__ == '__main__':
    filter = PipelinedStreamFilter(512, 512, 2)
from polyphony import testbench
from polyphony import module
from polyphony.io import Port
from polyphony.typing import int12, List, Tuple
from polyphony import rule, pipelined
from sobel_filter import PipelinedStreamFilter as StreamFilter


# NOTE: env.internal_ram_threshold_size = 0 is required for this test.


TEST_DATA:List[int12] = [
    141, 140, 137, 128, 126, 121, 119, 116, 113,
    110, 114, 117, 114, 117, 124, 128, 132, 137,
    141, 143, 143, 148, 152, 146, 148, 145, 143,
    142, 147, 145, 147, 147, 144, 145, 149, 150,
]

EXPECTED_DATA:List[int12] = [
    14, 13, 124, 116, 117, 121, 121, 123, 122,
    0, 0, 127, 104, 106, 124, 125, 126, 125,
    0, 0, 140, 121, 126, 139, 139, 138, 135,
    0, 0, 142, 138, 148, 131, 129, 139, 132
]


W = 9
H = 4


@module
class FilterTester:
    def __init__(self):
        self.start = Port(bool, 'in', protocol='ready_valid')
        self.finish = Port(bool, 'out', protocol='ready_valid')
        self.sobel = StreamFilter(W, H, 2)
        self.append_worker(self.test_source)
        self.append_worker(self.test_sink)

    def test_source(self):
        self.start()
        for y in range(H):
            for x in pipelined(range(W)):
                idx = y * W + x
                p = TEST_DATA[idx]
                print('in', p)
                self.sobel.inq.wr(p)

    def test_sink(self):
        output:List[int12] = [None] * (W * H)
        for y in range(H):
            for x in pipelined(range(W)):
                idx = y * W + x
                out = self.sobel.outq.rd()
                print('out', out)
                output[idx] = out
        for i in range(len(output)):
            if output[i] != EXPECTED_DATA[i]:
                print('error', i, output[i], EXPECTED_DATA[i])
        self.finish(True)


@testbench
def test_stream(tester):
    tester.start.wr(True)
    tester.finish.rd()


if __name__ == '__main__':
    tester = FilterTester()
    test_stream(tester)
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?