LoginSignup
0
0

機械学習_ファイナンス

Posted at

データの可視化

タイムバー形式のデータを可視化

bar.py
# Histogram of simple returns computed from the close price ("cl") of each time bar
plt.title("リターンのヒストグラム")
time_bar["cl"].pct_change().hist(bins=50)

前処理

対数差分の処理をそれぞれのデータに施す.

# Work on a copy so the original time-bar frame stays untouched.
df = time_bar.copy()
# Map output column suffix -> source OHLC column in the time bars.
_ohlc = {"open": "op", "high": "hi", "low": "lo", "close": "cl"}
# Log-transform each OHLC price column.
for _name, _src in _ohlc.items():
    df["log_" + _name] = np.log(df[_src])
# First difference of the log prices (log returns).
for _name in _ohlc:
    df["diff_log_" + _name] = df["log_" + _name].diff()
df["diff_volume"] = df["volume"].diff()
# diff() leaves a NaN in the first row of every differenced column; drop it.
df.dropna(inplace=True)

# Pivot to a Date x Code matrix of log close-price diffs.
daily_df_sorted_replaced = pd.pivot_table(daily_df_sorted, index="Date", columns="Code", values="log_diff_close")
# Replace +/-inf (e.g. from logs of zero prices) and missing values with 0.
daily_df_sorted_replaced = daily_df_sorted_replaced.replace([np.inf, -np.inf], 0)
daily_df_sorted_replaced = daily_df_sorted_replaced.fillna(0)

ewm_span = 90
threshold = 2
ewm_mean = daily_df_sorted_replaced.ewm(span=ewm_span).mean()  # exponentially weighted moving average
ewm_std = daily_df_sorted_replaced.ewm(span=ewm_span).std()  # exponentially weighted moving std dev
# Winsorize outliers: values more than `threshold` std devs from the EWM mean
# are replaced by the EWM mean. Reuse the ewm_mean computed above instead of
# recomputing ewm(span=90).mean() with a hard-coded span (original L43).
daily_df_sorted_replaced = daily_df_sorted_replaced.where(
    (daily_df_sorted_replaced - ewm_mean).abs() < ewm_std * threshold, ewm_mean
)

# Melt back to long format: one row per (Date, Code) pair.
daily_df_sorted_replaced = daily_df_sorted_replaced.reset_index().melt(id_vars=['Date']).rename(columns={"value": "log_diff_fill_close"})

ADF検定による定常性の確認.

# Augmented Dickey-Fuller test (stationarity check).
# BUG FIX: the original imported `a`, but the function used below is `adfuller`.
from statsmodels.tsa.stattools import adfuller

def adf_test(data, sig_level=0.05, do_print=True) -> bool:
    """Run an augmented Dickey-Fuller test on a series.

    Args:
        data: series to test for stationarity.
        sig_level: significance level for rejecting the unit-root null.
        do_print: whether to print the test summary.

    Returns:
        bool: True if the series is judged stationary (p-value < sig_level),
        False otherwise.
    """
    if do_print:
        print('Results of Dickey-Fuller Examination:')
    dftest = adfuller(data)
    # dftest[0:4] = (test statistic, p-value, lags used, observations used)
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
    if do_print:
        print(dfoutput)
    # Reject the unit-root null => stationary.
    return dfoutput['p-value'] < sig_level

AIモデル

AR(1)モデル

# 関数の用意

# Plot actual vs. predicted series against the target's index.
def plot_result(target, pred, title, ylabel):
    """Draw the actual and predicted series on one 15x5 figure."""
    plt.figure(figsize=(15, 5))
    for series, color, label in ((target, 'blue', '実際'), (pred, 'r', '予測値')):
        plt.plot(target.index, series, c=color, label=label, marker='.')
    plt.ylabel(ylabel, fontsize=17)
    plt.legend()
    plt.title(title)
    plt.show()

# Plot the residual series (actual minus predicted).
def plot_resid(target, pred, title, ylabel):
    """Draw target - pred against the target's index on a 15x5 figure."""
    plt.figure(figsize=(15, 5))
    residual = target - pred
    plt.plot(target.index, residual, c='green', marker='.')
    plt.ylabel(ylabel, fontsize=17)
    plt.title(title)
    plt.show()

