データの可視化
タイムバー形式のデータを可視化
bar.py
# Histogram of simple close-to-close returns of the time bars.
# NOTE(review): assumes `time_bar` has a close-price column "cl" — confirm upstream cell.
plt.title("リターンのヒストグラム")
time_bar["cl"].pct_change().hist(bins=50)
前処理
対数差分の処理をそれぞれのデータに施す.
# Preprocessing: log-transform the OHLC prices, then first-difference everything.
df = time_bar.copy()
# Log of each price column (op/hi/lo/cl -> log_open/.../log_close).
_price_cols = {"op": "open", "hi": "high", "lo": "low", "cl": "close"}
for _src, _name in _price_cols.items():
    df[f"log_{_name}"] = np.log(df[_src])
# Log differences (log returns) of each price column.
for _name in _price_cols.values():
    df[f"diff_log_{_name}"] = df[f"log_{_name}"].diff()
# Plain first difference for volume.
df["diff_volume"] = df["volume"].diff()
# The first row has no difference defined; drop it.
df.dropna(inplace=True)
# Pivot the per-code log-diff close into a Date x Code matrix.
daily_df_sorted_replaced = pd.pivot_table(daily_df_sorted, index="Date", columns="Code", values="log_diff_close")
# Replace infinities (from log/diff of zero prices) and missing values with 0.
daily_df_sorted_replaced = daily_df_sorted_replaced.replace([np.inf, -np.inf], 0)
daily_df_sorted_replaced = daily_df_sorted_replaced.fillna(0)
ewm_span = 90
threshold = 2
ewm_mean = daily_df_sorted_replaced.ewm(span=ewm_span).mean()  # exponentially weighted moving average
ewm_std = daily_df_sorted_replaced.ewm(span=ewm_span).std()  # exponentially weighted moving std
# Clamp outliers: values farther than threshold*std from the EWM mean are
# replaced by the EWM mean.
# BUG FIX: reuse the already-computed `ewm_mean` instead of recomputing
# `.ewm(span=90).mean()` — the hard-coded 90 would silently diverge from
# `ewm_span` if the span were ever changed.
daily_df_sorted_replaced = daily_df_sorted_replaced.where(
    (daily_df_sorted_replaced - ewm_mean).abs() < ewm_std * threshold, ewm_mean
)
# Back to long format: one row per (Date, Code) pair.
daily_df_sorted_replaced = (
    daily_df_sorted_replaced.reset_index()
    .melt(id_vars=["Date"])
    .rename(columns={"value": "log_diff_fill_close"})
)
ADF検定による,定常性の確保.
# Package for the ADF (Augmented Dickey-Fuller) test.
# BUG FIX: the original imported a nonexistent name `a`; the code below
# calls `adfuller`, so import that.
from statsmodels.tsa.stattools import adfuller
def adf_test(data, sig_level=0.05, do_print=True) -> bool:
    """Run the Augmented Dickey-Fuller stationarity test on a series.

    Args:
        data: series to test.
        sig_level: significance level for rejecting the unit-root null.
        do_print: whether to print the test summary.

    Returns:
        bool: True if the series is judged stationary (p-value < sig_level),
        False otherwise.
    """
    if do_print:
        print('Results of Dickey-Fuller Examination:')
    stats = adfuller(data)
    # Keep only the headline numbers (statistic, p-value, lags, #obs).
    summary = pd.Series(
        stats[0:4],
        index=['Test Statistic', 'p-value', '#Lags Used', 'Number of Observations Used'],
    )
    if do_print:
        print(summary)
    return summary['p-value'] < sig_level
AIモデル
AR(1)モデル
# Visualization helper: actual vs. predicted series on one figure.
def plot_result(target, pred, title, ylabel):
    """Plot the actual series and the predictions over the same index."""
    plt.figure(figsize=(15, 5))
    series = [(target, 'blue', '実際'), (pred, 'r', '予測値')]
    for values, color, label in series:
        plt.plot(target.index, values, c=color, label=label, marker='.')
    plt.ylabel(ylabel, fontsize=17)
    plt.legend()
    plt.title(title)
    plt.show()
