LoginSignup
2
1

【Scoped Error Diffusion】シンプルにして14倍早くした

Last updated at Posted at 2024-05-05

初めに

Qiita初心者です。至らぬ点があるかと思いますが、コメント欄で指摘してくだされば幸いです。
また、筆者はいろいろとポンコツです。前回の記事を見てくださればわかると思うのですが、ミスだと思ったものがミスじゃなかったりしています。

前回の記事 Scoped Error Diffusionという方法が初めて出した記事です。

実装

論よりコードということで先にコードを載せます。

コード全体 (前回と同様めっちゃ長いです)
import numpy as np
import typing

from tqdm import tqdm
from matplotlib import pyplot as plt
import pandas as pd


# Define activation functions
class Activate_Function:
    def __call__(
        self,
        x: np.ndarray[typing.Any, np.dtype[np.float64]],
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]: ...

    def derivative(
        self,
        x: np.ndarray[typing.Any, np.dtype[np.float64]],
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]: ...


class sigmoid(Activate_Function):
    def __init__(self, u0: float = 0.4):
        self.u0 = u0

    def __call__(
        self,
        x: np.ndarray[typing.Any, np.dtype[np.float64]],
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        x *= 2 / self.u0
        return np.exp(np.minimum(x, 0)) / (1 + np.exp(-np.abs(x)))

    def derivative(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return x * (1 - x)


class ReLU(Activate_Function):
    def __call__(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]], u0: float = 0.4
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return np.maximum(0, x)

    def derivative(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return (x > 0) * 1  # type: ignore


# Define predict function
class Linear:
    def __init__(self, in_: int, out_: int):
        self.weight = np.random.rand(in_, out_)

    def __call__(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return self.forward(x)

    def forward(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        self.input = x.copy()
        self.output = x @ self.weight
        return self.output


class ED_Layer:
    def __init__(
        self,
        in_: int,
        out_: int,
        alpha: float = 0.8,
        activate_function: Activate_Function = sigmoid(),
    ):
        self.in_ = in_  # Input count
        self.out_ = out_  # Output Count
        self.alpha = alpha  # Learning rate
        self.activate_function: Activate_Function = activate_function
        self.pospos = Linear(in_, out_)
        self.negpos = Linear(in_, out_)
        self.posneg = Linear(in_, out_)
        self.negneg = Linear(in_, out_)

    def __call__(
        self,
        pos_array: np.ndarray[typing.Any, np.dtype[np.float64]],
        neg_array: np.ndarray[typing.Any, np.dtype[np.float64]],
    ):
        return self.forward(pos_array, neg_array)

    def forward(
        self,
        pos_vec: np.ndarray[typing.Any, np.dtype[np.float64]],
        neg_vec: np.ndarray[typing.Any, np.dtype[np.float64]],
    ):
        self.postive = self.activate_function(
            self.pospos(pos_vec) - self.negpos(neg_vec)
        )
        self.negative = self.activate_function(
            self.negneg(neg_vec) - self.posneg(pos_vec)
        )
        return self.postive, self.negative

    def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
        for operator, linear, output in (
            (1, self.pospos, self.postive),
            (-1, self.negpos, self.postive),
            (1, self.posneg, self.negative),
            (-1, self.negneg, self.negative),
        ):
            diff_weight: np.ndarray[typing.Any, np.dtype[np.float64]] = np.einsum(  # type: ignore
                "b,bo,bi->bio",
                loss_data.mean(1),
                self.activate_function.derivative(output),
                linear.input,
            )
            linear.weight += operator * self.alpha * diff_weight.mean(0)


class ED_OutputLayer(ED_Layer):
    def __init__(
        self,
        in_: int,
        out_: int,
        alpha: float = 0.8,
        activate_function: Activate_Function = sigmoid(),
    ):
        super().__init__(in_, out_, alpha, activate_function)

    def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
        for operator, linear, output in (
            (1, self.pospos, self.postive),
            (-1, self.negpos, self.postive),
            (1, self.posneg, self.negative),
            (-1, self.negneg, self.negative),
        ):
            diff_weight: np.ndarray[typing.Any, np.dtype[np.float64]] = np.einsum(  # type: ignore
                "bo,bo,bi->bio",
                loss_data,
                self.activate_function.derivative(output),
                linear.input,
            )
            linear.weight += operator * self.alpha * diff_weight.mean(0)


class ED:
    def __init__(
        self,
        in_: int,
        out_: int,
        hidden_width: int,
        hidden_depth: int = 1,
        alpha: float = 0.8,
        activate_function: Activate_Function = sigmoid(),
    ):
        self.layers: list[list[list[ED_Layer]]] = [
            [
                [
                    ED_Layer(
                        in_, hidden_width, alpha, activate_function=activate_function
                    ),
                    ED_OutputLayer(
                        hidden_width, hidden_width, activate_function=activate_function
                    ),
                ]
                for _ in range(out_)
            ],
            *[
                [
                    [
                        ED_Layer(
                            hidden_width * out_,
                            hidden_width,
                            alpha,
                            activate_function=activate_function,
                        ),
                        ED_OutputLayer(
                            hidden_width,
                            hidden_width,
                            activate_function=activate_function,
                        ),
                    ]
                    for _ in range(out_)
                ]
                for _ in range(hidden_depth)
            ],
            [
                [
                    ED_Layer(
                        hidden_width * out_,
                        hidden_width,
                        alpha,
                        activate_function=activate_function,
                    ),
                    ED_OutputLayer(
                        hidden_width, 1, activate_function=activate_function
                    ),
                ]
                for _ in range(out_)
            ],
        ]

    def __call__(self, x: np.ndarray[typing.Any, np.dtype[np.float64]]):
        return self.forward(x.copy(), x.copy())

    def forward(
        self,
        pos: np.ndarray[typing.Any, np.dtype[np.float64]],
        neg: np.ndarray[typing.Any, np.dtype[np.float64]],
    ):
        def forward(
            pos: np.ndarray[typing.Any, np.dtype[np.float64]],
            neg: np.ndarray[typing.Any, np.dtype[np.float64]],
            cell: list[ED_Layer],
        ):
            for layer in cell:
                pos, neg = layer(pos, neg)
            return pos, neg

        for layer in self.layers:
            results = [forward(pos, neg, cell) for cell in layer]
            pos, neg = np.concatenate(results, axis=-1)
        return pos

    def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
        for layer in self.layers:
            for ind, cell in enumerate(layer):
                for layer in cell:
                    layer.update(loss_data[ind])


if __name__ == "__main__":
    np.random.seed(seed=10)
    ed = ED(12, 12, 2, 2, alpha=0.8, activate_function=sigmoid(u0=2.0))

    input_datas = np.array(
        [  # Datas
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],  # NOTのデータ
            [1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0],
            [0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0],
            [1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1],
            [1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1],
            [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
            # [0, 0],  # XORのデータ
            # [1, 0],
            # [0, 1],
            # [1, 1],
        ]
    )

    output_datas = np.array(
        [  # Datas
            [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
            [0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1],
            [1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
            [0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1],
            [1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0],
            [0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0],
            [1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            # [1, 0],
            # [1, 0],
            # [1, 0],
            # [0, 1],
        ]
    )

    metrics: list[float] = []
    for _ in tqdm(range(10000)):
        for num in range(len(input_datas)):
            output = ed(input_datas[num][None, :])
            err = output_datas[num] - output
            diff = err[0][:, None, None]
            ed.update(diff)
            metrics.append(np.abs(err[0]).sum())

    nims = 0
    correct = 0

    # input_datas = np.array(list(itertools.product([0, 1], repeat=12)))
    # output_datas = np.array(list(reversed(list(itertools.product([0, 1], repeat=12)))))

    for num in range(len(input_datas)):
        output = ed(input_datas[num][None, :])

        nims += len(output_datas[num])
        correct += (output.round(1).squeeze() == output_datas[num]).sum()

    print(f"Correct rate: {(correct/nims)*100:.02f}%")

    plt.plot(pd.DataFrame(metrics).rolling(20).mean())
    plt.plot(metrics, alpha=0.2)
    plt.grid()
    plt.xlabel("step")
    plt.ylabel("diff")
    plt.show()

解説

活性化関数

実際のコード
# Define activation functions
class Activate_Function:
    def __call__(
        self,
        x: np.ndarray[typing.Any, np.dtype[np.float64]],
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]: ...

    def derivative(
        self,
        x: np.ndarray[typing.Any, np.dtype[np.float64]],
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]: ...


class sigmoid(Activate_Function):
    def __init__(self, u0: float = 0.4):
        self.u0 = u0

    def __call__(
        self,
        x: np.ndarray[typing.Any, np.dtype[np.float64]],
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        x *= 2 / self.u0
        return np.exp(np.minimum(x, 0)) / (1 + np.exp(-np.abs(x)))

    def derivative(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return x * (1 - x)


class ReLU(Activate_Function):
    def __call__(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]], u0: float = 0.4
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return np.maximum(0, x)

    def derivative(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return (x > 0) * 1  # type: ignore

最初に書かれている三つのクラスは活性化関数です。
Carteletさんの実装を関数からクラスにしただけです。
詳しくはこちらのいくつかの活性化関数たちをご覧ください。

Q. なぜクラス化? A. Pylanceという型チェックの機能が警告を出すからです。

LinearクラスとED_Layerクラス及びED_OutputLayerクラス

実際のコード
# Define predict function
class Linear:
    def __init__(self, in_: int, out_: int):
        self.weight = np.random.rand(in_, out_)

    def __call__(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        return self.forward(x)

    def forward(
        self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
    ) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
        self.input = x.copy()
        self.output = x @ self.weight
        return self.output


class ED_Layer:
    def __init__(
        self,
        in_: int,
        out_: int,
        alpha: float = 0.8,
        activate_function: Activate_Function = sigmoid(),
    ):
        self.in_ = in_  # Input count
        self.out_ = out_  # Output Count
        self.alpha = alpha  # Learning rate
        self.activate_function: Activate_Function = activate_function
        self.pospos = Linear(in_, out_)
        self.negpos = Linear(in_, out_)
        self.posneg = Linear(in_, out_)
        self.negneg = Linear(in_, out_)

    def __call__(
        self,
        pos_array: np.ndarray[typing.Any, np.dtype[np.float64]],
        neg_array: np.ndarray[typing.Any, np.dtype[np.float64]],
    ):
        return self.forward(pos_array, neg_array)

    def forward(
        self,
        pos_vec: np.ndarray[typing.Any, np.dtype[np.float64]],
        neg_vec: np.ndarray[typing.Any, np.dtype[np.float64]],
    ):
        self.postive = self.activate_function(
            self.pospos(pos_vec) - self.negpos(neg_vec)
        )
        self.negative = self.activate_function(
            self.negneg(neg_vec) - self.posneg(pos_vec)
        )
        return self.postive, self.negative

    def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
        for operator, linear, output in (
            (1, self.pospos, self.postive),
            (-1, self.negpos, self.postive),
            (1, self.posneg, self.negative),
            (-1, self.negneg, self.negative),
        ):
            diff_weight: np.ndarray[typing.Any, np.dtype[np.float64]] = np.einsum(  # type: ignore
                "b,bo,bi->bio",
                loss_data.mean(1),
                self.activate_function.derivative(output),
                linear.input,
            )
            linear.weight += operator * self.alpha * diff_weight.mean(0)


class ED_OutputLayer(ED_Layer):
    def __init__(
        self,
        in_: int,
        out_: int,
        alpha: float = 0.8,
        activate_function: Activate_Function = sigmoid(),
    ):
        super().__init__(in_, out_, alpha, activate_function)

    def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
        for operator, linear, output in (
            (1, self.pospos, self.postive),
            (-1, self.negpos, self.postive),
            (1, self.posneg, self.negative),
            (-1, self.negneg, self.negative),
        ):
            diff_weight: np.ndarray[typing.Any, np.dtype[np.float64]] = np.einsum(  # type: ignore
                "bo,bo,bi->bio",
                loss_data,
                self.activate_function.derivative(output),
                linear.input,
            )
            linear.weight += operator * self.alpha * diff_weight.mean(0)

Carteletさんと特に代わり映えありません。私は解説できるほどの能力がないため、こちらの記事をご覧ください。

EDクラス

ここを変更して、モデルの性能の向上を図りました。

実際のコード
class ED:
    def __init__(
        self,
        in_: int,
        out_: int,
        hidden_width: int,
        hidden_depth: int = 1,
        alpha: float = 0.8,
        activate_function: Activate_Function = sigmoid(),
    ):
        self.layers: list[list[list[ED_Layer]]] = [
            [
                [
                    ED_Layer(
                        in_, hidden_width, alpha, activate_function=activate_function
                    ),
                    ED_OutputLayer(
                        hidden_width, hidden_width, activate_function=activate_function
                    ),
                ]
                for _ in range(out_)
            ],
            *[
                [
                    [
                        ED_Layer(
                            hidden_width * out_,
                            hidden_width,
                            alpha,
                            activate_function=activate_function,
                        ),
                        ED_OutputLayer(
                            hidden_width,
                            hidden_width,
                            activate_function=activate_function,
                        ),
                    ]
                    for _ in range(out_)
                ]
                for _ in range(hidden_depth)
            ],
            [
                [
                    ED_Layer(
                        hidden_width * out_,
                        hidden_width,
                        alpha,
                        activate_function=activate_function,
                    ),
                    ED_OutputLayer(
                        hidden_width, 1, activate_function=activate_function
                    ),
                ]
                for _ in range(out_)
            ],
        ]

    def __call__(self, x: np.ndarray[typing.Any, np.dtype[np.float64]]):
        return self.forward(x.copy(), x.copy())

    def forward(
        self,
        pos: np.ndarray[typing.Any, np.dtype[np.float64]],
        neg: np.ndarray[typing.Any, np.dtype[np.float64]],
    ):
        def forward(
            pos: np.ndarray[typing.Any, np.dtype[np.float64]],
            neg: np.ndarray[typing.Any, np.dtype[np.float64]],
            cell: list[ED_Layer],
        ):
            for layer in cell:
                pos, neg = layer(pos, neg)
            return pos, neg

        for layer in self.layers:
            results = [forward(pos, neg, cell) for cell in layer]
            pos, neg = np.concatenate(results, axis=-1)
        return pos

    def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
        for layer in self.layers:
            for ind, cell in enumerate(layer):
                for layer in cell:
                    layer.update(loss_data[ind])

EDの中身はこんな感じとなっております。

image.png

前回から何が変わった軽く説明いたしますと前回のモデルはED法を層として使っていたのですが、あまり層を減らしてもあんまり性能に
影響がないと分かったので層を減らしました。
前に比べてかなり早くなりました。

速度比較

XOR

パラメーター

  • 入力の数(in_) 2
  • 出力の数(out_) 2
  • 学習率(alpha) 0.1
  • ステップ数 1000

前回

  • 正答率 100.00%
  • かかった時間 00:58
  • 隠れ層幅(hidden_width) 10
  • 隠れ層深さ(hidden_depth) 10
  • 損失の減り方
    Figure_1.png

今回

  • 正答率 100.00%
  • かかった時間 00:04
  • 隠れ層幅(hidden_width) 2
  • 隠れ層深さ(hidden_depth) 2
  • 損失の減り方
    Figure_1.png

NOT

パラメーター

  • 入力の数(in_) 3
  • 出力の数(out_) 3
  • 学習率(alpha) 0.8
  • 隠れ層幅(hidden_width) 2
  • 隠れ層深さ(hidden_depth) 2
  • ステップ数 1000

前回

  • 正答率 100.00%
  • かかった時間 01:13
  • 損失の減り方
    not_figura.png

今回

  • 正答率 100.00%
  • かかった時間 00:13
  • 損失の減り方
    not_figura.png

メリット/デメリット

メリット

  • 速い
  • シンプル

デメリット

  • 3層以上、隠れ層の幅が3以上になると学習しにくくなる

3層以上になると学習しにくくなることについては、どこに入力されたかという位置の情報が段々と失われていくからと推測します。
隠れ層の幅が3以上になると学習しにくくなるのは関係なしに全ての中間出力を入力にいれているからだと推測します。
AIが出力に関係している入力の値が探しにくくなっているのかもしれません。Autoencoderのような特徴を発見する機構が必要なのかも?

  • 予測できているのか?

サンプルコードにはステップ10000の長さ12の入出力のNOTテストが行われています。精度は100%です。
しかし、コメントアウトされたコード

input_datas = np.array(list(itertools.product([0, 1], repeat=12)))
output_datas = np.array(list(reversed(list(itertools.product([0, 1], repeat=12)))))

で全てのパターンのテストを行うと精度は66.95%で、学習出来ているのか不安になる数字です。
これは単なる過学習かもしれませんが、詳しく検証していないのでわかりません...

感想

まだまだ荒削りだなと感じました。こちらのCarteletさんの記事にもっと早くする方法が書かれていたので、こちらを参考にもっと早くしたいなと思います。

最後に

ED法を複数出力させる際には隠れ層を重ねてはいけないことと隠れニューロンの幅はあまり増やしてはいけないことがわかりました。
このような課題を解決すれば、ED法の複数出力がより実用的になるかもしれません。

面白いと思ったらいいねをお願いします!何か疑問点がありましたら、コメントをお願いします!できる限りの範囲でお答えします!
よろしくお願いします!

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1