LoginSignup
5
7

More than 3 years have passed since last update.

Cartesian Genetic Programmingの実装

Last updated at Posted at 2019-07-12

はじめに

Cartesian Genetic Programmingを実装してみたので,それについて.
今回実装するのは,回帰問題(単目的)用です.全体のソースコードはここにあげてます.
修正点・ミス・バグあったら教えてください.

Cartesian Genetic Programming(CGP)の実装

一般的にCGPは,多層のフィードフォワード型の合成関数で表現されます.

しかし,今回は実装が簡単なので,一層当たり1ノードのフィードフォワードネットワークにします.CGPは自分より前の層ならどこからでも接続できるので,ノード総数が同じならば,一層当たり1ノードのフィードフォワードネットワークでもそんなに問題ないはずです.

個体表現

個体は,固定長の配列で表現します.要素数がノードの総数です.このとき,すべてのノードが使われることはないので,表現型は可変長ネットワークになります.
Untitled Diagram (1).jpg
また,各ノードは

  • functionノードの種類
  • 子ノードの接続関係
  • 定数(constノードのみ使用)

をパラメータとして持つ構造体です.さらに,Ephemeral Constant(ERC)と呼ばれるconstantノードを一定数持つことにします.ERCとは,個体の生成時と同時にランダムに決まる定数です.

class Gene:
    def __init__(self, func_name, inputs_arg, arity, const):
        self.func_name = func_name
        self.inputs_arg = inputs_arg
        self.arity = arity
        self.const = const

class Individual(list):
    def __init__(self, content=[]):
        list.__init__(self, content)
        self.fitness = None

    def create(self, gene_num, output_num, nodeSet, leafSet, ERC_func=lambda:random.random()*2-1):
        self.output_num = output_num
        variable_leaf = [(name,leaf_info) for name,leaf_info in leafSet.items() if leaf_info["const"]!=None]
        variable_leaf.sort(key=lambda obj:obj[1]["const"])
        const_leaf = [(name,leaf_info) for name,leaf_info in leafSet.items() if leaf_info["const"]==None]
        const_leaf.sort(key=lambda obj:obj[0])

        for n, item in enumerate(variable_leaf):
            name, leaf_info = item
            self.append(Gene(name, [], 0, leaf_info["const"]))
        for n, item  in enumerate(const_leaf):
            name, leaf_info = item
            self.append(Gene(name, [], 0, ERC_func()))

        function_node = [(name,node_info) for name,node_info in nodeSet.items() if name!="out"]
        max_arg_num = max([node_info["arity"] for node_info in nodeSet.values()])
        for n in range(gene_num):
            name,node_info = random.choice(function_node)
            inputs_arg = [random.randrange(len(self)) for x in range(max_arg_num)]
            self.append(Gene(name, inputs_arg, node_info["arity"], None))

        function_num = len(self)
        self.function_num = function_num
        for n in range(output_num):
            inputs_arg = [random.randrange(function_num) for x in range(max_arg_num)]
            self.append(Gene("out", inputs_arg, 1, None))

あとで,交叉しやすいように,子ノードの数は揃えてます.ただし,子ノードは先頭から使われていき,そのノードの関数の規定まで使われます.また,関数のノードセット(nodeSet)と入力ノードのノードセット(leafSet)は,各要素に関数名,入力数,定数が定義された辞書in辞書となります.

個体関数の実行

実行ですが,本実装の場合は,配列の先頭から実行しすれば大丈夫です.
しかし,実行しなくてもよいノードも存在するため,実行するノードだけを先に計算します.これは配列の最後の要素であるoutputノードから子ノードを辿ることで計算できます.

    def calc_path(self):
        path = [False for n in range(len(self))]
        for n in range(len(self))[::-1]:
            gene = self[n]
            if gene.func_name == "out":
                path[gene.inputs_arg[0]] = True
                path[n] = True
            else:
                if path[n]:
                    for i in range(gene.arity):
                        path[gene.inputs_arg[i]] = True
        return path

そして,実行するノードだけを,合成関数として計算します.

    def run(self, X, nodeSet, leafSet):
        path = self.calc_path()
        calc_rets = [None for _ in range(len(self))]
        for n, use in enumerate(path):
            if use:
                gene = self[n]
                name = gene.func_name
                if gene.arity == 0:
                    if "x" in name:
                        ret = leafSet[name]["function"](X, gene.const)
                    else:
                        ret = leafSet[name]["function"](gene.const, len(X))
                else:
                    ret = nodeSet[name]["function"](calc_rets, gene.inputs_arg)
                calc_rets[n] = ret
        for n in range(1,self.output_num+1):
            ret = calc_rets[-1*n]
            mask = np.isfinite(ret)
            m = np.mean(ret[mask])
            ret[mask==False] = m
            calc_rets[-1*n] = ret

        return calc_rets[-1*self.output_num:]

