Help us understand the problem. What is going on with this article?

[新機能]:3x3 ソベル画像フィルタ(簡易版)

More than 1 year has passed since last update.

画像処理、とりわけリアルタイムでの処理は FPGA での適用が期待される分野だと思う。GPU では画像の領域分割による高速化が得意であるが、FPGA では次から次へとリアルタイムにやってくる画素を、一部蓄積しながらパイプライン処理をすることで一定のスループットを守るという事が得意となる。

画像処理のお題でよくみかけるソベル・フィルターを Polyphony でも実現してみよう。なお、このプログラムは特定のblob(a24ee8)でのみ動作確認がされている。完全なサポートは次期バージョン(0.3.4)以降となることをお断りしておく。

カーネルは Tuple によって表現している。

K:Tuple[int4] = (-1, 0, 1,
                 -2, 0, 2,
                 -1, 0, 1)

フィルタは関数によって実現している。

def filter3x3(r0, r1, r2, r3, r4, r5, r6, r7, r8):
    a0 = r0 * K[0]
    a1 = r1 * K[1]
    a2 = r2 * K[2]
    a3 = r3 * K[3]
    a4 = r4 * K[4]
    a5 = r5 * K[5]
    a6 = r6 * K[6]
    a7 = r7 * K[7]
    a8 = r8 * K[8]
    s = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + a8
    return normalize(s)

コンパイラが解釈する表現として難しかった点に言及しよう。

            #...一部抜粋...

            if phase == 0:
                line0 = buf0
                line1 = buf1
                line2 = buf2
            elif phase == 1:
                line0 = buf1
                line1 = buf2
                line2 = buf0
            else:
                line0 = buf2
                line1 = buf0
                line2 = buf1

3x3 のフィルタを実現するには、複数のラインを処理する必要があり、そのため、ラインバッファを必要とする。必要なバッファは3ラインであり、それを使いまわす。上記のコードでバッファの使いまわしが可能だ。

このコードの問題は、時間によって line0, line1, line2 が別のバッファにアサインされ、プログラムの意図としてはそれぞれが別のバッファを指す。しかし、明示的にそれを書かれているわけではないので、コンパイラはその関連性を解析しなくてはならない。

厄介なことにループの中で使いまわされているために循環参照しているように見える。SSA を単純に作ることが出来ない。Polyphony は現時点でこの表現に一部対応しているにとどまる。将来的には明示的に表現されたものに対しての対応となるか、あるいは解析して限定的ではあるがいまよりは幅広くコンパイル可能にすることになるだろう。

ソース全体はテストベンチも含めてやや大きめではある。以下、簡易版のソースを掲げる。パイプラインでも動作する完全版は別の機会に紹介したい。

from polyphony import module
from polyphony import is_worker_running
from polyphony.io import Queue
from polyphony.typing import int4, int12, List, Tuple
from polyphony import pipelined


def max(a:int12, b:int12) -> int12:
    return a if a > b else b


def min(a:int12, b:int12) -> int12:
    return a if a < b else b


def normalize(v:int12):
    return min(max(128 + v, 0), 255)


K:Tuple[int4] = (-1, 0, 1,
                 -2, 0, 2,
                 -1, 0, 1)


def filter3x3(r0, r1, r2, r3, r4, r5, r6, r7, r8):
    a0 = r0 * K[0]
    a1 = r1 * K[1]
    a2 = r2 * K[2]
    a3 = r3 * K[3]
    a4 = r4 * K[4]
    a5 = r5 * K[5]
    a6 = r6 * K[6]
    a7 = r7 * K[7]
    a8 = r8 * K[8]
    s = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + a8
    return normalize(s)


@module
class PipelinedStreamFilter:
    def __init__(self, width, height, blank_size):
        self.inq = Queue(int12, 'in', maxsize=8)
        self.outq = Queue(int12, 'out', maxsize=8)
        self.append_worker(self.worker, width, height, blank_size)

    def worker(self, width, height, blank_size):
        buf0:List[int12] = [255] * (width + blank_size)
        buf1:List[int12] = [255] * (width + blank_size)
        buf2:List[int12] = [255] * (width + blank_size)
        line0 = buf0
        line1 = buf1
        line2 = buf2
        phase = 0
        y = blank_size
        r0:int12 = 255
        r1:int12 = 255
        r2:int12 = 255
        r3:int12 = 255
        r4:int12 = 255
        r5:int12 = 255
        r6:int12 = 255
        r7:int12 = 255
        r8:int12 = 255
        while is_worker_running():
            for x in pipelined(range(blank_size, width + blank_size)):
                d2 = self.inq.rd()
                line2[x] = d2
                d1 = line1[x]
                d0 = line0[x]
                r0, r1, r2 = r1, r2, d0
                r3, r4, r5 = r4, r5, d1
                r6, r7, r8 = r7, r8, d2
                out = filter3x3(r0, r1, r2,
                                r3, r4, r5,
                                r6, r7, r8)
                print('out', out)
                self.outq.wr(out)
            phase = (phase + 1) % 3
            if phase == 0:
                line0 = buf0
                line1 = buf1
                line2 = buf2
            elif phase == 1:
                line0 = buf1
                line1 = buf2
                line2 = buf0
            else:
                line0 = buf2
                line1 = buf0
                line2 = buf1
            r0 = r1 = r2 = 255
            r3 = r4 = r5 = 255
            r6 = r7 = r8 = 255
            y += 1
            if y == height + blank_size:
                # TODO:
                break


if __name__ == '__main__':
    filter = PipelinedStreamFilter(512, 512, 2)
from polyphony import testbench
from polyphony import module
from polyphony.io import Port
from polyphony.typing import int12, List, Tuple
from polyphony import rule, pipelined
from sobel_filter import PipelinedStreamFilter as StreamFilter


# NOTE: env.internal_ram_threshold_size = 0 is required for this test.


TEST_DATA:List[int12] = [
    141, 140, 137, 128, 126, 121, 119, 116, 113,
    110, 114, 117, 114, 117, 124, 128, 132, 137,
    141, 143, 143, 148, 152, 146, 148, 145, 143,
    142, 147, 145, 147, 147, 144, 145, 149, 150,
]

EXPECTED_DATA:List[int12] = [
    14, 13, 124, 116, 117, 121, 121, 123, 122,
    0, 0, 127, 104, 106, 124, 125, 126, 125,
    0, 0, 140, 121, 126, 139, 139, 138, 135,
    0, 0, 142, 138, 148, 131, 129, 139, 132
]


W = 9
H = 4


@module
class FilterTester:
    def __init__(self):
        self.start = Port(bool, 'in', protocol='ready_valid')
        self.finish = Port(bool, 'out', protocol='ready_valid')
        self.sobel = StreamFilter(W, H, 2)
        self.append_worker(self.test_source)
        self.append_worker(self.test_sink)

    def test_source(self):
        self.start()
        for y in range(H):
            for x in pipelined(range(W)):
                idx = y * W + x
                p = TEST_DATA[idx]
                print('in', p)
                self.sobel.inq.wr(p)

    def test_sink(self):
        output:List[int12] = [None] * (W * H)
        for y in range(H):
            for x in pipelined(range(W)):
                idx = y * W + x
                out = self.sobel.outq.rd()
                print('out', out)
                output[idx] = out
        for i in range(len(output)):
            if output[i] != EXPECTED_DATA[i]:
                print('error', i, output[i], EXPECTED_DATA[i])
        self.finish(True)


@testbench
def test_stream(tester):
    tester.start.wr(True)
    tester.finish.rd()


if __name__ == '__main__':
    tester = FilterTester()
    test_stream(tester)
Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away