# Chronological train/validation split with an embargo gap.
def timeseries_train_val_split(Xy, target="y", train_ratio=0.75, gap=10):
    """Split a time-ordered frame into train/validation features and labels.

    The first `train_ratio` of rows is used for training; validation starts
    `gap` rows after the training cut, leaving an embargo period that reduces
    leakage from labels overlapping the training window.

    Args:
        Xy: time-ordered DataFrame containing features and the target column.
        target: name of the label column.
        train_ratio: fraction of rows used for training (default 0.75,
            matching the original hard-coded split).
        gap: rows skipped between train and validation (default 10,
            matching the original hard-coded offset).

    Returns:
        (trainX, trainy, valX, valy)
    """
    cut = int(len(Xy) * train_ratio)
    train, val = Xy[:cut], Xy[cut + gap:]
    trainX = train.drop([target], axis=1)
    trainy = train[target]
    valX = val.drop([target], axis=1)
    valy = val[target]
    return trainX, trainy, valX, valy

from sklearn.metrics import accuracy_score
# Measure directional (up/down) accuracy of a forecast.
def eval_direction(target, pred):
    """Print and return the fraction of samples whose predicted sign matches.

    Values > 0 are treated as "up" (+1); values <= 0 as "down" (-1),
    matching the original np.where threshold.

    Returns:
        float: directional accuracy in [0, 1].
    """
    target_sign = np.where(np.array(target) > 0, 1, -1)
    pred_sign = np.where(np.array(pred) > 0, 1, -1)
    # Same value sklearn's accuracy_score would produce, without the dependency.
    accuracy = float(np.mean(target_sign == pred_sign))
    print("accuracy", accuracy)
    return accuracy

# 回帰モデルの導入
from sklearn.linear_model import LinearRegression
# Features and label for the AR(1) model.
# BUG FIX: copy explicitly — assigning a column to a df[[...]] slice triggers
# pandas' chained-assignment (SettingWithCopy) warning and may not stick.
Xy = df[["diff_log_close"]].copy()
# The label is diff_log_close one step ahead.
Xy["y"] = df["diff_log_close"].shift(-1)
# shift(-1) leaves a NaN in the last row; drop it.
Xy.dropna(inplace=True)
Xy.head()

# Split into chronological train and validation sets.
trainX, trainy, valX, valy = timeseries_train_val_split(Xy, target="y")

# AR(1): regress the next log return on the current one.
feature_cols = ["diff_log_close"]
lr = LinearRegression()
lr.fit(trainX[feature_cols], trainy)
pred = lr.predict(valX[feature_cols])

# Visualize predictions and residuals on the validation period.
plot_result(target=valy, pred=pred, title="AR(1)モデルの予測結果", ylabel="diff_log_close")
plot_resid(valy, pred, title="AR(1)モデルの誤差", ylabel="diff_log_close")

機械学習モデル

# Features and label for the machine-learning model.
# BUG FIX: copy explicitly — assigning a column to a df[[...]] slice triggers
# pandas' chained-assignment (SettingWithCopy) warning and may not stick.
Xy = df[["diff_log_open", "diff_log_high", "diff_log_low", "diff_log_close", "diff_volume"]].copy()
Xy["y"] = df["diff_log_close"].shift(-1)
Xy.dropna(inplace=True)
Xy.head()

# Random-forest regressor.
from sklearn.ensemble import RandomForestRegressor

# Chronological train / validation split.
trainX, trainy, valX, valy = timeseries_train_val_split(Xy, target="y")

# Fit on all engineered features, then predict the validation period.
rf = RandomForestRegressor(random_state=0)
rf.fit(X=trainX, y=trainy)
pred = rf.predict(X=valX)

# Visualize predictions and residuals.
plot_result(target=valy, pred=pred, title="ランダムフォレストモデルの予測結果", ylabel="diff_log_close")
plot_resid(valy, pred, title="ランダムフォレストモデルの誤差", ylabel="diff_log_close")

ディープラーニングモデル

# Features and label for the deep-learning model.
# Standardize each log price with the rolling mean/std of the previous 10 bars
# (only past bars enter the window, so there is no look-ahead leakage).
# The rolling object is built once per column instead of twice (mean + std).
for _col in ["log_open", "log_high", "log_low", "log_close"]:
    _rolling = df[_col].rolling(10)
    df["scaled_" + _col] = (df[_col] - _rolling.mean()) / _rolling.std()