# Visualization helper: residuals (actual minus predicted).
def plot_resid(target, pred, title, ylabel):
    """Plot the residual series target - pred over the target's index."""
    plt.figure(figsize=(15, 5))
    residual = target - pred
    plt.plot(target.index, residual, c='green', marker='.')
    plt.ylabel(ylabel, fontsize=17)
    plt.title(title)
    plt.show()
# Chronological train/validation split helper.
def timeseries_train_val_split(Xy, target="y", train_ratio=0.75, gap=10):
    """Split a time-ordered frame into train/validation features and labels.

    The first `train_ratio` of the rows becomes the training set; `gap` rows
    are then skipped before the validation set starts — presumably an embargo
    to reduce leakage from overlapping windows (TODO confirm intent).
    Defaults reproduce the original hard-coded 75% / +10 behavior.

    Args:
        Xy: time-ordered DataFrame containing features and the label column.
        target: name of the label column.
        train_ratio: fraction of rows used for training.
        gap: number of rows skipped between train and validation.

    Returns:
        (trainX, trainy, valX, valy) tuple of DataFrames/Series.
    """
    split = int(len(Xy) * train_ratio)
    train, val = Xy[:split], Xy[split + gap:]
    trainX = train.drop([target], axis=1)
    trainy = train[target]
    valX = val.drop([target], axis=1)
    valy = val[target]
    return trainX, trainy, valX, valy
from sklearn.metrics import accuracy_score
# Measure the accuracy of the predicted direction (up vs. down).
def eval_direction(target, pred):
    """Print and return the accuracy of the predicted sign.

    Values > 0 are labeled +1 (up), values <= 0 are labeled -1 (down) —
    note that exact zeros count as "down".

    Returns:
        float: fraction of samples whose predicted sign matches the target.
    """
    target = np.where(np.array(target) > 0, 1, -1)
    pred = np.where(np.array(pred) > 0, 1, -1)
    # Equivalent to sklearn's accuracy_score for these +/-1 labels.
    # BUG FIX: the original only printed the metric; return it so callers
    # don't have to parse stdout.
    acc = float(np.mean(target == pred))
    print("accuracy", acc)
    return acc
# Linear regression (used here as an AR(1) model).
from sklearn.linear_model import LinearRegression
# Feature and label: predict the next-step diff_log_close from the current one.
# BUG FIX: .copy() detaches Xy from df so the column assignment below does not
# trigger pandas' SettingWithCopyWarning / chained-assignment pitfalls.
Xy = df[["diff_log_close"]].copy()
Xy["y"] = df["diff_log_close"].shift(-1)
Xy.dropna(inplace=True)
Xy.head()
# train / validation split
trainX, trainy, valX, valy = timeseries_train_val_split(Xy, target="y")
# AR(1): regress the next diff on the current diff.
lr = LinearRegression()
lr.fit(trainX[["diff_log_close"]], trainy)
pred = lr.predict(valX[["diff_log_close"]])
# Visualize predictions and residuals.
plot_result(target=valy, pred=pred, title="AR(1)モデルの予測結果", ylabel="diff_log_close")
plot_resid(valy, pred, title="AR(1)モデルの誤差", ylabel="diff_log_close")
機械学習モデル
# Features and label for the machine-learning model.
# BUG FIX: .copy() detaches Xy from df so the column assignment below does not
# trigger pandas' SettingWithCopyWarning / chained-assignment pitfalls.
Xy = df[["diff_log_open", "diff_log_high", "diff_log_low", "diff_log_close", "diff_volume"]].copy()
Xy["y"] = df["diff_log_close"].shift(-1)
Xy.dropna(inplace=True)
Xy.head()
# Random forest regressor.
from sklearn.ensemble import RandomForestRegressor
# train / validation split
trainX, trainy, valX, valy = timeseries_train_val_split(Xy, target="y")
# Fit with default hyperparameters and a fixed seed for reproducibility.
rf = RandomForestRegressor(random_state=0)
rf.fit(trainX, trainy)
pred = rf.predict(valX)
# Visualize predictions and residuals.
plot_result(target=valy, pred=pred, title="ランダムフォレストモデルの予測結果", ylabel="diff_log_close")
plot_resid(valy, pred, title="ランダムフォレストモデルの誤差", ylabel="diff_log_close")
ディープラーニングモデル
# Features and label for the deep-learning model.
# Standardize each log price with the mean/std of its most recent 10 bars
# (window includes the current bar; rolling stats only look backward, so no
# future information leaks into the features).
for _col in ("log_open", "log_high", "log_low", "log_close"):
    _roll = df[_col].rolling(10)
    df[f"scaled_{_col}"] = (df[_col] - _roll.mean()) / _roll.std()