一応,対応できるように書いてあります.単出力の場合は,返り値の最初の要素が出力列となります.

進化アルゴリズム

実装した進化アルゴリズムはGenetic Algorithm(GA)とEvolutionary Strategy(ES)の二つです.
また,この二つのアルゴリズムのパラメータは後述するsklearn_objectが持つパラメータを使います.

Genetic Algorithm

GAの場合は交叉・突然変異・選択の3つの操作を行います.トーナメント選択のみしか実装していないです.
交叉は,1点交叉と一様交叉の2つが選べます.突然変異は,使用ノードのみを変異させる方法と,すべてのノードを変異させる方法がとれます.
以後,増やしていきたいです.
また,各手法はエリート戦略として,エリートを一個体保存するようにしました.

def GA_evolution(sklearn_object):
    ind_num = sklearn_object.ind_num
    nodeSet = sklearn_object.nodeSet
    leafSet = sklearn_object.leafSet
    gene_num = sklearn_object.gene_num
    output_num = sklearn_object.output_num
    generation_num = sklearn_object.generation_num
    tour_size = sklearn_object.tour_size

    CXPB = sklearn_object.CXPB
    MUTPB = sklearn_object.MUTPB
    CXMode = sklearn_object.CXMode
    MUTMode = sklearn_object.MUTMode

    X = sklearn_object.train_X
    y = sklearn_object.train_y
    evaluator_args = (X,y,nodeSet,leafSet)

    pop = op.makeInitialPopulation(ind_num, nodeSet, leafSet, gene_num, output_num)
    elite = None

    for g in range(generation_num):
        pop = op.evaluation(pop, evaluator, evaluator_args)

        new_elite = min([ind for ind in pop if ind.fitness!=None], key=lambda ind:ind.fitness)
        if elite == None or elite.fitness == None or new_elite.fitness < elite.fitness:
            elite = deepcopy(new_elite)
        pop = op.tournament_selection(pop, tour_size, elite)

        #print(g,*getGenerationInfo(pop))
        if CXMode == "OnePoint":
            pop = op.one_point_crossover(pop, CXPB)
        if CXMode == "Uniform":
            pop = op.uniform_crossover(pop, CXPB)
        if MUTMode == "Point":
            pop = op.point_mutate(pop, MUTPB, nodeSet, leafSet)
        if MUTMode == "UsePoint":
            pop = op.use_point_mutate(pop, MUTPB, nodeSet, leafSet)


    pop = op.evaluation(pop, evaluator, evaluator_args)
    new_elite = min([ind for ind in pop if ind.fitness!=None], key=lambda ind:ind.fitness)
    if new_elite.fitness < elite.fitness:
        elite = deepcopy(new_elite)
    return elite

Evolutionary Strategy

ESの場合は,(1+λ)ESのみ実装しました.論文見てると(1+λ)ESが強いようなので.
ESの場合は,交叉ができないようにしました.突然変異はGAと同じく,2つの方法が選べます.

def ES_evolution(sklearn_object):
    ind_num = sklearn_object.ind_num
    nodeSet = sklearn_object.nodeSet
    leafSet = sklearn_object.leafSet
    gene_num = sklearn_object.gene_num
    output_num = sklearn_object.output_num
    generation_num = sklearn_object.generation_num

    MUTPB = sklearn_object.MUTPB
    MUTMode = sklearn_object.MUTMode
    stop_eval = sklearn_object.stop_eval

    X = sklearn_object.train_X
    y = sklearn_object.train_y
    evaluator_args = (X,y,nodeSet,leafSet)

    pop = op.makeInitialPopulation(ind_num, nodeSet, leafSet, gene_num, output_num)
    elite = None

    eval_num = 0
    for g in range(generation_num):
        pop = op.evaluation(pop, evaluator, evaluator_args)
        eval_num += len(pop)
        if eval_num > stop_eval:
            break
        new_elite = min([ind for ind in pop if ind.fitness!=None], key=lambda ind:ind.fitness)
        if elite == None or elite.fitness == None or  new_elite.fitness < elite.fitness:
            elite = deepcopy(new_elite)

        pop = [deepcopy(elite) for _ in range(ind_num)]
        if MUTMode == "Point":
            pop = op.point_mutate(pop, MUTPB, nodeSet, leafSet)
        if MUTMode == "UsePoint":
            pop = op.use_point_mutate(pop, MUTPB, nodeSet, leafSet)
        #print(g,*getGenerationInfo(pop))

    new_elite = min([ind for ind in pop if ind.fitness!=None], key=lambda ind:ind.fitness)
    if elite == None or elite.fitness == None or  new_elite.fitness < elite.fitness:
        elite = deepcopy(new_elite)
    return elite

