# ランダムフォレストを用いてn日後の株価の上下を予測するお話
# 参考にした論文↓
# Predicting the direction of stock market prices using random forest
import time
import warnings
warnings.filterwarnings('ignore')
import os
import numpy as np
import pandas as pd
import pandas_datareader as pdr
import matplotlib.pyplot as plt
import pydotplus as pdp
import talib
from collections import OrderedDict
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn import tree
# Wall-clock reference point; the elapsed time is printed at the very end.
start = time.time()

# Folder that every result file (CSV, PNG, TXT) is written into.
output_dir = './output'
os.makedirs(output_dir, exist_ok=True)  # no-op if it already exists
def main():
    """Fetch daily prices, build indicator features, and run the
    random-forest up/down prediction for several horizons (20/40/60 days).
    """
    NAME = 'SNE'
    # Daily OHLCV from Yahoo! Finance.
    # NOTE(review): the 'yahoo' reader is broken in recent
    # pandas_datareader releases — confirm the data source still works.
    df1 = pdr.DataReader(NAME, 'yahoo', start='1999-6-01', end='2020-06-01')
    predict_data_start = '2000-01-01'
    predict_data_end = '2020-01-01'
    print("インポートしたデータの確認:")
    print("分析銘柄:", NAME)
    print(df1.head(5), "\n")
    # Use the dividend/split-adjusted close as the working close price.
    df1["Close"] = df1["Adj Close"]
    df = df1[["High", "Low", "Open", "Close", "Volume"]]
    print("欠損値の確認(全てFalseであることを確認):")
    print(df.isnull().all())
    Alpha = 0.9  # smoothing factor for the exponential smoothing
    train_data_list = []
    test_data_list = []
    train_target_list = []
    test_target_list = []
    Window_days_list = [20, 40, 60]
    # Prediction for 20, 40 and 60 business days ahead.
    for Window_days in Window_days_list:
        # Compute the technical-indicator features and the target.
        data, target = calculation_indicator(df, Window_days, Alpha)
        condition = ((data.index > predict_data_start) &
                     (data.index < predict_data_end))
        data = data.loc[condition]
        target = target.loc[condition]
        # Time-ordered train/test split (shuffle=False keeps chronology).
        train_data, test_data, train_target, test_target = train_test_split(
            data, target, shuffle=False, test_size=0.1, random_state=0)
        # Fit and evaluate the random forest for this horizon.
        go_RandomForest(train_data.values, test_data.values,
                        train_target.values, test_target.values, Window_days)
        train_data_list.append(train_data)
        # BUG FIX: the original appended the list itself
        # (test_data_list.append(test_data_list)) instead of test_data.
        test_data_list.append(test_data)
        train_target_list.append(train_target)
        test_target_list.append(test_target)
    # OOB-error computation (disabled by default; slow).
    # OOB_error(df, train_data_list, test_data_list, train_target_list, test_target_list)
def OOB_error(df, train_data_list, test_data_list, train_target_list, test_target_list):
    """Plot the out-of-bag error rate versus n_estimators for each horizon.

    One classifier per entry of the (train_data_list, train_target_list)
    pairs built in main() for Window_days = 20/40/60; the curves are
    saved to ./output/OOB_error.png.  `df` and the test lists are unused
    but kept for interface compatibility with the call site.
    """
    min_estimators = 1
    max_estimators = 100
    # BUG FIX: labels claimed DAYS=30/60/90 while main() actually uses
    # Window_days_list = [20, 40, 60].
    ensemble_clfs = [
        (0, "RandomForestClassifier, DAYS=20", RandomForestClassifier()),
        (1, "RandomForestClassifier, DAYS=40", RandomForestClassifier()),
        (2, "RandomForestClassifier, DAYS=60", RandomForestClassifier()),
    ]
    error_rate = OrderedDict((label, []) for _, label, _ in ensemble_clfs)
    for j, label, clf in ensemble_clfs:
        # Flatten the (n, 1) target frame once per classifier, not per fit.
        targets = np.ravel(train_target_list[j])
        for i in range(min_estimators, max_estimators + 1):
            clf.set_params(n_estimators=i, oob_score=True)
            clf.fit(train_data_list[j], targets)
            oob_error = 1 - clf.oob_score_  # OOB error = 1 - OOB accuracy
            error_rate[label].append((i, oob_error))
    plt.figure()  # fresh figure so earlier plots do not bleed into this one
    for label, clf_err in error_rate.items():
        xs, ys = zip(*clf_err)
        plt.plot(xs, ys, label=label)
    plt.xlim(min_estimators, max_estimators)
    plt.xlabel("n_estimators")
    plt.ylabel("OOB error rate")
    plt.legend(loc="upper right")
    plt.savefig('./output/OOB_error.png')
    # plt.show()
