LoginSignup
5
7

More than 3 years have passed since last update.

ランダムフォレストをスクラッチで実装したい

Last updated at Posted at 2020-12-09

非Deepな機械学習手法としてランダムフォレスト (Random Forest) を選択する場面は多々ありますが、基本的にライブラリ任せになってあまり中身を意識することがありません。ので、今回はランダムフォレストの内部的な仕組みを確認しつつ、それを踏まえてPythonでスクラッチ実装していこうと思います。

ランダムフォレストについて

ランダムフォレストの仕組みに関する分かりやすい記事は探せばいくらでもあるので、ここでは以降が読みやすくなるよう実装の視点から少し解説をつけておきます。

ランダムフォレストはたくさんの決定木から構成され、決定木はノードから構成されます。イメージとしては以下のようになります。

random-forest.png

なので、実装の手順としては、

  1. ノード : Node
  2. 決定木 : DecisionTree
  3. ランダムフォレスト : RandomForest

の3つのクラスを実装していきます。

1. ノード : Node

データ適合後の各ノードは分岐条件を持っています。分岐条件とは「データのある特徴 .feature があるしきい値 .value 以上であるか否か」であり、YESであれば左に、NOであれば右に分岐します。分岐条件によって分割されたデータは子ノードへの入力となり、再帰的にノードが構築されていきます。

node.png

では各ノードの分岐条件 (.feature.value) がどのように決定されていくかといえば、取り得る全ての組合せ (.feature, .value) について情報ゲインを計算し、それが最大となるような組合せを保持します。

決定木の末端のノードは葉 (Leaf) と呼ばれ、葉のノードは予測クラス.label を持ちます。

2. 決定木 : DecisionTree

決定木クラスの実装は単純で、インスタンス生成時には属性としてルートノード .root を設定します。データに適合 (.fit()) する時に再帰的に決定木が構築されます。

tree.png

3. ランダムフォレスト : RandomForest

ランダムフォレストでは多数の決定木を構築し、それぞれの決定木の予測したクラスの多数決によって最終的な予測が出力されます。それぞれの決定木にどのようにして差異が生じるかといえば、fit() 時に入力されたデータからランダム抽出 _bootstrap_sample() したサブセットを各決定木への入力とすることで多様な木を構築しています。ランダム抽出の際、使用する特徴量についても選択します。

forest.png

スクラッチ実装

0. ライブラリのインポート

スクラッチ実装なので、sklearn.tree は使わない縛りでいきます。

from collections import Counter
import numpy as np
import pandas as pd

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score

1. ノードの実装

class Node:

    def __init__(self):
        self.left = None      # 左側に分岐した子ノード
        self.right = None     # 右側に分岐した子ノード
        self.feature = None   # 分岐条件の特徴量
        self.value = None     # 分岐条件のしきい値
        self.label = None     # 自身の予測ラベル
        self.gain = None      # 自身の情報ゲイン

    def build(self, X, y):

        features = X.columns
        n_features = len(features)

        if len(y.unique()) == 1:
            self.label = y.unique()[0]
            return

        class_counts = Counter(y)  # ラベルの各クラスの数をdict型で返す
        self.label = max(class_counts, key=class_counts.get)

        # 最適な分岐条件を保持するための変数
        best_gain = 0.0
        best_feature = None
        best_value = None

        gini = self.gini(y)  # 情報ゲインの計算に必要

        for f in features:
            values = X[f].unique()

            for val in values:
                y_l = y[X[f] >= val]
                y_r = y[X[f] <  val]

                gain = self.information_gain(y_l, y_r, gini)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = f
                    best_value = val

        # 情報ゲインが増えていなければ終了
        if best_gain == 0:
            return
        else:
            self.gain = best_gain
            self.feature = best_feature
            self.value = best_value

            # 左側のノードを再帰的に構築
            X_l = X[X[self.feature] >= self.value]
            y_l = y[X[self.feature] >= self.value]
            self.left = _Node()
            self.left.build(X_l, y_l)

            # 右側のノードを再帰的に構築
            X_r = X[X[self.feature] < self.value]
            y_r = y[X[self.feature] < self.value]
            self.right = _Node()
            self.right.build(X_r, y_r)

    # ジニ不純度の計算
    def gini(self, y):
        classes = y.unique()
        gini = 1.0
        for c in classes:
            gini -= (len(y[y==c]) / len(y)) ** 2
        return gini

    # 情報ゲインの計算
    def information_gain(self, y_l, y_r, current_uncertainty):
        pl = float(len(y_l) / (len(y_l)+len(y_r)))
        pr = float(len(y_r) / (len(y_l)+len(y_r)))
        return current_uncertainty - (pl*self.gini(y_l) + pr*self.gini(y_r))

    # 一つのデータに対する予測
    def predict(self, datum):
        # 葉ノードでなければ分岐条件に従って分岐する
        if self.feature != None:
            if datum[self.feature] >= self.value:
                return self.left.predict(datum)
            else:
                return self.right.predict(datum)
        # 葉ノードなら自身の予測ラベルを返す
        else:
            return self.label