# BUG FIX: .copy() detaches Xy from df so the column assignment below does not
# trigger pandas' SettingWithCopyWarning / chained-assignment pitfalls.
Xy = df[["scaled_log_open", "scaled_log_high", "scaled_log_low", "scaled_log_close", "diff_log_close"]].copy()
# Label: next-step diff_log_close.
Xy["y"] = df["diff_log_close"].shift(-1)
Xy.dropna(inplace=True)
Xy.head()
from torch import nn
# LSTMを用いた回帰モデルの定義
class LSTMRegressor(nn.Module):
    """Single-layer LSTM followed by a linear head for sequence regression."""

    def __init__(self, input_size, output_size, hidden_size, batch_first):
        super(LSTMRegressor, self).__init__()
        # Recurrent encoder over the input sequence.
        self.lstm = nn.LSTM(
            input_size=input_size, hidden_size=hidden_size, batch_first=batch_first
        )
        # Linear head mapping a hidden state to the prediction.
        self.output_layer = nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        """Predict from the hidden state at the final time step."""
        hidden_seq, _ = self.lstm(inputs)
        last_hidden = hidden_seq[:, -1]
        return self.output_layer(last_hidden)
import torch
from torch.utils.data import Dataset
# LSTMを学習するためのデータセット
class TimeBarDataset(Dataset):
    """Sliding-window dataset over a time-indexed DataFrame.

    Each item is (X, y): X holds ``sequence_length`` consecutive rows of the
    feature columns and y is the target value on the window's last row.
    """

    def __init__(self, df, features, target, sequence_length):
        self.df = df
        self.features = features
        self.target = target
        self.sequence_length = sequence_length
        self.index = df.index

    def __len__(self):
        # Number of complete windows that fit in the frame.
        return len(self.index) - self.sequence_length + 1

    def __getitem__(self, idx):
        # Window spans the index labels index[idx] .. index[idx+seq_len-1].
        # NOTE(review): label-based slicing is inclusive on both ends here —
        # assumes a monotonic, unique, non-integer (e.g. datetime) index;
        # confirm against the caller.
        first = self.index[idx]
        last = self.index[idx + self.sequence_length - 1]
        # X.shape = (sequence_length, n_features); y.shape = (1,)
        window = self.df[first:last][self.features].values
        label = self.df[last:last][self.target].values
        return torch.from_numpy(window).float(), torch.from_numpy(label).float()