# BUG FIX: copy explicitly — assigning a column to a df[[...]] slice triggers
# pandas' chained-assignment (SettingWithCopy) warning and may not stick.
Xy = df[["scaled_log_open", "scaled_log_high", "scaled_log_low", "scaled_log_close", "diff_log_close"]].copy()
Xy["y"] = df["diff_log_close"].shift(-1)
Xy.dropna(inplace=True)
Xy.head()

from torch import nn
# LSTMを用いた回帰モデルの定義
class LSTMRegressor(nn.Module):
    def __init__(self, input_size, output_size, hidden_size, batch_first):
        super(LSTMRegressor, self).__init__()
        # lstm層の設定
        self.lstm = nn.LSTM(
            input_size=input_size, hidden_size=hidden_size, batch_first=batch_first
        )
        # 出力層の設定
        self.output_layer = nn.Linear(hidden_size, output_size)

    def forward(self, inputs):
        h, _ = self.lstm(inputs)
        # LSTM層から出力される隠れ層を出力層に通して予測結果を得る
        output = self.output_layer(h[:, -1])
        return output

        import torch
from torch.utils.data import Dataset

# LSTMを学習するためのデータセット
class TimeBarDataset(Dataset):
    def __init__(self, df, features, target, sequence_length):
        self.df = df
        self.features = features
        self.target = target
        self.sequence_length = sequence_length
        self.index = df.index

    def __len__(self):
        return len(self.index) - self.sequence_length + 1

    def __getitem__(self, idx):
        # startからendの期間の長さ(self.sequence_length)の分の長さを入力として利用します.
        start = self.index[idx]
        end = self.index[idx + self.sequence_length-1]
        # X.shape = (self.sequence_length(入力時系列の長さ) x input_size(特徴量の個数)) のテンソル
        # y.shape = (output_size) のテンソル
        return torch.from_numpy(self.df[start: end][self.features].values).float(), torch.from_numpy(self.df[end:end][self.target].values).float()

import torch.optim as optim

# Trainerの設定
class Trainer:
    def __init__(self, model, epochs, lr):
        self.sequence_length = sequence_length
        # model
        self.model = model
        self.best_model = self.model
        # train pram
        self.lr = lr
        self.epochs = epochs
        self.device = torch.device("cpu") # 演習ではcpuを利用します.

        # loss functionの設定
        self.criterion = nn.MSELoss(reduction="mean")
        # optimizerの設定
        self.optimizer = optim.Adam(
            self.model.parameters(), lr=lr, betas=(0.9, 0.999), amsgrad=True
        )

    def train(self, train_dataloader, val_dataloader):
        """学習の実施
        """
        best_loss = float("inf")
        for epoch in range(self.epochs):
            print("-------------")
            print("Epoch {}/{}".format(epoch + 1, self.epochs))
            print("-------------")
            train_loss = 0.0
            test_loss = 0.0
            self.model.train()
            for (X,y) in train_dataloader:
                out = self.model(X)
                loss = self.criterion(out, y)
                self.optimizer.zero_grad()
                loss.backward()
                # 学習の安定のために勾配のクリップを設定
                nn.utils.clip_grad_value_(self.model.parameters(), clip_value=2.0)
                self.optimizer.step()
                train_loss += loss.item()

            self.model.eval()
            with torch.no_grad():
                for batch in val_dataloader:
                    out = self.model(X)
                    loss = self.criterion(out, y)
                    test_loss += loss.item()

            train_loss /= len(train_dataloader)
            test_loss /= len(val_dataloader)
            # テスト誤差が最小のものをbest_modelとする
            if test_loss < best_loss:
                best_loss = test_loss
                self.best_model = self.model
                print("save best model!!")
            print("loss: {:.3}, test_loss: {:.3}".format(train_loss, test_loss))

    def predict(self, val_dataset):
        """予測
        """
        res = []
        self.best_model.eval()
        with torch.no_grad():
            for (X, y) in val_dataset:
                out = self.best_model(X.unsqueeze(0))
                res.append(out.item())
        return res


import torch.optim as optim