def go_RandomForest(train_data, test_data, train_target, test_target, window_days):
    """Grid-search a random forest on a time-series CV split, print test
    metrics and an overfitting check, and dump the tuning history, the
    predictions and one tree image to ./output.

    BUG FIX: output filenames now include window_days — previously the
    three horizon calls all overwrote the same para_select.csv,
    result_random_forest.txt and tree.png.
    """
    parameter = {
        "n_estimators": [i for i in range(5, 65, 20)],
        "criterion": ["gini"],
        "min_samples_leaf": [i for i in range(5, 50, 15)],
        "max_depth": [i for i in range(1, 7, 2)],
        "random_state": [123],
        # "sqrt" is what "auto" meant for classifiers; "auto" was removed
        # in scikit-learn 1.3, so this keeps behavior while staying valid.
        "max_features": ["sqrt"],
    }
    # Expanding-window CV folds that respect temporal order.
    tscv = [(tr, te) for tr, te in
            sklearn.model_selection.TimeSeriesSplit(n_splits=5).split(train_data)]
    clf_fit = sklearn.model_selection.GridSearchCV(
        RandomForestClassifier(), param_grid=parameter, cv=tscv, n_jobs=4)
    # Flatten (n, 1) targets to the 1-D shape sklearn expects.
    train_target = np.reshape(train_target, (-1))
    test_target = np.reshape(test_target, (-1))
    clf_fit.fit(train_data, train_target)
    predictor = clf_fit.best_estimator_

    # Dump the grid-search tuning history.
    para_tune = pd.DataFrame.from_dict(clf_fit.cv_results_)
    para_tune.to_csv('./output/para_select_{}.csv'.format(window_days))

    def _ratio(num, den):
        # Guard: a degenerate confusion matrix (empty class) must not crash.
        return num / den if den else float('nan')

    # Evaluate on the held-out test set.
    result_for_test_f = predictor.predict(test_data)
    table_f = sklearn.metrics.confusion_matrix(test_target, result_for_test_f)
    tn, fp, fn, tp = table_f[0][0], table_f[0][1], table_f[1][0], table_f[1][1]
    print("\n--------------------------------------------")
    print("Window_days:", window_days)
    print("パラメタフィッティング結果:")
    print(predictor.get_params(True).items(), "\n")
    print("結果(test_data):")
    print("Accuracy\t{0:.3f}".format(_ratio(tp + tn, tp + fp + fn + tn)))
    print("Precision\t{0:.3f}".format(_ratio(tp, tp + fp)))
    print("Recall\t\t{0:.3f}".format(_ratio(tp, tp + fn)))
    print("SPC\t\t{0:.3f}".format(_ratio(tn, tn + fp)))

    # Overfitting check: accuracy on the training data itself.
    result_for_train_f = predictor.predict(train_data)
    table_fc = sklearn.metrics.confusion_matrix(train_target, result_for_train_f)
    tn, fp, fn, tp = table_fc[0][0], table_fc[0][1], table_fc[1][0], table_fc[1][1]
    print("過学習のチェック")
    print("Accuracy(train_data)\t{0:.3f}".format(_ratio(tp + tn, tp + fp + fn + tn)))
    print("--------------------------------------------\n")

    # Write the test targets and predictions to a file.
    with open('./output/result_random_forest_{}.txt'.format(window_days), 'w') as f:
        print("テストデータ:\n", test_target, "\n\n", "予測結果:\n", result_for_test_f, file=f)

    # Export the first tree of the best forest as a PNG (needs graphviz).
    filename = "./output/tree_{}.png".format(window_days)
    dot_data = tree.export_graphviz(predictor.estimators_[0], out_file=None, proportion=True)
    graph = pdp.graph_from_dot_data(dot_data)
    graph.write_png(filename)