2. 決定木の実装

class DecisionTree:

    def __init__(self):
        self.root = None

    def fit(self, X, y):
        self.root = Node()
        self.root.build(X, y)

    def predict(self, X):
        y_pred = []
        for i in range(len(X)):
            y_pred.append(self.root.predict(X.iloc[i]))
        return pd.Series(y_pred)

3. ランダムフォレストの実装

class RandomForest:

    def __init__(self, n_trees=10):
        self._n_trees = n_trees
        self._forest = [None] * self._n_trees
        self._used_features_set = [None] * self._n_trees

    def fit(self, X, y, random_state=0):
        self._classes = np.unique(y)
        sampled_X, sampled_y = self._bootstrap_sample(X, y, random_state=random_state)
        for i, (sampled_Xi, sampled_yi) in enumerate(zip(sampled_X, sampled_y)):
            tree = DecisionTree()
            tree.fit(sampled_Xi, sampled_yi)
            self._forest[i] = tree

    # データセットから `._n_trees` 個のサブセットを返す関数
    def _bootstrap_sample(self, X, y, random_state=0):

        n_features = len(X.columns)
        n_features_forest = int(np.floor(np.sqrt(n_features)))
        bootstrapped_X = []
        bootstrapped_y = []

        np.random.seed(random_state)
        for i in range(self._n_trees):
            idx = np.random.choice(len(y), size=len(y))
            col_idx = np.random.choice(n_features, size=n_features_forest, replace=False)
            bootstrapped_X.append(X.iloc[idx, col_idx])
            bootstrapped_y.append(y.iloc[idx])
            self._used_features_set[i] = col_idx
        return bootstrapped_X, bootstrapped_y

    def predict(self, X):
        proba = self.predict_proba(X)
        return self._classes[np.argmax(proba, axis=1)]

    def predict_proba(self, X):
        if self._forest[0] is None:
            raise ValueError('Model not fitted yet')

        # 決定木群による投票
        votes = pd.DataFrame()
        for i, (tree, used_features) in enumerate(zip(self._forest, self._used_features_set)):
            votes[i] = tree.predict(X.iloc[:, used_features])
        votes_array = votes.values

        # 投票結果を集計
        counts_array = np.zeros((len(X), len(self._classes)), dtype=int)
        for c in self._classes:
            counts_array[:, c] = np.sum(np.where(votes_array==c, 1, 0), axis=1)

        # 予測クラス毎に割合を計算し、probaとして返す
        proba = counts_array / self._n_trees
        return proba

4. irisデータセットによる動作確認

それではお馴染みirisを使って動作確認を行います。

if __name__ == '__main__':

    iris = load_iris()
    X = pd.DataFrame(iris.data, columns=iris.feature_names)
    y = pd.Series(iris.target)

    X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.4, random_state=42)

    rf = RandomForest(n_trees=10)
    rf.fit(X_train, y_train, random_state=42)

    y_pred = rf.predict(X_valid)

    print('============', 'Confusion Matrix')
    print(confusion_matrix(y_valid, y_pred))
    print('============', 'Accuracy Score')
    print(accuracy_score(y_valid, y_pred))
実行結果
============ Confusion Matrix
[[23  0  0]
 [ 0 19  0]
 [ 0  2 16]]
============ Accuracy Score
0.9666666666666667

Accuracy 96.7%!ということでまずまずな分類が出来ています。random_state の値によっては0.9になったり1.0になったりしますが、基本的に単体の決定木より良い性能が出ます。

まとめ

ランダムフォレストの仕組みを確認し、Pythonでスクラッチ実装を行いました。数値データしか扱えませんが、上手く動作することを確認できました。他にも過学習を防ぐために max_depth (木の最大深さ) を設定できるようにする、カテゴリカル変数を扱えるようにするなど出来ることはありますが、とりあえず今回はここまでにしておきます。

参考

5
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
7