その1の記事に引き続き、[自作のAutomated Machine Learning(AutoML)プログラム][1]の解説を書きます。
[1]:https://github.com/bright1998/AutoML_Binary_Classification/blob/master/AML_binary-classification.py
##【データの分割】
# モデル用データの分割
X_train_eval, X_test, y_train_eval, y_test = train_test_split(X_fin_m, y_m, train_size=0.8, test_size=0.2, random_state=1)
X_train, X_eval, y_train, y_eval = train_test_split(X_train_eval, y_train_eval, train_size=0.8, test_size=0.2, random_state=1)
学習用、検証用、テスト用のデータに分割しています。今回はテスト用として20%残しておき、残り80%のさらに80%(つまり全体の64%)を学習用、20%(全体の16%)を検証用としました。
##【パイプライン構築】
if use_PCA == True:
pipelines = {
# ロジスティック回帰
'logistic' : Pipeline([('scl', StandardScaler()), ('PCA', PCA(random_state=1)), ('est', LogisticRegression(random_state=1))]),
中略
else:
pipelines = {
# ロジスティック回帰
'logistic' : Pipeline([('scl', StandardScaler()), ('est', LogisticRegression(random_state=1))]),
中略
データの標準化、(必要な場合は主成分分析、)機械学習という一連の流れをパイプラインとして定義しています。複数の機械学習手法をforループで処理するために、パイプラインのリストを作っています。最初に機械学習手法の1次スクリーニングを行うので、ほとんどのモデルパラメータはデフォルト値を採用しています。
##【交差検証による機械学習手法のスクリーニング】
# 交差検定
results = {}
kf = KFold(n_splits=10, shuffle=True, random_state=1)
for pipe_name, pipeline in pipelines.items():
cv_results = cross_val_score(estimator=pipeline, X=X_train.values, y=y_train.values, scoring=scoring_method, cv=kf)
print(pipe_name, ':', scoring_method, 'score= ', cv_results.mean(), '+-', cv_results.std())
results[pipe_name] = cv_results.mean() - cv_results.std()
# ランキング(評価値の高い順にソート)
sorted_results = {}
for k,v in sorted(results.items(), key=lambda x:-x[1]):
sorted_results[k] = v
df_ranking = pd.DataFrame(data=list(sorted_results.values()), index=list(sorted_results.keys()), columns=['score'])
10分割交差検証を行い、評価指標の(平均値-標準偏差)で順位付けをします。平均値で順位をつけても良いかもしれませんが、今回は性能が下振れした時を想定して順位付けをしました。
##【グリッドサーチによるベストモデルの選定と学習】
best_scores = {}
best_models = {}
for i in range(n_GS):
method_name = list(sorted_results.keys())[i]
if method_name == 'logistic':
param_grid = {'est__penalty':['l1', 'l2'], 'est__C':[0.1, 0.5, 1, 2, 5, 10]}
elif method_name == 'svm':
param_grid = {'est__loss':['hinge', 'squared_hinge'], 'est__C':[0.1, 1, 10]}
elif method_name == 'knn':
param_grid = {'est__n_neighbors':[2, 5, 10, 15], 'est__weights':['uniform', 'distance']}
elif method_name == 'rfc':
param_grid = {'est__n_estimators':[5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100], 'est__max_features':['auto', 'log2']}
elif method_name == 'gbc':
param_grid = {'est__n_estimators':[30, 50, 70, 100, 150, 200], 'est__max_depth':[3, 4, 5, 6, 7, 8, 9, 10]}
elif method_name == 'mlp':
param_grid = {'est__hidden_layer_sizes':[(100,), (100, 100,), (100, 100, 100,)], 'est__activation':['relu', 'logistic'], 'est__solver':['adam', 'lbfgs', 'sgd']}
gs = GridSearchCV(pipelines[method_name], param_grid, scoring=scoring_method, cv=kf)
gs.fit(X_eval.values, y_eval.values)
best_scores[method_name] = gs.best_score_
best_models[method_name] = gs.best_estimator_
print('===Result of Grid Search===')
print(best_scores)
print('')
# ランキング(評価値の高い順にソート)
sorted_best_scores = {}
for k,v in sorted(best_scores.items(), key=lambda x:-x[1]):
sorted_best_scores[k] = v
print('Best Model:', list(sorted_best_scores.keys())[0])
print('')
# ベストモデルの抽出
best_model = best_models[list(sorted_best_scores.keys())[0]]
display(best_model)
# ベストモデルの学習
best_model.fit(X_train_eval.values, y_train_eval.values)
print('=== Fitting Done ===')
print('')
上位n_GS個の手法についてそれぞれグリッドサーチをして、最も評価指標がよくなるモデルパラメータを求めます。更にその中から最良のモデル(best_model)を抽出します。最後に最良のモデルで学習をしなおしています。
【汎化性能の確認】
# 学習済みモデルの評価(汎化性能の確認)
print('=== Checking Generalization Ability ===')
if scoring_method == 'accuracy':
print('Done:', scoring_method, 'score=', accuracy_score(y_test.values, best_model.predict(X_test)))
中略
学習で用いずに取っておいたテスト用データを用いて予測を行い、正解データと比較することで汎化性能を確認します。
##【モデルの保存】
# モデルの保存
with open('best_model.pickle', mode='wb') as fp:
pickle.dump(best_model, fp, protocol=2)
学習済みのモデルをファイルに保存しておきます。
##【予測の実行】
df_prediction_class = pd.DataFrame(data=best_model.predict(X_fin_s), columns=['Y_pred'])
prediction_proba = best_model.predict_proba(X_fin_s)
for i in range(len(df_prediction_class)):
if df_prediction_class.loc[i, 'Y_pred'] == cls:
icheck = i
break
check_list = list(prediction_proba[icheck, :])
max_proba = 0
for i in range(len(check_list)):
if check_list[i] > max_proba:
max_proba = check_list[i]
imax = i
df_prediction_proba = pd.DataFrame(data=prediction_proba[:,imax], columns=['Y_proba'])
目的変数のない予測用データ(X_fin_s)をモデルに渡し、predictメソッドで予測結果を得ます。またpredict_probaメソッドを用いると、各クラスに所属する確率を得られます。ここで長々と処理を施しているのは、どのクラスに所属している確率を得たいかをclsというパラメータで指定していますが、predict_probaメソッドの返り値の何番目にその確率が格納されているかを確認するためです。
##【予測結果の出力】
result_fin = pd.concat([ID_s, df_prediction_class, df_prediction_proba], axis=1)
# result_fin = pd.concat([ID_s, df_prediction_proba], axis=1)
result_fin = result_fin.set_index(header_ID)
result_fin.to_csv('predict_probability.csv')
予測結果をcsvファイルに出力しています。
##【あとがき】
その1の記事の【経緯】でも書いたように、JDLAのE資格の受験資格を得るために受講した機械学習講座の最終課題として、このようなプログラムを作成しました。E資格自体にそれほどの価値はないかもしれませんが、受験資格を得て合格するまでには、それなりの知識やスキルを得ることができます。受験するか悩まれている方の参考になれば幸いです。一気に書き上げたので誤字・脱字などがあるかもしれません。適宜修正しますのでご容赦ください。