はじめに
前回の以下記事の続編です。
Kerasを使って米国株時系列データをRNNで機械学習させてみる
今度はKerasのニューラルネットワーク・モデルを使って株価チャートのパターン認識をさせてみようと思います。具体的には以下のようなU字型チャートの認識を目指します。今回時系列の長さは40としました。
動作環境
動作環境は以下の通りです。
python==3.8.6
numpy==1.19.4
matplotlib==3.3.3
pandas==1.1.4
pandas_datareader==0.9.0
データの準備
前回と同様、分析の対象としたのは、S&P500採用銘柄(約500銘柄)の約5年分の日次データ(pandas-DataFrame形式)です。データはpandas-datareaderを使ってダウンロードし、pandas-DataFrameオブジェクトをpickleして保管しました。データ総量は30MB程度ですが、ダウンロードには20分くらいかかりました。
import pandas_datareader as pdr
from pandas import to_pickle
# Read S&P500 tickers
with open('SP500.txt', "r") as f_in: #SP500.txtに米国株のティッカーシンボルを格納
ticker_list = [v.rstrip() for v in f_in.readlines()]
# Get data from Yahoo
pd_data = pdr.get_data_yahoo(ticker_list)
# pickele the pandas-DataFrame object
to_pickle(pd_data, 'pd_data.pkl')
学習用データの作成
正解データの作成
正解データは二次関数にランダムウォークを追加する形で生成しました。ランダムウォークは正規分布に従うと仮定しました。多様性を持たせるため、1日の期待変動率とU字型の底値が高値から何パーセントの位置にあるか、の2パラメータを現実的な範囲でランダムに生成させるようにしました。
import numpy as np
def gen_cup(len): #二次関数の時系列を作成
s = 0.01 + np.random.choice(10)/1000 #1日の期待変動率
c = 0.7 + np.random.choice(20)/100 #U字型の底値(高値の何パーセントか)
dt = 2/len
x = np.cumsum(np.ones(len, dtype="float32") * dt)
f = lambda x : (1-c)*(x-1)**2+c #二次関数
y = f(x) + np.random.standard_normal(len) * s
return y
def gen_data1(nb_of_samples, length): #ニューラルネットワークの入力データを成形
sequences = [] #時系列データを格納
target = [] #正解データを格納
#正規化する関数
f = lambda x: (x - np.nanmin(x)) / (np.nanmax(x) - np.nanmin(x))
for i in range(nb_of_samples):
data_array = gen_cup(length)
data_array = f(data_array) #正規化
sequences.append(data_array)
target.append(1) #正解は1
#学習データを成形
X = np.array(sequences).reshape(len(sequences), length, 1)
#正解データを成形
t = np.array(target).reshape(len(target))
return X, t
len_sequence = 40 #時系列データ長
num_training_samples = 5000 #学習データのサンプル数(正解データ、不正解データで2倍になる)
X1, t1 = gen_data1(num_training_samples, len_sequence)
生成された時系列の例
不正解データの作成
不正解データは最初にダウンロードしてきた実際の株価データから、ランダムにサンプルを抽出し、U字型になっている可能性があるデータを除くようにしました。具体的には期間の最初と最後で、正規化された値が0.9より大きいサンプルを除きました。
import pandas as pd
import random
def gen_data2(dataframe, nb_of_samples, length, arg1=0):
max_start_index = len(dataframe) - length
ticker_list = df['Close'].columns #dfから銘柄リストを作成
sequences = [] #時系列データを格納
target = [] #正解データを格納
#正規化する関数
f = lambda x: (x - np.nanmin(x)) / (np.nanmax(x) - np.nanmin(x))
while len(sequences) < nb_of_samples:
start_index = np.random.choice(max_start_index) #開始日をランダムに選択
end_index = start_index + length
ticker = random.choice(ticker_list) #ランダムな銘柄を選択
temp_df = dataframe['Close'][ticker][start_index:end_index]
if (temp_df.count() == length).all(): #データ中に欠損値がない場合
data_array = temp_df.values.astype('float32')
data_array = f(data_array) #正規化
#arg1==0指定時、期間の最初と最後の値が0.9より大きい場合は除外する
if arg1==0 and data_array[-1]>0.9 and data_array[0]>0.9:
continue
sequences.append(data_array)
target.append(0) #不正解は0
#学習データを成形
X = np.array(sequences).reshape(len(sequences), length, 1)
#正解データを成形
t = np.array(target).reshape(len(target))
return X, t
df = pd.read_pickle('pd_data.pkl') #pickleしたpandas-DataFrameの読み込み
X2, t2 = gen_data2(df, num_training_samples, len_sequence)
抽出された不正解データの例
正解データと不正解データを結合
生成された正解データと、抽出された不正解データを結合して、1セットの学習用データとします。Kerasは学習用データのシャッフル機能があるようですが、念のため、正解と不正解が、どの部分でも均等に含まれるように、正解と不正解を交互に結合しました。
X = np.stack([X1[0], X2[0]])
t = np.hstack((t1[0], t2[0]))
for i in range(num_training_samples-1):
temp_X = np.stack([X1[i+1], X2[i+1]])
X = np.vstack((X, temp_X))
t = np.hstack((t, t1[i+1]))
t = np.hstack((t, t2[i+1]))
del X1,X2,t1,t2,temp_X
モデルの作成
モデルは前回と同様にRNN(LSTM)を採用しました。前回との違いは入力データの次元が2->1となったことです。
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import LSTM, Dense
input_dim = 1 #入力データの次元数:株価のみ
output_dim = 1 #出力データの次元数
num_hidden_units = 128 #隠れ層のユニット数
#モデル定義
model = Sequential()
model.add(LSTM(
num_hidden_units,
input_shape=(len_sequence, input_dim),
return_sequences=False))
model.add(Dense(output_dim, activation='sigmoid'))
#モデルをコンパイル
model.compile(loss="binary_crossentropy", optimizer="rmsprop", metrics=["accuracy"])
model.summary()
学習
モデルを作成したら、機械学習をさせます。今回は10,000サンプルを用いてミニバッチサイズ200、学習エポック数50としました。
batch_size = 200 #ミニバッチサイズ
num_of_training_epochs = 50 #学習エポック数
#学習
model.fit(
X, t,
batch_size=batch_size,
epochs=num_of_training_epochs,
validation_split=0.1
)
学習結果
50エポック終了時点で以下の状況でした。
Epoch 50/50
45/45 [==============================] - 5s 110ms/step - loss: 0.0203 - accuracy: 0.9924 - val_loss: 0.0151 - val_accuracy: 0.9940
評価
評価は以下のように行いました。
- ダウンロードした実際の株価データから新たにランダムにサンプルを抽出
- サンプルを学習済モデルに投入
- 出力が0.9より大きいサンプルが検出されたらグラフ(図)を作成して保存
- グラフを目視で評価
import matplotlib.pyplot as plt
#評価
j = 0 #グラフの作成個数カウント用
while j < 16: #グラフが16個できたら終了
X_test, t = gen_data2(df, 1, len_sequence, arg1=1) #サンプルを1個抽出
predictions = model.predict(X_test) #学習済モデルに投入
if predictions[0,0] > 0.9: #0.9より大きい場合、グラフを作成して保存
j = j + 1
print(predictions[0,0])
plt.plot(X_test[0])
plt.savefig('./images/result' + str(j) + '.png')
plt.close()
0.9以上を得たサンプルのグラフ一覧
得られた出力は以下の通りでした。対応するグラフは左上右上左下右下の順で並んでいます。
0.9220923
0.95872766
0.99880373
0.97952735
0.9435309
0.99737906
0.9997492
0.9186541
0.90825605
0.9997
0.9841814
0.9131614
0.94787455
0.98044866
0.981392
0.9942799
総評
得られた16枚には明らかに誤った判定といえるものはなく、概ね正しく判定できているように思います。