# Trainerの設定
class Trainer:
    def __init__(self, model, epochs, lr):
        self.sequence_length = sequence_length
        # model
        self.model = model
        self.best_model = self.model
        # train pram
        self.lr = lr
        self.epochs = epochs
        self.device = torch.device("cpu") # 演習ではcpuを利用します.

        # loss functionの設定
        self.criterion = nn.MSELoss(reduction="mean")
        # optimizerの設定
        self.optimizer = optim.Adam(
            self.model.parameters(), lr=lr, betas=(0.9, 0.999), amsgrad=True
        )

    def train(self, train_dataloader, val_dataloader):
        """学習の実施
        """
        best_loss = float("inf")
        for epoch in range(self.epochs):
            print("-------------")
            print("Epoch {}/{}".format(epoch + 1, self.epochs))
            print("-------------")
            train_loss = 0.0
            test_loss = 0.0
            self.model.train()
            for (X,y) in train_dataloader:
                out = self.model(X)
                loss = self.criterion(out, y)
                self.optimizer.zero_grad()
                loss.backward()
                # 学習の安定のために勾配のクリップを設定
                nn.utils.clip_grad_value_(self.model.parameters(), clip_value=2.0)
                self.optimizer.step()
                train_loss += loss.item()

            self.model.eval()
            with torch.no_grad():
                for batch in val_dataloader:
                    out = self.model(X)
                    loss = self.criterion(out, y)
                    test_loss += loss.item()

            train_loss /= len(train_dataloader)
            test_loss /= len(val_dataloader)
            # テスト誤差が最小のものをbest_modelとする
            if test_loss < best_loss:
                best_loss = test_loss
                self.best_model = self.model
                print("save best model!!")
            print("loss: {:.3}, test_loss: {:.3}".format(train_loss, test_loss))

    def predict(self, val_dataset):
        """予測
        """
        res = []
        self.best_model.eval()
        with torch.no_grad():
            for (X, y) in val_dataset:
                out = self.best_model(X.unsqueeze(0))
                res.append(out.item())
        return res

from torch.utils.data import DataLoader

# Chronological train/validation split: first 70% train, then a 10-row
# embargo gap before the validation block (mirrors timeseries_train_val_split,
# though with a 0.70 ratio instead of that helper's 0.75).
train, val = Xy[:int(len(Xy)*0.7)], Xy[int(len(Xy)*0.7)+10:]
# Fix random seeds for reproducibility.
np.random.seed(0)
torch.manual_seed(0)

# Hyper-parameters.
# Length of the input window fed to the LSTM.
sequence_length = 10
# Model parameters (input_size = number of scaled OHLC features below).
input_size = 4
output_size = 1
hidden_size = 32
batch_first = True
# Other training parameters.
epochs = 10
lr = 0.01

# Sliding-window datasets over the scaled log prices.
train_dataset = TimeBarDataset(train, ["scaled_log_open", "scaled_log_high", "scaled_log_low", "scaled_log_close"], "y", sequence_length)
val_dataset = TimeBarDataset(val, ["scaled_log_open", "scaled_log_high", "scaled_log_low", "scaled_log_close"], "y", sequence_length)
print("train:", len(train_dataset), "val: ", len(val_dataset))

# Training dataloader (shuffled; incomplete final batch dropped).
train_dataloader = DataLoader(
    train_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=2,
    drop_last=True,
    pin_memory=True
)
# Validation dataloader (chronological order preserved).
val_dataloader = DataLoader(
    val_dataset,
    batch_size=16,
    shuffle=False,
    num_workers=2,
    drop_last=True,
    pin_memory=True
)

model = LSTMRegressor(input_size,output_size,hidden_size,batch_first)

trainer = Trainer(model,epochs,lr)
trainer.train(train_dataloader, val_dataloader)

# Predict over val_dataset sample by sample (no batching).
pred = trainer.predict(val_dataset)

# The first sequence_length-1 rows of `val` have no complete input window,
# so pad the prediction list with None to align it with val's index.
pred = [None]*(sequence_length-1) + pred

# Visualize predictions and residuals (residuals skip the None padding).
plot_result(target=val["y"], pred=pred, title="LSTMモデルの予測結果", ylabel="diff_log_close")
plot_resid(val["y"][sequence_length-1:], pred[sequence_length-1:], title="LSTMモデルの誤差", ylabel="diff_log_close")
        



0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0