scikit-learnからの呼び出し

sklearnのcross validationやgrid searchから呼び出せるように,sklearnのbaseの要件を満たします.
baseオブジェクトを継承をしない場合は,

  • fit関数
  • predict関数
  • get_params関数
  • set_params関数
  • score関数

を定義すればいいはずです(2019/07/12現在).もしかしたらpredict関数はいらないかもしれません.
また,grid searchなどでscore関数を参照する場合には,戻り値のスコアを最大化します.今回は回帰問題を対象とするため,スコアに-1を掛けて返します.

class CGPRegressor:
    def __init__(self,ind_num=4, nodeSet=None,
        const_num=0, gene_num=100, generation_num=100, tour_size=7,
        CXPB=0.7, CXMode="OnePoint", MUTPB=0.05, MUTMode="UsePoint",
        ea_type="ES",stop_eval=100000):
        self.ind_num = ind_num
        if nodeSet == None:
            self.nodeSet = cgp_base.CreateNodeSet({"add","sub","mul","div"})
        else:
            self.nodeSet = nodeSet
        self.const_num = const_num
        self.gene_num = gene_num
        self.generation_num = generation_num
        self.tour_size = tour_size

        self.CXPB = CXPB
        if ea_type == "GA":
            self.MUTPB = 1.0 - CXPB
        else:
            self.MUTPB = MUTPB
        self.CXMode = CXMode
        self.MUTMode = MUTMode
        self.ea_type = ea_type
        self.output_num = 1
        self.stop_eval = stop_eval

    def fit(self, X, y):
        self.train_X = X
        self.train_y = y
        variable_num= X.shape[1]
        output_num = 1
        self.leafSet = cgp_base.CreateLeafSet(variable_num, self.const_num)
        if self.ea_type == "ES":
            self.best_ind = ea.ES_evolution(self)
        if self.ea_type == "GA":
            self.best_ind = ea.GA_evolution(self)
        return self

    def predict(self, X):
        _y = self.best_ind.run(X, self.nodeSet, self.leafSet)[0]
        return _y

    def get_params(self, deep=True):
        return {
        "nodeSet":self.nodeSet,
        "ind_num":self.ind_num,
        "const_num":self.const_num,
        "gene_num":self.gene_num,
        "generation_num":self.generation_num,
        "tour_size":self.tour_size,
        "CXPB":self.CXPB,
        "MUTPB":self.MUTPB,
        "CXMode":self.CXMode,
        "MUTMode":self.MUTMode,
        "ea_type":self.ea_type
        }

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self,parameter, value)
        return self

    def score(self,X,y,sample_weight=None):
        _y = self.predict(X)
        s =  np.sum((y-_y)**2)/len(y)
        return -1*s

実行結果

実行してみます.対象の回帰問題ですが,データセットを用意するのが面倒なので,乱数列とします.
そのうちちゃんとした問題に適用してみます.

if __name__ == "__main__":# test code
    seed = 6
    inputs_num = 3
    const_num = 1
    instances_num = 5

    random.seed(seed)
    np.random.seed(seed)
    X = np.random.random((instances_num, inputs_num))
    y = np.random.random(instances_num)

    model = CGPRegressor(ind_num=4, nodeSet=None,
        const_num=const_num, gene_num=10, generation_num=100,
        MUTPB=0.05, MUTMode="UsePoint",
        ea_type="ES")
    model.fit(X,y)
    s = model.score(X,y)
    print(s)

実行結果

-0.08112019089484938

とりあえず,動いているようです.

おわりに

今回は,CGPを実装してみました.ソースコードはこちら
次は,CGPにはGAがよいのか,それともESがよいのかを検証しようと思っています.
そのときは,実際の回帰問題で試してみることにします.

5
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
7