4
5

More than 3 years have passed since last update.

粒子群最適化法でロジスティック回帰実装

Last updated at Posted at 2019-11-21

何回か前に遺伝的アルゴリズムをこの記事で書いたんですが、今回はそれと同じような考え方で作られた最適化アルゴリズムを作ってみました。

粒子群最適化とは

魚や鳥などが群れを作って移動しているものをよく見ますね。これらの群れは敵を見つけたり餌を見つけたりした場合にも一糸乱れぬ動きで最適な運動を行います。これを関数の最適化に活用してやろうという手法になります。

参考文献はこの辺
進化計算アルゴリズム入門
粒子群最適化と非線形システム

実装

なにも複雑なことはしていなくて、一番単純な感じで実装してます。

functions.py
import random

# min~maxの間でランダムに数を作る
def randRange(a : float,b : float) -> float:
    return a + (b-a)*random.random()

# n個の粒子を生成

def makeParticles(n : int,ranges : dict) -> list:
    ls = [{key:0 for key in ranges.keys()} for key in range(n)]
    for i in range(n):
        for key in ranges.keys():
            a,b = ranges[key]
            ls[i][key] = randRange(a,b)
    return ls

# 二つのdictの同じ要素同士を足し算
def dictAdd(*dic : dict) -> dict:
    ansDic = {}
    for key in dic[0].keys():
        ansDic[key] = 0
        for i in range(len(dic)):
            ansDic[key] += dic[i][key]
    return ansDic

# 二つのdictの同じ要素同士を引き算
def dictDiff(dic1 : dict,dic2 : dict) -> dict:
    ansDic = {}
    for key in dic1.keys():
        ansDic[key] = dic1[key] - dic2[key]
    return ansDic

# 二つのdictの同じ要素同士の積
def dictMul(dic1 : dict, dic2 : dict) -> dict:
    ansDic = {}
    for key in dic1.keys():
        ansDic[key] = dic1[key] * dic2[key]

# dictのそれぞれの要素に値をかける
def dictNumMul(dic : dict,n) -> dict:
    ansDic = {}
    for key in dic.keys():
        ansDic[key] = dic[key] * n
    return ansDic
PSO.py
import functions as func
import numpy as np
import copy

"""
time_max 反復する最大の回数
swam_size 粒子の個数
inertia 慣性係数
ap パーソナルベストの加速係数
ag グローバルベストの加速係数
"""


class PSO:
    def __init__(self,time_max = 1000,swam_size = 100,
                inertia = 0.9,ap = 0.8,ag = 0.8):
        self.time_max = time_max
        self.swam_size = swam_size
        self.inertia = inertia
        self.ap = ap
        self.ag = ag
        self.pBest = [np.inf for i in range(swam_size)]
        self.gBest = np.inf
        self.gPosi = {}

    """
    optimize関数のf引数には最小化したい関数を渡す
    para引数にはdictで
    {"引数名":[最小値,最大値]}
    という感じで渡す。
    """
    # 最適化したい関数とその引数のやつを受け取る
    def optimize(self,f,para):
        self.function = f
        self.particleInitialization(para)
        self.Update()
        for i in range(self.time_max):
            self.move()
            self.Update()
        return self.gPosi,self.gBest

    # 粒子を初期化する
    def particleInitialization(self,ls : dict):
        self.particle = func.makeParticles(self.swam_size,ls)
        self.pPosi = copy.deepcopy(self.particle)
        self.velocity = [{key:0 for key in ls.keys()} for i in range(self.swam_size)]

    # 動く
    def move(self):
        for i in range(self.swam_size):
            v0 = func.dictNumMul(self.velocity[i],self.inertia)
            v1 = func.dictNumMul(func.dictDiff(self.pPosi[i],self.particle[i]),self.ap*func.randRange(0,1))
            v2 = func.dictNumMul(func.dictDiff(self.gPosi,self.particle[i]),self.ag*func.randRange(0,1))
            v = func.dictAdd(v0,v1,v2)
            self.particle[i] = func.dictAdd(self.particle[i],v)
            self.velocity[i] = v

    # パーソナルベストとグローバルベストを更新する
    def Update(self):
        for i in range(self.swam_size):
            cost = self.function(**self.particle[i])
            if cost < self.pBest[i]:
                self.pBest[i] = cost
                self.pPosi[i] = copy.deepcopy(self.particle[i])
            if cost < self.gBest:
                self.gBest = cost
                self.gPosi = copy.deepcopy(self.particle[i])

 使ってみる

main.py
import PSO

# 最小化したい関数を定義

def f(x,y):
    return x**4+4*x**3-8*(x+1)**2-1 + y**4 + x**3 - 5*(y - 2)**2 - 3*x
def main():
    pso = PSO.PSO(time_max=1000)
    # 初期の粒子を-10000<x<10000,-10000<y<10000の範囲で発生させて最小化
    a,b = pso.optimize(f,{"x":[-10000,10000],"y":[-10000,10000]})
    print("座標",a,"最小値",b)

if __name__ == "__main__":
    main()

出力
座標 {'x': -4.412547925687121, 'y': -2.187602966185929}
最小値 -196.17514912856427

という感じで最小化できました。

wolframalphaでの結果

 追記

アップロードしたときはここまでだったんですけど、さすがに味気ないのでロジスティック回帰を粒子群最適化法を使って行いたいと思います。

ここでは、クロスエントロピー誤差関数を粒子群最適化法によって最小化することで分類境界を予測するということを行っています。

logistic.py
import functions as func
import math
import PSO
import numpy
# 発生した乱数にラベルをつける時の境界
def f(x,y):
    if x + 2 * y < 11:return 1
    else:return 0

# シグモイド関数
def sigmoid(x):
    if 600 > x > -600:
        return 1/(1 + math.exp(-x))
    elif x > 0:
        return 1
    else:
        return 0

# ゼロの時無限を返してあげるため
def llog(x):
    if x == 0:
        return -numpy.inf
    return math.log(x)

# 乱数でデータを発生させる
data = [[func.randRange(0,10),func.randRange(0,10)] for i in range(100)]
label = [f(x,y) for x,y in data]

# 最小化するコスト関数
def loss(w1,w2,b):
    ent = 0
    for i in range(len(data)):
        x = sigmoid(data[i][0]*w1 + data[i][1]*w2 + b)
        y = label[i]
        ent -= y * llog(x) + (1-y) * llog(1-x)
    return ent

def main():
    pso = PSO.PSO()
    a,b = pso.optimize(loss,{"w1":[-100,100],"w2":[-100,100],"b":[-100,100]})
    print(a,b)

    # できたモデルが学習データをしっかり分類できているか評価をする
    c = 0
    for i in range(100):
        n = sigmoid(data[i][0]*a["w1"] + data[i][1]*a["w2"] + a["b"])
        ans = 1 if n > 0.5 else 0
        if ans == label[i]:
            c += 1
    print("分類精度",c/100)

if __name__ == "__main__":
    main()

結果

{'w1': -5.345994566644921, 'w2': -10.19111400412491, 'b': 56.97834543330356} 2.744570556357321
分類精度 1.0

という感じでしっかりと分類することができました。

4
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
5