import torch.optim as optim
# Trainerの設定
class Trainer:
    """Train a regression model, tracking the best validation snapshot.

    Runs a standard train/validate loop for ``epochs`` epochs and keeps a
    deep copy of the model whenever the validation loss improves; that copy
    is used by :meth:`predict`.
    """

    def __init__(self, model, epochs, lr, sequence_length=None):
        # NOTE(review): the original read a module-level `sequence_length`
        # global here; it is now an optional parameter. The attribute is not
        # used anywhere inside this class.
        self.sequence_length = sequence_length
        # model
        self.model = model
        self.best_model = self.model
        # training parameters
        self.lr = lr
        self.epochs = epochs
        self.device = torch.device("cpu")  # CPU only for this exercise
        # loss function
        self.criterion = nn.MSELoss(reduction="mean")
        # optimizer
        self.optimizer = optim.Adam(
            self.model.parameters(), lr=lr, betas=(0.9, 0.999), amsgrad=True
        )

    def train(self, train_dataloader, val_dataloader):
        """Run the training loop, validating after every epoch."""
        import copy  # local import: only needed for snapshotting the best model

        best_loss = float("inf")
        for epoch in range(self.epochs):
            print("-------------")
            print("Epoch {}/{}".format(epoch + 1, self.epochs))
            print("-------------")
            train_loss = 0.0
            test_loss = 0.0
            self.model.train()
            for (X, y) in train_dataloader:
                out = self.model(X)
                loss = self.criterion(out, y)
                self.optimizer.zero_grad()
                loss.backward()
                # Clip gradients for training stability.
                nn.utils.clip_grad_value_(self.model.parameters(), clip_value=2.0)
                self.optimizer.step()
                train_loss += loss.item()
            self.model.eval()
            with torch.no_grad():
                # BUG FIX: the original iterated `for batch in val_dataloader`
                # but kept scoring the last *training* batch (X, y); unpack and
                # score each validation batch instead.
                for (X, y) in val_dataloader:
                    out = self.model(X)
                    loss = self.criterion(out, y)
                    test_loss += loss.item()
            train_loss /= len(train_dataloader)
            test_loss /= len(val_dataloader)
            # Keep the model with the lowest validation loss.
            if test_loss < best_loss:
                best_loss = test_loss
                # BUG FIX: deep-copy the snapshot; assigning `self.model`
                # only aliased the live model, so the "best" snapshot kept
                # changing as training continued.
                self.best_model = copy.deepcopy(self.model)
                print("save best model!!")
            print("loss: {:.3}, test_loss: {:.3}".format(train_loss, test_loss))

    def predict(self, val_dataset):
        """Predict one scalar per dataset item using the best snapshot."""
        res = []
        self.best_model.eval()
        with torch.no_grad():
            for (X, y) in val_dataset:
                # Add a batch dimension for the single sample.
                out = self.best_model(X.unsqueeze(0))
                res.append(out.item())
        return res
