追記(2024/05/05)
次の記事ができました!この記事の概要はこの方法を高速化して、メリット/デメリットをまとめました!
できればこの記事も見てくださると嬉しいです!
追記(2024/05/04)
ミスだと思ったコードがミスじゃなかったようです。
本当に申し訳ないです。
以前のコードでの問題といたしましては
- XOR 100.00%
- NOT 75.00%
でした。
実装のコードをコピーしてみてください。NOTが100%になるはずです
間違えた部分
間違えた部分ですが、最後の方のなかの
for i in tqdm(range(1000)):
for num in range(len(input_datas)):
output = ed(input_datas[num][None, :])
err = output_datas[num] - output
diff = err[0][:, None, None]
ed.update(diff)
が正しくは
for i in tqdm(range(1000)):
for num in range(len(input_datas)):
output = ed(input_datas[num][None, :])
err = output_datas[num] - output
diff = err[0][:, None, None]
ed.update(diff)
ed.update(diff)
でした。今後も間違えないように努めていきますので何卒宜しくお願い致します。
二回目更新して意味あるんかな...
初めに
Qiitaで初投稿します。読みにくいかと思いますがお手柔らかにお願いします。
また、筆者はいろいろとポンコツです。
「このコードおかしいな」や「この記事表現おかしくない?」、「全体的に読みづらい」、「あれ、この発想前に出てたのにな...」など思ったら、
改善すべき点を(筆者は国語力が低いので)わかりやすくコメント欄に書いてくださると嬉しいです!
今後改善していきますので是非!
実装
論よりコードということで先に
import numpy as np
import typing
from tqdm import tqdm
import pickle
# Define activation functions
class Activate_Function:
def __call__(
self,
x: np.ndarray[typing.Any, np.dtype[np.float64]],
) -> np.ndarray[typing.Any, np.dtype[np.float64]]: ...
def derivative(
self,
x: np.ndarray[typing.Any, np.dtype[np.float64]],
) -> np.ndarray[typing.Any, np.dtype[np.float64]]: ...
class sigmoid(Activate_Function):
def __call__(
self, x: np.ndarray[typing.Any, np.dtype[np.float64]], u0: float = 0.4
) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
x *= 2 / u0
return np.exp(np.minimum(x, 0)) / (1 + np.exp(-np.abs(x)))
def derivative(
self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
return x * (1 - x)
class ReLU(Activate_Function):
def __call__(
self, x: np.ndarray[typing.Any, np.dtype[np.float64]], u0: float = 0.4
) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
return np.maximum(0, x)
def derivative(
self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
return (x > 0) * 1
# Define predict function
class Linear:
def __init__(self, in_: int, out_: int):
self.weight = np.random.rand(in_, out_)
def __call__(
self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
return self.forward(x)
def forward(
self, x: np.ndarray[typing.Any, np.dtype[np.float64]]
) -> np.ndarray[typing.Any, np.dtype[np.float64]]:
self.input = x.copy()
self.output = x @ self.weight
return self.output
class ED_Layer:
def __init__(
self,
in_: int,
out_: int,
alpha: float = 0.8,
activate_function: Activate_Function = sigmoid(),
):
self.in_ = in_ # Input count
self.out_ = out_ # Output Count
self.alpha = alpha # Learning rate
self.activate_function: Activate_Function = activate_function
self.pospos = Linear(in_, out_)
self.negpos = Linear(in_, out_)
self.posneg = Linear(in_, out_)
self.negneg = Linear(in_, out_)
def __call__(
self,
pos_array: np.ndarray[typing.Any, np.dtype[np.float64]],
neg_array: np.ndarray[typing.Any, np.dtype[np.float64]],
):
return self.forward(pos_array, neg_array)
def forward(
self,
pos_vec: np.ndarray[typing.Any, np.dtype[np.float64]],
neg_vec: np.ndarray[typing.Any, np.dtype[np.float64]],
):
self.postive = self.activate_function(
self.pospos(pos_vec) - self.negpos(neg_vec)
)
self.negative = self.activate_function(
self.negneg(neg_vec) - self.posneg(pos_vec)
)
return self.postive, self.negative
def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
for operator, linear, output in (
(1, self.pospos, self.postive),
(-1, self.negpos, self.postive),
(1, self.posneg, self.negative),
(-1, self.negneg, self.negative),
):
diff_weight: np.ndarray[typing.Any, np.dtype[np.float64]] = np.einsum( # type: ignore
"b,bo,bi->bio",
loss_data.mean(1),
self.activate_function.derivative(output),
linear.input,
)
linear.weight += operator * self.alpha * diff_weight.mean(0)
class ED_OutputLayer(ED_Layer):
def __init__(
self,
in_: int,
out_: int,
alpha: float = 0.8,
activate_function: Activate_Function = sigmoid(),
):
super().__init__(in_, out_, alpha, activate_function)
def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
for operator, linear, output in (
(1, self.pospos, self.postive),
(-1, self.negpos, self.postive),
(1, self.posneg, self.negative),
(-1, self.negneg, self.negative),
):
diff_weight: np.ndarray[typing.Any, np.dtype[np.float64]] = np.einsum( # type: ignore
"bo,bo,bi->bio",
loss_data,
self.activate_function.derivative(output),
linear.input,
)
linear.weight += operator * self.alpha * diff_weight.mean(0)
class ED_Tissue:
def __init__(
self,
in_: int,
out_: int,
hidden_width: int,
hidden_depth: int = 1,
alpha: float = 0.8,
activate_function: Activate_Function = sigmoid(),
):
self.layers: list[ED_Layer] = [
ED_Layer(in_, hidden_width, alpha, activate_function=activate_function),
*[
ED_Layer(
hidden_width,
hidden_width,
alpha,
activate_function=activate_function,
)
for _ in range(hidden_depth)
],
ED_OutputLayer(hidden_width, out_, activate_function=activate_function),
]
def __call__(
self,
pos: np.ndarray[typing.Any, np.dtype[np.float64]],
neg: np.ndarray[typing.Any, np.dtype[np.float64]],
):
return self.forward(pos, neg)
def forward(
self,
pos: np.ndarray[typing.Any, np.dtype[np.float64]],
neg: np.ndarray[typing.Any, np.dtype[np.float64]],
):
for layer in self.layers:
pos, neg = layer(pos, neg)
return pos, neg
def update(self, loss_data: np.ndarray[typing.Any, np.dtype[np.float64]]):
for layer in self.layers:
layer.update(loss_data)
class ED_Organ:
def __init__(
self,
in_: int,
out_: int,
hidden_width: int,
hidden_depth: int,
alpha: float = 0.8,
activate_function: Activate_Function = sigmoid(),
):
self.tissues_layer = [
[
ED_Tissue(
in_,
out_,
hidden_width,
hidden_depth,
alpha,
activate_function=activate_function,
)
for _ in range(out_)
],
*[
[
ED_Tissue(
in_ * out_,
out_,
hidden_width,
hidden_depth,
alpha,
activate_function=activate_function,
)
for _ in range(out_)
]
for _ in range(out_)
],
[
ED_Tissue(
out_ * out_,
1,
hidden_width,
hidden_depth,
alpha,
activate_function=activate_function,
)
for _ in range(out_)
],
]
def __call__(self, x: np.ndarray[typing.Any, np.dtype[np.float64]]):
return self.forward(x.copy(), x.copy())
def forward(
self,
pos: np.ndarray[typing.Any, np.dtype[np.float64]],
neg: np.ndarray[typing.Any, np.dtype[np.float64]],
):
for tissues in self.tissues_layer:
results = [tissue(pos, neg) for tissue in tissues]
results = np.concatenate(results, axis=-1)
pos, neg = results
return pos
def update(self, loss_data: list[np.ndarray[typing.Any, np.dtype[np.float64]]]):
for tissues in self.tissues_layer:
for ind, tissue in enumerate(tissues):
tissue.update(loss_data[ind])
if __name__ == "__main__":
np.random.seed(seed=10)
ed = ED_Organ(2, 2, 10, 10, alpha=0.1)
input_datas = np.array(
[ # Datas
# [0, 0, 0], # NOTのデータ
# [1, 0, 0],
# [0, 1, 0],
# [1, 1, 0],
# [0, 0, 1],
# [1, 0, 1],
# [0, 1, 1],
# [1, 1, 1],
# コメントアウトされたデータでも100%
[0, 0], # XORのデータ
[1, 0],
[0, 1],
[1, 1]
]
)
output_datas = np.array(
[ # Datas
# [1, 1, 1],
# [0, 1, 1],
# [1, 0, 1],
# [0, 0, 1],
# [1, 1, 0],
# [0, 1, 0],
# [1, 0, 0],
# [0, 0, 0],
[1, 0],
[1, 0],
[1, 0],
[0, 1]
]
)
for i in tqdm(range(1000)):
for num in range(len(input_datas)):
output = ed(input_datas[num][None, :])
err = output_datas[num] - output
diff = err[0][:, None, None]
ed.update(diff)
ed.update(diff)
pickle.dump(ed, open("./model.pickel", mode="wb+"))
nims = 0
correct = 0
for i in range(100):
for num in range(len(input_datas)):
output = ed(input_datas[num][None, :])
nims += len(output_datas[num])
correct += (output.round(1).squeeze() == output_datas[num]).sum()
print(f"Correct rate: {(correct/nims)*100:.02f}%")
仕組み
大体何やってるかの図をご用意しました。
ポイントは出力ごとに縦方法で分割しグループにして、それぞれの出力の誤差で学習しているところです。
こちらの一つの出力に一つのネットワークで対応するアイディアを参考にしました。
この方法では、ネットワークを四つにして、それぞれ繋げています。
それぞれがいい感じに作用して、正しい解答をしてくれるってことですね。
追記 コードを視覚化したものをご用意しました
この図を作るにはあたって
ちぃがぅさんのED法+交差エントロピーをTF/Torchで実装してみた(おまけでBitNet×ED法を検証という記事を参考にしました。比較する際にとても分かりやすい図を多数載せていて、なるほど!と理解が深まったので今度投稿するときは参考にしたいです。
感想
他のネットワークから持ってきた出力をまとめて入れてもちゃんと正しくなるように出力してくれるのはすごいなと思いました。
ただ...計算コストが高くなる問題は解決していませんが...
追記
コードに書いてあるReLU活性化関数は飾りです。その関数で実行しようと、すると正答率0%になります。
正答率0%はうせやろと思って調べたら関数が間違っていました。申し訳ないです。
しかし、正答率は50%です。学習できませんでした...
最後に
この方法?設計?をScoped Error Diffusionと呼ぶことにします。
ED法では誤差をネットワーク全体にばらまきます。でもそれが体で行われたらやばくないですか?誤差を毎回必要のないところまでばらまいちゃうんですよ?
じゃあ、範囲を指定してばらまこうと。そういう発想です。
この方法を使う際はできればでいいのですが、「FanaticPondエライ!」と褒めて頂ければ、めっちゃ嬉しいです。
他に不明な点があったらコメントをください!お願いします!
謝辞
最初にED法を紹介してくださったaro kanekoさん
最初にED法を実装してくださったちぃがぅさん
個人的にとても分かりやすい方法で実装してくださったCarteletさん
色々なED法についての記事を投稿してくださった開発者の皆さん
ED法について、少しだけわかった気がします。ありがとうございました!
この記事がED法の進化の少しでも手助けになればと期待して、締めくくります。