非Deepな機械学習手法としてランダムフォレスト (Random Forest) を選択する場面は多々ありますが、基本的にライブラリ任せになってあまり中身を意識することがありません。ので、今回はランダムフォレストの内部的な仕組みを確認しつつ、それを踏まえてPythonでスクラッチ実装していこうと思います。
ランダムフォレストについて
ランダムフォレストの仕組みに関する分かりやすい記事は探せばいくらでもあるので、ここでは以降が読みやすくなるよう実装の視点から少し解説をつけておきます。
ランダムフォレストはたくさんの決定木から構成され、決定木はノードから構成されます。イメージとしては以下のようになります。
なので、実装の手順としては、
-
ノード :
Node
-
決定木 :
DecisionTree
-
ランダムフォレスト :
RandomForest
の3つのクラスを実装していきます。
1. ノード : Node
データ適合後の各ノードは分岐条件を持っています。分岐条件とは「データのある特徴 .feature
があるしきい値 .value
以上であるか否か」であり、YESであれば左に、NOであれば右に分岐します。分岐条件によって分割されたデータは子ノードへの入力となり、再帰的にノードが構築されていきます。
では各ノードの分岐条件 (.feature
と .value
) がどのように決定されていくかといえば、取り得る全ての組合せ (.feature
, .value
) について情報ゲインを計算し、それが最大となるような組合せを保持します。
決定木の末端のノードは葉 (Leaf) と呼ばれ、葉のノードは予測クラス.label
を持ちます。
2. 決定木 : DecisionTree
決定木クラスの実装は単純で、インスタンス生成時には属性としてルートノード .root
を設定します。データに適合 (.fit()
) する時に再帰的に決定木が構築されます。
3. ランダムフォレスト : RandomForest
ランダムフォレストでは多数の決定木を構築し、それぞれの決定木の予測したクラスの多数決によって最終的な予測が出力されます。それぞれの決定木にどのようにして差異が生じるかといえば、fit()
時に入力されたデータからランダム抽出 _bootstrap_sample()
したサブセットを各決定木への入力とすることで多様な木を構築しています。ランダム抽出の際、使用する特徴量についても選択します。
スクラッチ実装
0. ライブラリのインポート
スクラッチ実装なので、sklearn.tree
は使わない縛りでいきます。
from collections import Counter
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
1. ノードの実装
class Node:
def __init__(self):
self.left = None # 左側に分岐した子ノード
self.right = None # 右側に分岐した子ノード
self.feature = None # 分岐条件の特徴量
self.value = None # 分岐条件のしきい値
self.label = None # 自身の予測ラベル
self.gain = None # 自身の情報ゲイン
def build(self, X, y):
features = X.columns
n_features = len(features)
if len(y.unique()) == 1:
self.label = y.unique()[0]
return
class_counts = Counter(y) # ラベルの各クラスの数をdict型で返す
self.label = max(class_counts, key=class_counts.get)
# 最適な分岐条件を保持するための変数
best_gain = 0.0
best_feature = None
best_value = None
gini = self.gini(y) # 情報ゲインの計算に必要
for f in features:
values = X[f].unique()
for val in values:
y_l = y[X[f] >= val]
y_r = y[X[f] < val]
gain = self.information_gain(y_l, y_r, gini)
if gain > best_gain:
best_gain = gain
best_feature = f
best_value = val
# 情報ゲインが増えていなければ終了
if best_gain == 0:
return
else:
self.gain = best_gain
self.feature = best_feature
self.value = best_value
# 左側のノードを再帰的に構築
X_l = X[X[self.feature] >= self.value]
y_l = y[X[self.feature] >= self.value]
self.left = _Node()
self.left.build(X_l, y_l)
# 右側のノードを再帰的に構築
X_r = X[X[self.feature] < self.value]
y_r = y[X[self.feature] < self.value]
self.right = _Node()
self.right.build(X_r, y_r)
# ジニ不純度の計算
def gini(self, y):
classes = y.unique()
gini = 1.0
for c in classes:
gini -= (len(y[y==c]) / len(y)) ** 2
return gini
# 情報ゲインの計算
def information_gain(self, y_l, y_r, current_uncertainty):
pl = float(len(y_l) / (len(y_l)+len(y_r)))
pr = float(len(y_r) / (len(y_l)+len(y_r)))
return current_uncertainty - (pl*self.gini(y_l) + pr*self.gini(y_r))
# 一つのデータに対する予測
def predict(self, datum):
# 葉ノードでなければ分岐条件に従って分岐する
if self.feature != None:
if datum[self.feature] >= self.value:
return self.left.predict(datum)
else:
return self.right.predict(datum)
# 葉ノードなら自身の予測ラベルを返す
else:
return self.label
2. 決定木の実装
class DecisionTree:
def __init__(self):
self.root = None
def fit(self, X, y):
self.root = Node()
self.root.build(X, y)
def predict(self, X):
y_pred = []
for i in range(len(X)):
y_pred.append(self.root.predict(X.iloc[i]))
return pd.Series(y_pred)
3. ランダムフォレストの実装
class RandomForest:
def __init__(self, n_trees=10):
self._n_trees = n_trees
self._forest = [None] * self._n_trees
self._used_features_set = [None] * self._n_trees
def fit(self, X, y, random_state=0):
self._classes = np.unique(y)
sampled_X, sampled_y = self._bootstrap_sample(X, y, random_state=random_state)
for i, (sampled_Xi, sampled_yi) in enumerate(zip(sampled_X, sampled_y)):
tree = DecisionTree()
tree.fit(sampled_Xi, sampled_yi)
self._forest[i] = tree
# データセットから `._n_trees` 個のサブセットを返す関数
def _bootstrap_sample(self, X, y, random_state=0):
n_features = len(X.columns)
n_features_forest = int(np.floor(np.sqrt(n_features)))
bootstrapped_X = []
bootstrapped_y = []
np.random.seed(random_state)
for i in range(self._n_trees):
idx = np.random.choice(len(y), size=len(y))
col_idx = np.random.choice(n_features, size=n_features_forest, replace=False)
bootstrapped_X.append(X.iloc[idx, col_idx])
bootstrapped_y.append(y.iloc[idx])
self._used_features_set[i] = col_idx
return bootstrapped_X, bootstrapped_y
def predict(self, X):
proba = self.predict_proba(X)
return self._classes[np.argmax(proba, axis=1)]
def predict_proba(self, X):
if self._forest[0] is None:
raise ValueError('Model not fitted yet')
# 決定木群による投票
votes = pd.DataFrame()
for i, (tree, used_features) in enumerate(zip(self._forest, self._used_features_set)):
votes[i] = tree.predict(X.iloc[:, used_features])
votes_array = votes.values
# 投票結果を集計
counts_array = np.zeros((len(X), len(self._classes)), dtype=int)
for c in self._classes:
counts_array[:, c] = np.sum(np.where(votes_array==c, 1, 0), axis=1)
# 予測クラス毎に割合を計算し、probaとして返す
proba = counts_array / self._n_trees
return proba
4. irisデータセットによる動作確認
それではお馴染みirisを使って動作確認を行います。
if __name__ == '__main__':
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = pd.Series(iris.target)
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.4, random_state=42)
rf = RandomForest(n_trees=10)
rf.fit(X_train, y_train, random_state=42)
y_pred = rf.predict(X_valid)
print('============', 'Confusion Matrix')
print(confusion_matrix(y_valid, y_pred))
print('============', 'Accuracy Score')
print(accuracy_score(y_valid, y_pred))
============ Confusion Matrix
[[23 0 0]
[ 0 19 0]
[ 0 2 16]]
============ Accuracy Score
0.9666666666666667
Accuracy 96.7%!ということでまずまずな分類が出来ています。random_state
の値によっては0.9になったり1.0になったりしますが、基本的に単体の決定木より良い性能が出ます。
まとめ
ランダムフォレストの仕組みを確認し、Pythonでスクラッチ実装を行いました。数値データしか扱えませんが、上手く動作することを確認できました。他にも過学習を防ぐために max_depth
(木の最大深さ) を設定できるようにする、カテゴリカル変数を扱えるようにするなど出来ることはありますが、とりあえず今回はここまでにしておきます。