import torch.optim as optim
# Trainerの設定
class Trainer:
    """Train a regression model, tracking the best validation snapshot.

    NOTE(review): this cell is a duplicate of the Trainer defined earlier in
    the file (a repeated notebook cell); this later definition shadows it.

    Runs a standard train/validate loop for ``epochs`` epochs and keeps a
    deep copy of the model whenever the validation loss improves; that copy
    is used by :meth:`predict`.
    """

    def __init__(self, model, epochs, lr, sequence_length=None):
        # NOTE(review): the original read a module-level `sequence_length`
        # global here; it is now an optional parameter. The attribute is not
        # used anywhere inside this class.
        self.sequence_length = sequence_length
        # model
        self.model = model
        self.best_model = self.model
        # training parameters
        self.lr = lr
        self.epochs = epochs
        self.device = torch.device("cpu")  # CPU only for this exercise
        # loss function
        self.criterion = nn.MSELoss(reduction="mean")
        # optimizer
        self.optimizer = optim.Adam(
            self.model.parameters(), lr=lr, betas=(0.9, 0.999), amsgrad=True
        )

    def train(self, train_dataloader, val_dataloader):
        """Run the training loop, validating after every epoch."""
        import copy  # local import: only needed for snapshotting the best model

        best_loss = float("inf")
        for epoch in range(self.epochs):
            print("-------------")
            print("Epoch {}/{}".format(epoch + 1, self.epochs))
            print("-------------")
            train_loss = 0.0
            test_loss = 0.0
            self.model.train()
            for (X, y) in train_dataloader:
                out = self.model(X)
                loss = self.criterion(out, y)
                self.optimizer.zero_grad()
                loss.backward()
                # Clip gradients for training stability.
                nn.utils.clip_grad_value_(self.model.parameters(), clip_value=2.0)
                self.optimizer.step()
                train_loss += loss.item()
            self.model.eval()
            with torch.no_grad():
                # BUG FIX: the original iterated `for batch in val_dataloader`
                # but kept scoring the last *training* batch (X, y); unpack and
                # score each validation batch instead.
                for (X, y) in val_dataloader:
                    out = self.model(X)
                    loss = self.criterion(out, y)
                    test_loss += loss.item()
            train_loss /= len(train_dataloader)
            test_loss /= len(val_dataloader)
            # Keep the model with the lowest validation loss.
            if test_loss < best_loss:
                best_loss = test_loss
                # BUG FIX: deep-copy the snapshot; assigning `self.model`
                # only aliased the live model, so the "best" snapshot kept
                # changing as training continued.
                self.best_model = copy.deepcopy(self.model)
                print("save best model!!")
            print("loss: {:.3}, test_loss: {:.3}".format(train_loss, test_loss))

    def predict(self, val_dataset):
        """Predict one scalar per dataset item using the best snapshot."""
        res = []
        self.best_model.eval()
        with torch.no_grad():
            for (X, y) in val_dataset:
                # Add a batch dimension for the single sample.
                out = self.best_model(X.unsqueeze(0))
                res.append(out.item())
        return res
from torch.utils.data import DataLoader
# Chronological train/validation split: first 70% trains, and a 10-row gap is
# skipped before the validation span — presumably to limit window overlap /
# leakage (TODO confirm intent).
split_at = int(len(Xy) * 0.7)
train, val = Xy[:split_at], Xy[split_at + 10:]
# Fix random seeds for reproducibility.
np.random.seed(0)
torch.manual_seed(0)
# Hyperparameters.
sequence_length = 10  # input window length
# model parameters
input_size = 4        # number of feature columns
output_size = 1
hidden_size = 32
batch_first = True
# other training parameters
epochs = 10
lr = 0.01
# Datasets over the scaled log-price features; the label column is "y".
feature_cols = ["scaled_log_open", "scaled_log_high", "scaled_log_low", "scaled_log_close"]
train_dataset = TimeBarDataset(train, feature_cols, "y", sequence_length)
val_dataset = TimeBarDataset(val, feature_cols, "y", sequence_length)
print("train:", len(train_dataset), "val: ", len(val_dataset))
# DataLoaders for training and validation.
# NOTE(review): pin_memory / num_workers mainly help GPU or multi-process
# pipelines; on the CPU-only setup used here they add no benefit — confirm
# before tuning.
loader_opts = dict(batch_size=16, num_workers=2, drop_last=True, pin_memory=True)
train_dataloader = DataLoader(train_dataset, shuffle=True, **loader_opts)
val_dataloader = DataLoader(val_dataset, shuffle=False, **loader_opts)
# Build the model and run training.
model = LSTMRegressor(input_size, output_size, hidden_size, batch_first)
trainer = Trainer(model, epochs, lr)
trainer.train(train_dataloader, val_dataloader)
# Predict on the validation dataset with the best snapshot.
pred = trainer.predict(val_dataset)
# The first sequence_length-1 rows have no complete input window, so pad the
# prediction series with None to align it with val["y"].
padding = [None] * (sequence_length - 1)
pred = padding + pred
# Visualize predictions and residuals (residuals skip the padded rows).
plot_result(target=val["y"], pred=pred, title="LSTMモデルの予測結果", ylabel="diff_log_close")
plot_resid(val["y"][sequence_length-1:], pred[sequence_length-1:], title="LSTMモデルの誤差", ylabel="diff_log_close")