1. イントロダクション
本記事では, 私が 東京大学グローバル消費インテリジェンス寄付講座 (GCI 2025 Summer) におけるコンペティションにて用いた, DEAP による遺伝的プログラミングの実装について, 備忘録も兼ねて説明します.
機械学習によって既知の統計データに対して予測を行う際, モデルの性能を引き上げる手法として特徴量エンジニアリングがあります. 特徴量エンジニアリングから新たな変数を生成することで, モデルは統計データにおける変数間の複雑な関係を理解しやすくなります. しかしながら, 個々人の持つ知見にはどうしても限界があるため, 発見できる特徴量は多くないことが多いでしょう.
そこで, 本記事では遺伝的プログラミングに着目し, 統計データを代表する新たな特徴量を探索的に生成する手法を紹介します. 本手法は比較的計算コストを要するため, 常に最善の手法となるとは限りませんが, コンペティションのように僅かなモデル性能の改善が勝敗を分ける場面では, 極めて有効な選択肢となり得ます.
2. アルゴリズム
遺伝的プログラミングとは, 自然界における進化の過程を模倣した手法で木構造を用いて表現されます. 例えば, 次の木構造は $(2.2-(\frac{X}{11})) + (7\times\cos(Y))$ を表します.
また, 木構造の操作に関しては主に以下の 3 つの概念があります.
- 選択
次の世代の親となる個体を選択するプロセスです. 個体は確率的に選択され, より優れたパフォーマンスを示すほど選択される可能性が高く設定されています. 一般的なアルゴリズムとしてはトーナメント選択があり, ランダムに選ばれた少数の個体の中で複数のトーナメントを行った際の勝者 (= 最も適応度の高い個体) が選択されます.
- 交叉
親となる個体を 2 つの個体から生成するプロセスです. 個体の一部分が別の個体の全体, あるいは一部分で置き換えることで新しい親を生成します.
- 突然変異
親から構文的に正しい子を生成するプロセスです. 個体の一部分が削除され, ランダムに生成された部分木を繋ぎ合わせます.
一般的には適当な初期化の後に, 選択, 交叉, 突然変異を 1 step として繰り返し, それによって得られる最適解の収束を試みます. 本記事においては以下のアルゴリズムに基づいて実装を進めていきます.
- 構文に沿った個体をランダムに複数生成し集団を形成する
- 大きな変化が見られなくなるまで交叉と突然変異を繰り返す
- 最終的に集団の中で最も良いパフォーマンスを示した個体を保存する
- 3. で保存した個体を評価して前ステップのスコアと比較する
- 4. で算出したスコアが良ければその特徴量を採用し 1. に戻る
- スコアの改善が見られなくなったら一連のプロセスを終了する
3. Python (DEAP) による実装
本記事では Python の外部ライブラリである DEAP を用いて, 遺伝的プログラミングによる特徴量エンジニアリングを実装します. ライブラリは以下のコマンドによってインストールできます.
pip install deap
また, 機械学習に用いるモデルには scikit-learn を用います. また, データはカテゴリカル変数に対してエンコーディングが済んでいる pandas.DataFrame
とし, 各カラムが numpy.number
として格納されているものとします.
本記事において必要な import
文は以下の通りです.
import operator
import random
import re
import typing
import deap.base
import deap.gp
import deap.tools
import matplotlib.pyplot
import numpy
import pandas
import seaborn
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection
import sklearn.pipeline
import sklearn.preprocessing
3.1. 個体の定義 (deap.gp.PrimitiveTree)
まず, 個体の定義から始めます. 今回は評価指標が高くなる個体の生成が目的であるため, 適応度が高いとパフォーマンスが高いという関係性を定義した上で個体を定義します. 同時に, 計算再現性のために乱数生成におけるシードを固定しておきます.
def initialise(
*,
seed : int = 42
) -> None
random.seed(seed)
numpy.random.seed(seed)
deap.creator.create('fitness', deap.base.Fitness, weights = (1.,))
deap.creator.create('individual', deap.gp.PrimitiveTree, fitness = deap.creator.fitness)
3.2. 構文の定義 (deap.gp.PrimitiveSet)
DEAP はまだデータをどのようにして木にするかを理解していない状態です. そこで, deap.gp.PrimitiveSet
から木に用いる演算子 (primitive
) を定義します. ただし, データが非正数を含む可能性を考慮していくつかの演算子は予め定義しておきます.
def safediv(
lhs : numpy.typing.NDArray[numpy.number],
rhs : numpy.typing.NDArray[numpy.number],
) -> numpy.typing.NDArray[numpy.number]:
return numpy.where(not numpy.isclose(numpy.abs(rhs), 0.), numpy.true_div(lhs, rhs), 1.)
def safesqrt(
x : numpy.typing.NDArray[numpy.number],
) -> numpy.typing.NDArray[numpy.number]:
return numpy.sqrt(numpy.abs(x))
def safelog(
x : numpy.typing.NDArray[numpy.number],
) -> numpy.typing.NDArray[numpy.number]:
return numpy.log(numpy.abs(x) + numpy.finfo(x.dtype).eps)
使用する演算子の実装はデータに依存しますので, これが正しいと一意的に定まるわけではありません. 自身のデータにどのような値が格納されているかを確認した上で実装するようにしましょう.
def mypset(
feature : int
) -> deap.gp.PrimitiveSet:
pset = deap.gp.PrimitiveSet('main', feature)
pset.addPrimitive(operator.add, 2, name = 'ADD')
pset.addPrimitive(operator.sub, 2, name = 'SUB')
pset.addPrimitive(operator.mul, 2, name = 'MUL')
pset.addPrimitive(safediv, 2, name = 'DIV')
pset.addPrimitive(operator.neg, 1, name = 'NEG')
# pset.addPrimitive(numpy.sin, 1, name = 'SIN')
# pset.addPrimitive(numpy.cos, 1, name = 'COS')
# pset.addPrimitive(numpy.tan, 1, name = 'TAN')
pset.addPrimitive(numpy.sinh, 1, name = 'SINH')
pset.addPrimitive(numpy.cosh, 1, name = 'COSH')
pset.addPrimitive(numpy.tanh, 1, name = 'TANH')
pset.addPrimitive(safesqrt, 1, name = 'SQRT')
pset.addPrimitive(numpy.exp, 1, name = 'EXP')
pset.addPrimitive(safelog, 1, name = 'LOG')
pset.addPrimitive(numpy.minimum, 2, name = 'MIN')
pset.addPrimitive(numpy.maximum, 2, name = 'MAX')
return pset
これで木の生成における構文が完成しました. deap.gp.PrimitiveSet
においては入力を受ける数 (feature
) の明記が必要です. 引数の 1 つである name
は特段設定する必要があるわけではありませんが, 可読性が高まるように自身で設定しておくに越したことはないでしょう.
一般的に sin, cos, tan といった周期関数は元々の値が持つ大小関係を失わせる可能性があります. そのため, ドメイン知識に基づいてその有効性が期待できる場合を除き, むやみな追加は避けることが賢明です. 本記事では, 汎用的な例としてこれらを省略しています.
また, 今回はデータが各カラムを numpy.number
として格納している pandas.DataFrame
であると仮定しているため deap.gp.PrimitiveSet
を用いていますが, カテゴリカル変数がエンコーディングをされていない状態でも deap.gp.PrimitiveSetTyped
を用いて入出力のクラスを指定することで, それ以降の操作を実装することができます. (pset.addPrimitive(operator.add, [numpy.ndarray, numpy.ndarray], numpy.ndarray, name = 'ADD')
のように記述します.)
3.3. 操作の定義 (deap.base.Toolbox)
ここでは, 先に述べた選択, 交叉, 突然変異のアルゴリズムを定義します.
def mytoolbox(
pset : deap.gp.PrimitiveSet,
*,
method : typing.Optional[typing.Callable] = None
) -> deap.base.Toolbox:
if not hasattr(deap.creator, 'individual'):
raise NotImplementedError
toolbox = deap.base.Toolbox()
toolbox.register('expr_individual', deap.gp.genHalfAndHalf, pset = pset, min_ = 1, max_ = 3)
toolbox.register('expr_mutate', deap.gp.genFull, min_ = 0, max_ = 2)
toolbox.register('individual', deap.tools.initIterate, container = deap.creator.individual, generator = toolbox.expr_individual)
toolbox.register('population', deap.tools.initRepeat, container = list, func = toolbox.individual)
toolbox.register('select', deap.tools.selTournament, tournsize = 10)
toolbox.register('mate', deap.gp.cxOnePoint)
toolbox.register('mutate', deap.gp.mutUniform, expr = toolbox.expr_mutate, pset = pset)
toolbox.decorate('mate', deap.gp.staticLimit(key = operator.attrgetter('height'), max_value = 5))
toolbox.decorate('mutate', deap.gp.staticLimit(key = operator.attrgetter('height'), max_value = 5))
toolbox.register('compile', deap.gp.compile, pset = pset)
if method is not None:
toolbox.register('evaluate', method)
return toolbox
個体の生成には deap.gp.genHalfAndHalf
を, 突然変異においては deap.gp.genFull
を採用しています. また, deap.gp.staticLimit
によって木の深さに最大値を設けることで, 過学習を防いでいます. 木の適応値を評価するための関数は後に述べることとします.
個体の選択には deap.tools.selTournament
を採用しています. 引数である tournsize
からトーナメント選択において参加する個体数を選択することができ, 値が大きいほど選択圧 (優れた個体を選ばれやすくする指標) が高まります.
3.4. 監視の実装 (deap.base.MultiStatistics)
遺伝的プログラミングは比較的計算コストの高い手法であるため, 実行する際はその進捗を監視することが必要不可欠です. TensorBoard などに値を渡すことで監視することもできますが, 本記事では deap.tools.MultiStatistics
を用いて標準出力による監視を行います.
def mystats(
*,
dummy : None = None
) -> deap.tools.MultiStatistics:
fitness = deap.tools.Statistics(lambda x: x.fitness.values)
size = deap.tools.Statistics(len)
stats = deap.tools.MultiStatistics(fitness = fitness, size = size)
stats.register('avg', numpy.mean)
stats.register('min', numpy.min)
stats.register('max', numpy.max)
stats.register('std', numpy.std)
return stats
3.5. 遺伝的プログラミングの実行
遺伝的プログラミングではデータを正規化することが必要です. 各個体に対してモデルを構築することで他のデータの影響を受けていないことが明示できるため, 関数として定義しておきます. また, モデルの評価にあたって交差検証を用いるため, それに用いる分割器も定義しておきます.
def mymodel(
*,
dummy : None = None
) -> sklearn.pipeline.Pipeline:
components = [
('scaler', sklearn.preprocessing.StandardScaler()),
('model', sklearn.linear_model.LogisticRegression(random_state = 42)
]
return sklearn.pipeline.Pipeline(components)
def mysplitter(
*,
dummy : None = None
) -> sklearn.model_selection.StratifiedKFold:
return sklearn.model_selection.StratifiedKFold(shuffle = True, random_state = 42)
data
という名前の pandas.DataFrame
にデータが格納されており, target
という名前の str
に推測したいカラムが格納されているとします.
data : pandas.DataFrame
target : str
X = data.drop(columns = [target]).to_numpy()
y = data[target].to_numpy().ravel()
indices = data.index.to_numpy()
X_train, X_test, y_train, y_test, indices_train, indices_test= sklearn.model_selection.train_test_split(X, y, indices, random_state = 42)
initialise()
model = mymodel()
model.fit(X_train, y_train)
threshold = sklearn.metrics.roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])
steps = 100
features = X.shape[1] + 15
expressions = []
for step in range(steps):
feature = X.shape[1] + len(expressions)
if feature > features:
break
def evaluate(
individual : deap.gp.PrimitiveTree
) -> typing.Tuple[float, typing.Any]:
function = toolbox.compile(individual)
data = numpy.c_[X_train, function(*(X_train.T))]
model = mymodel()
splitter = mysplitter()
return numpy.mean(sklearn.model_selection.cross_val_score(model, data, y_train, scoring = 'roc_auc', cv = splitter)),
pset = mypset(feature = feature)
toolbox = mytoolbox(pset, method = evaluate)
stats = mystats()
population = toolbox.population(n = 500)
halloffame = deap.tools.HallOfFame(1)
population, _ = deap.algorithms.eaSimple(population, halloffame = halloffame, toolbox = toolbox, cxpb = 0.75, mutpb = 0.1, ngen = 10, verbose = True, stats = stats)
fitness = stats.compile(population)['fitness']['max']
if fitness > threshold:
threshold = fitness
expressions.append(halloffame[0])
これまでに定義してきた関数と deap.tools.HallOfFame
によるパフォーマンスが高かった個体の抽出, deap.algorithms.eaSimple
による遺伝的プログラミングの実行を踏まえて, カラム数が features
になるまで操作を繰り返します. deap.algorithms.eaSimple
では cxpb
によって交叉率を, mutpb
によって突然変異率を操作できます. Optuna などを用いて機械的に最適な交叉率や突然変異率を探索することはできますが, 計算コストが高く処理に時間がかかってしまうため, 扱うデータごとに監視の結果から手動で調整する方が好ましいでしょう.
以上により, 遺伝的プログラミングによって導出されたモデルの性能を引き上げるための手法が expressions
に格納されます.
3.6. 汎化性能の検証
expressions
に格納された手法を元々のデータに適用することで特徴量エンジニアリングが完了するように思えますが, ここで過学習の可能性を考慮する必要があります. evaluate
の定義からもわかる通り, あくまでも遺伝的プログラミングは X_train
に対してモデルのパフォーマンスを引き挙げただけであって, その汎化性能は検証されていません. そこで X_train
と X_test
に対して 1 つずつ遺伝的プログラミングによって生成されるカラムを追加していった際のパフォーマンスの変化から評価を行います.
まずは, expressions
に格納された手法を元々のデータに適用するための関数を定義します.
def apply(
data : pandas.DataFrame,
*,
feature : int,
expressions : typing.List[typing.Any],
) -> pandas.DataFrame:
result = data.copy()
for i, expression in enumerate(expressions):
if len(result.columns) >= feature:
break
pset = mypset(feature = len(result.columns))
toolbox = mytoolbox(pset = pset)
function = toolbox.compile(expr = expression)
result[f'gp{i:02d}'] = function(*(result.to_numpy().T))
return result
本記事では Matplotlib と seaborn を用いた定性評価より, 過学習の有無を視覚的に検証することとします.
X_train = data.loc[indices_train].drop(columns = [target]).copy()
X_test = data.loc[indices_test].drop(columns = [target]).copy()
y_train = data.loc[indices_train, target].copy()
y_test = data.loc[indices_test, target].copy()
scores = []
for i in range(len(expressions) + 1):
X_train_gp = apply(X_train, feature = len(X_train.columns) + i, expressions = expressions)
X_test_gp = apply(X_test, feature = len(X_test.columns) + i, expressions = expressions)
model = mymodel()
splitter = mysplitter()
score = numpy.mean(sklearn.model_selection.cross_val_score(model, X_train_gp, y_train, scoring = 'roc_auc', cv = splitter))
scores.append({'Phase' : 'Train', 'Feature' : len(X_train_gp), 'Score' : score})
model = mymodel()
model.fit(X_train_gp, y_train)
score = sklearn.metrics.roc_auc_score(y_test, model.predict_proba(X_test_gp)[:, 1])
scores.append({'Phase' : 'Test', 'Feature' : len(X_test_gp), 'Score' : score})
scores = pandas.DataFrame(scores)
scores['Type'] = numpy.where(scores['Feature'] == len(X_train), 'Base', 'G.P.')
axis = seaborn.lineplot(scores, 'Feature', 'Score', hue = 'Phase', style = 'Type', markers = {'Base' : 'd', 'G.P.' : 'o'}, palette = {'Train' : 'b', 'Test' : 'r'}, dashes = False, markersize = 7.5)
axis.set_xlabel('Feature')
axis.set_ylabel('Score')
axis.grid(which = 'both')
handles, labels = [], []
for handle, label in zip(*axis.get_legend_handles_labels()):
if not re.search(r'Phase|Type', label):
handles.append(handle)
labels.append(label)
axis.legend(handles = handles, labels = labels, loc = 'lower right')
matplotlib.pyplot.show()
例として, 私が 東京大学グローバル消費インテリジェンス寄付講座 (GCI 2025 Summer) におけるコンペティションにて遺伝的プログラミングを用いた際に出力したスコアの推移を載せることとします.
図より, Feature = 26
に至るまでは Train
, Test
の双方において Score
の上昇が確認できます. しかし, そこからは次第に Test
の Score
が伸び悩み, 下降していることが見て取れます. これがいわゆる遺伝的プログラミングにおける過学習であり, 生成された特徴量が Train
に適応しすぎることで, データがモデルの汎化性能を失わせているのです. したがって, この結果から Test
の Score
が下降する前の Feature = 23
や Feature = 26
までの特徴量を使うべきであると言えます.
4. アウトロダクション
遺伝的プログラミングは非常に柔軟で強力である一方, その性能は問題設定のやパラメータに左右されることが多いため, これさえやれば良いという模範解答がありません. 本記事で紹介したように, 世代数や個体数を小さく設定して全体の挙動を把握し, 試行錯誤を繰り返しながらチューニングを進めることが現実的なアプローチです.
また, 本記事において深くは触れませんでしたが, 遺伝的プログラミングには肥大化という現象が伴います. これは, 世代を重ねるごとに木構造が不必要に複雑化し, 過学習や計算コストの増加を招く問題です. 対策としては, evaluate
において len(individual)
に応じた罰則項を評価指標に導入する手法が挙げられます.
このように, 遺伝的プログラミングは地道な作業を要する扱いが難しい手法です. しかしながら, 最適化が進めば人力では思いつかないような有効な特徴量を発見できる可能性を秘めています. 機械学習を行う際に 1 度試してみてはいかがでしょうか.