def calculation_indicator(DF, Window_days, Alpha):
    """Build the technical-indicator feature matrix and direction target.

    Parameters
    ----------
    DF : pandas.DataFrame with High/Low/Open/Close/Volume daily rows.
    Window_days : prediction horizon in business days.
    Alpha : exponential-smoothing factor (0 < Alpha <= 1).

    Returns
    -------
    (features, target) — features has columns RSI, %K, %R, MACD, PROC,
    OBV; target is +1.0/-1.0 (0.0 for the trailing Window_days rows,
    which have no future close to compare against).
    """
    # Work on a copy: the caller passes a column-slice of the raw
    # download, and chained writes into a slice are unreliable (and
    # deprecated) in modern pandas.
    DF = DF.copy()

    # Exponential smoothing exp[i] = Alpha*x[i] + (1-Alpha)*exp[i-1],
    # which is exactly ewm(alpha=Alpha, adjust=False) seeded with x[0].
    for col in ("Close", "High", "Low", "Volume"):
        DF["exp" + col] = DF[col].ewm(alpha=Alpha, adjust=False).mean()

    # Target: +1 if the close Window_days later is higher, else -1;
    # rows with no future close keep the 0.0 default, as before.
    future_close = DF["Close"].shift(-Window_days)
    DF["target"] = 0.0
    has_future = future_close.notna()
    DF.loc[has_future, "target"] = np.where(
        future_close[has_future] > DF["Close"][has_future], 1.0, -1.0)

    close = np.asarray(DF["expClose"])
    high = np.asarray(DF["expHigh"])
    low = np.asarray(DF["expLow"])
    volume = np.asarray(DF["expVolume"])

    # RSI (14 days) on the smoothed close.
    DF["RSI"] = talib.RSI(close, timeperiod=14)

    # Stochastic oscillator %K over a full 14-day window.
    # BUG FIX: the original slice [i-14:i-1] covered only 13 days and
    # excluded the current day from the high/low range.
    H14 = DF["expHigh"].rolling(14).max()
    L14 = DF["expLow"].rolling(14).min()
    DF["%K"] = (100 * (DF["expClose"] - L14) / (H14 - L14)).fillna(0.0)

    # Williams %R (14 days).
    DF["%R"] = talib.WILLR(high, low, close, timeperiod=14)

    # MACD = EMA(12) - EMA(26) of the smoothed close.
    DF["MACD"] = talib.EMA(close, timeperiod=12) - talib.EMA(close, timeperiod=26)

    # Price rate of change over the horizon.
    # BUG FIX: the original stored the change ending at day i into row
    # i-1, leaking one day of future information into the feature.
    DF["PROC"] = DF["expClose"].pct_change(Window_days).fillna(0.0)

    # On-balance volume on the smoothed series.
    DF["OBV"] = talib.OBV(close, volume)

    DF.to_csv('./output/all_data.csv')
    return DF[["RSI", "%K", "%R", "MACD", "PROC", "OBV"]], DF[["target"]]
# Script entry point: guarded so the module can be imported without
# kicking off the full download/training run.
if __name__ == "__main__":
    main()
    # Elapsed wall-clock time since `start` was taken right after imports.
    calculation_time = time.time() - start
    print("calculation_time:{0:.2f}".format(calculation_time) + "[sec]")
1つ1つ解説してみる。
まずはimportとwarningsのignoreを書いておく。
import time
import warnings
warnings.filterwarnings('ignore')
import os
import numpy as np
import pandas as pd
import pandas_datareader as pdr
import matplotlib.pyplot as plt
import pydotplus as pdp
import talib
from collections import OrderedDict
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn import tree
次に、計算時間の測定のためのtime関数を挟んでおく。処理時間を知りたい部分に挿入する。ここでは、importの下とmain関数の実行部分の下、つまりコードのほぼ最後に挟んでいる。
makedirsは出力するデータを格納するためのフォルダを作成するために記載。
(作業コードと同じディレクトリに出力してもいいけど、見た目があんまり綺麗じゃない気がするので。)
start=time.time()
output_dir = './output'
os.makedirs(output_dir, exist_ok=True)
次にmain関数の中身。
pandasのdatareaderを使って株価の日足データを取得。
head(5)で先頭5行分だけ確認しておく。
NAME='SNE'
df1=pdr.DataReader(NAME,'yahoo',start='1999-6-01',end='2020-06-01')
predict_data_start='2000-01-01'
predict_data_end='2020-01-01'
print("インポートしたデータの確認:")
print("分析銘柄:",NAME)
print(df1.head(5),"\n")
必要な列のみ取り出したり、欠損値が無いかどうか確認。
df1["Close"]=df1["Adj Close"]
df=df1[["High","Low","Open","Close","Volume"]]
print("欠損値の確認(全てFalseであることを確認):")
print(df.isnull().all())
変数の宣言や、空のリストを準備したり。
Alpha=0.9
data=pd.DataFrame([])
target=pd.DataFrame([])
train_data_list=[]
test_data_list=[]
train_target_list=[]
test_target_list=[]
Window_days_list=[20,40,60]
1回の実行で複数の営業日後の株価予測をしようとしたので、for文。
#20,40,60営業日後に対する予測結果の計算
for Window_days in Window_days_list:
#インジケータの計算
data,target=calculation_indicator(df,Window_days,Alpha)
condition=((data.index>predict_data_start) & (data.index<predict_data_end))
data=data.loc[condition]
target=target.loc[condition]
#データをtrain/testに分ける
train_data,test_data,train_target,test_target=train_test_split(data,target,shuffle=False,test_size=0.1,random_state=0)
#ランダムフォレストの計算
go_RandomForest(train_data.values,test_data.values,train_target.values,test_target.values,Window_days)
train_data_list.append(train_data)
test_data_list.append(test_data_list)
train_target_list.append(train_target)
test_target_list.append(test_target)
#OOBエラーの計算
#OOB_error(df,train_data_list,test_data_list,train_target_list,test_target_list)
疲れたので、ひとまずここまで。