93
86

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AI(Transformer)を用いた株価予測

Last updated at Posted at 2023-03-06

概要

本記事ではTransformerというモデルを用いて、株価を予測するモデルを構築したのでまとめています。構築したモデルのインプットとアウトプットは以下の通りです。

  • インプット:直近60営業日分の株価
  • アウトプット:5営業日後の株価

アプリケーション化したものに関する記事は以下を参照ください。
https://qiita.com/YusukeOhnishi/items/9778ecd356f76fbee530

また実装したコードについてはGitHub(https://github.com/YusukeOhnishi/Transformer_for_StockPrediction)にアップロードしておりますので、実際に動かしてみたい場合にはクローンしてご使用ください。Google Colaboratoryにて動作することを確認していますので、Google ColaboratoryにてランタイムのタイプをGPUに変更して実行していただけるとよいかと思います。

今回作成したモデルでは1銘柄に対して学習・推論を行っていますが、もっと汎用的なモデルを作成したい場合には、銘柄情報等をインプットにすることなどが考えられます。

また本記事の内容はあくまでも技術的に推論モデルを作ることが可能だという内容になるので、実際に株式購入を検討されている方は、ご自身の判断で投資は行ってください。

Transformerとは

Transformerは2017年に発表されたモデルで、主に自然言語処理で用いられているモデルです。自然言語処理の問題では単語の出現順序が重要な要素であるため、このようなデータの順序をモデルに組み込んでいます。また自然言語処理のみではなく、時系列データなどの予測などに利用することが可能ですので、本記事で取り扱う株価の推移などに適用することが可能です。

このような時系列データ及び自然言語を取り扱うモデルは他にもあり、例えばRNN、LSTMなどの再帰的に層を繋げるモデルなどが以前は主流でした。

これらのモデルは大きくエンコーダー、デコーダーという2つの部品からできており、エンコーダーに入力となる時系列データを渡し、デコーダーから対応する時系列データを出力するという流れになります。例えば言語翻訳などで考えると、日本語の文章をエンコーダーに渡すと対応する英語の文章をデコーダーが出力するといったものです。

これまでの研究で、このエンコーダーとデコーダーの間に、データのどの部分に着目するのかということを学習させる機構であるAttentionを入れることで精度が大きく向上することが知られています。

Transformerは、このAttentionという機構のみでエンコーダーとデコーダー間を繋いだモデルです。このような実装を行うことで、より高速で高精度なモデルが実現されています。原論文に記載されているモデルの構成は以下の通りです。

TransformerModel.png

Transformerモデルの詳細については別記事を参考にしてください。

データ

株価のデータはPythonのpandas_datareaderというライブラリを用いて、yahoo finance(https://finance.yahoo.com/)から取得します。取得したデータはいくつかの要素を含みますが、今回は「Adj Close」の値のみを用いて実装を行います。これは配当を加味した終値になります。

また実装では例として、ソフトバンクグループの2003年1月1日から2022年12月31日までの計20年分の株価(https://finance.yahoo.com/quote/9984.T/history)を用います。

実装

データ取得と整形

上の通りYahoo financeからデータを取得します。データ取得は下記のように実装します。

from pandas_datareader import data as wb
import yfinance as yfin
yfin.pdr_override()

## Security code
stock_code='9984.T'
## Start Date
start_date='2003-1-1'
## End Date
end_date='2022-12-31'

# Get data
df=wb.DataReader(stock_code,start=start_date,end=end_date)
df
	        Open	    High	    Low	        Close	    Adj Close   Volume
Date						
2003-01-01	225.833328	225.833328	225.833328	225.833328	203.087509	0
2003-01-02	225.833328	225.833328	225.833328	225.833328	203.087509	0
2003-01-03	225.833328	225.833328	225.833328	225.833328	203.087509	0
2003-01-06	231.666672	233.500000	229.166672	230.666672	207.434036	25549200
2003-01-07	234.833328	235.333328	224.666672	225.833328	203.087509	33570000
...	        ...	        ...	        ...	        ...	        ...	        ...
2022-12-26	5772.000000	5824.000000	5728.000000	5778.000000	5778.000000	5392700
2022-12-27	5797.000000	5870.000000	5785.000000	5796.000000	5796.000000	6626000
2022-12-28	5770.000000	5785.000000	5680.000000	5711.000000	5711.000000	7324500
2022-12-29	5650.000000	5656.000000	5576.000000	5618.000000	5618.000000	8281700
2022-12-30	5681.000000	5735.000000	5624.000000	5644.000000	5644.000000	9104100

取得したデータは上記のように各日付に対して、Open(始値)、High(高値)、Low(安値)、Close(終値)、Adj Close(配当調整済み終値)、Volume(取引高)が入力されたものです。
今回着目したいのはAdj Closeであるため、この値をプロットすると以下のようになります。

# Plot 'Adj Close' values
plt.plot(df.iloc[:,-2])
plt.xlabel('date')
plt.ylabel('Adj Close')
plt.show()

AdjClose.png

このようなデータから今回用いるモデルに適用できる時系列データ列を作成します。
学習用データは以下のように実装します。

import math
import numpy as np
import tqdm
import matplotlib.pyplot as plt
import torch
import torch.nn as nn

## Split ratio of train data and validation data
train_rate=0.7
## How many business days to see
observation_period_num=60
## How many business days to predict
predict_period_num=5

# Normalization
mean_list=df.mean().values
std_list=df.std().values
df=(df-mean_list)/std_list

# Array initialization
inout_data=[]

# Put data in array
for i in range(len(df)-observation_period_num-predict_period_num):
  data=df.iloc[i:i+observation_period_num,4].values
  label=df.iloc[i+predict_period_num:i+observation_period_num+predict_period_num,4].values
  inout_data.append((data,label))
inout_data=torch.FloatTensor(inout_data)

train_data=inout_data[:int(np.shape(inout_data)[0]*train_rate)].to(device)
valid_data=inout_data[int(np.shape(inout_data)[0]*train_rate):].to(device)

print('train data:',np.shape(train_data)[0])
print('valid data:',np.shape(valid_data)[0])
  1. データを正規化する。
  2. データの0番目からobservation_period_num(60)営業日分のデータを取得し、これを学習用Input dataとする。
  3. 2に対してpredict_period_num(5)営業日ずらしたpredict_period_num番目からobservation_period_num(60)営業日分のデータを取得し、これを学習用Label dataとする。
  4. 開始位置を1営業日ずらして2、3を繰り返す。

fig1.png

これを繰り返すことで合計で[営業日数]-[observation_period_num]-[predict_period_num]個のInput DataとLabel dataが生成されますが、このうち最初のtrain_rate(0.7)の割合分を訓練データ、残りを検証用データとしています。

モデル

続いてTransformerのモデル部分に関して説明します。モデル部分の実装は2つのパーツで構成しています。

1つ目はPositional Encodingです。これはモデルでいう入力部分に相当するもので、この機構を取り入れることで、時系性を持ったデータに対して、各データが何番目のデータなのかということを考慮することができます。今回の場合だと60営業日分のデータを1つの時系列データとしているので、60営業日の何日目のデータなのかということを取り入れていることになります。

詳細については以下の記事を参考にするとわかりやすいかと思います。
https://qiita.com/masaki_kitayama/items/01a214c07b2efa8aed1b

実装は以下の通りです。

# Functions for positional encoding
class PositionalEncoding(nn.Module):
  def __init__(self,d_model,max_len=5000):
    super().__init__()
    self.dropout=nn.Dropout(p=0.1)
    pe=torch.zeros(max_len, d_model)
    position=torch.arange(0, max_len,dtype=torch.float).unsqueeze(1)
    div_term=torch.exp(torch.arange(0,d_model, 2).float()*(-math.log(10000.0)/d_model))
    pe[:,0::2]=torch.sin(position*div_term)
    pe[:,1::2]=torch.cos(position*div_term)
    pe=pe.unsqueeze(0).transpose(0,1)
    self.register_buffer("pe",pe)
  
  def forward(self,x):
    return self.dropout(x+self.pe[:np.shape(x)[0],:])

続いて、2つ目はTransformerモデルのメイン部分ですが、こちらはpytorchにTransformerモデルを使うことができるようになっているため、これを使用します。

https://pytorch.org/docs/stable/generated/torch.nn.TransformerEncoderLayer.html
https://pytorch.org/docs/stable/generated/torch.nn.TransformerEncoder.html

モデルとしてはEncoderにデータを渡し、出力結果をデコーダに渡すという、標準的な実装方法としています。実装は以下の通りです。

# Transformer model definition
class TransformerModel(nn.Module):
  def __init__(self,feature_size=250,num_layers=1,dropout=0.1):
    super().__init__()
    self.model_type='Transformer'
    self.src_mask=None
    self.device=torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.pos_encoder=PositionalEncoding(d_model=feature_size)
    self.encoder_layer=nn.TransformerEncoderLayer(d_model=feature_size,nhead=10,dropout=dropout)
    self.transformer_encoder=nn.TransformerEncoder(self.encoder_layer,num_layers=num_layers)
    self.decoder=nn.Linear(feature_size,1)
  
  def init_weights(self):
    self.decoder.bias.data.zero_()
    self.decoder.weight.data.uniform(-0.1,0.1)

  def _generate_square_subsequent_mask(self,sz):
    mask=(torch.triu(torch.ones(sz,sz))==1).transpose(0,1)
    mask=mask.float().masked_fill(mask==0,float('-inf')).masked_fill(mask==1,float(0.0))
    return mask

  def forward(self,src):
    if self.src_mask is None or self.src_mask.size(0)!=len(src):
      device=self.device
      mask=self._generate_square_subsequent_mask(len(src)).to(device)
      self.src_mask=mask
    src=self.pos_encoder(src)
    output=self.transformer_encoder(src,self.src_mask)
    output=self.decoder(output)
    return output

学習

学習を行うに際して、2つの関数を定義しておきます。

  • ミニバッチに分割するための関数
  • 学習時に一定回数(patience)の間lossの減少が止まった場合に学習を早期終了させるための関数

それぞれの実装は以下の通りです。

# Define a function for getting mini-batch
def get_batch(source, i, batch_size):
  seq_len=min(batch_size, len(source)-1-i)
  data=source[i:i+seq_len]
  input=torch.stack(torch.stack([item[0] for item in data]).chunk(observation_period_num,1))
  target=torch.stack(torch.stack([item[1] for item in data]).chunk(observation_period_num,1))

  return input, target
# Function for early stop of train if valid loss is not decreasing
class EarlyStopping:
    def __init__(self,patience=5):
        self.patience=patience
        self.counter=0
        self.best_score=None
        self.early_stop=False
        self.val_loss_min=np.Inf
        
    def __call__(self,val_loss,model):
        score=(-val_loss)
        if self.best_score is None:
            self.best_score=score
        elif score<self.best_score:
            self.counter+=1
            if self.counter>=self.patience:
                self.early_stop=True
        else:
            self.best_score=score
            self.counter=0

上記で定義した関数やモデルを用いて学習を行います。学習については一般的なものと同じなので簡単に説明します。

  • 損失関数は平均二乗誤差関数
  • 最適化手法はAdam
  • 1エポック毎に検証用データに対する損失関数を計算

実装は以下のようになります。

# Parameter for mdoel
## Learning Rate
lr=0.00005
## Epoch Number
epochs=1000
## Mini-Batch size
batch_size=64
## How many epochs to stop train if valid loss is not decreasing
patience=20

model=TransformerModel().to(device)
criterion=nn.MSELoss()

optimizer=torch.optim.AdamW(model.parameters(),lr=lr)
scheduler=torch.optim.lr_scheduler.StepLR(optimizer,1.0,gamma=0.95)
earlystopping=EarlyStopping(patience)

train_loss_list=[]
valid_loss_list=[]

for epoch in range(1,epochs+1):
  # train
  model.train()
  total_loss_train=0.0
  for batch, i in enumerate(range(0,len(train_data),batch_size)):
    data,targets=get_batch(train_data,i,batch_size)
    optimizer.zero_grad()
    output=model(data)
    loss=criterion(output,targets)
    loss.backward()
    optimizer.step()
    total_loss_train+=loss.item()
  scheduler.step()
  total_loss_train=total_loss_train/len(train_data)

  #valid
  model.eval()
  total_loss_valid=0.0
  for i in range(0,len(valid_data),batch_size):
    data,targets=get_batch(valid_data,i,batch_size)
    output=model(data)
    total_loss_valid+=len(data[0])*criterion(output, targets).cpu().item()
  total_loss_valid=total_loss_valid/len(valid_data)

  #etc
  train_loss_list.append(total_loss_train)
  valid_loss_list.append(total_loss_valid)
  if epoch%10==0:
    print(f'{epoch:3d}:epoch | {total_loss_train:5.7} : train loss | {total_loss_valid:5.7} : valid loss')
  earlystopping((total_loss_valid),model)
  if earlystopping.early_stop:
    print(f'{epoch:3d}:epoch | {total_loss_train:5.7} : train loss | {total_loss_valid:5.7} : valid loss')
    print("Early Stop")
    break

plt.xlabel('epoch')
plt.ylabel('train_loss')
plt.plot(train_loss_list)
plt.show()

plt.xlabel('epoch')
plt.ylabel('valid_loss')
plt.plot(valid_loss_list)
plt.show()

上記を実行した場合の訓練データと検証データそれぞれに対するlossの推移は以下のようになり、lossはエポックが進むたびに減少しており、学習が問題なく進んでいることがわかります。

loss.png

推論

作成したモデルを用いて実際のデータとモデルの予測した値を比較してみます。
出力結果の最終要素、つまり予測された5営業日後の株価と実際の株価をプロットすると以下のようになります。

model.eval()
result=torch.Tensor(0)
actual=torch.Tensor(0)

with torch.no_grad():
  for i in range(0,len(valid_data)-1):
    data,target=get_batch(valid_data,i,1)
    output=model(data)
    result=torch.cat((result, output[-1].view(-1).cpu()),0)
    actual=torch.cat((actual,target[-1].view(-1).cpu()),0)

plt.plot(actual,color='red',alpha=0.7)
plt.plot(result,color='black',linewidth=1.0)
plt.show()

predict.png

この結果から推測された株価では、急な株価変動が起こった場合に大きなずれが生じていると分かります。
このような急激な価格変動は株式会社の決算情報や外部の要素等によって引き起こされている可能性が高いです。今回作成したモデルではインプットとして、株価のみを与えているため、株価の動きの特徴を見ることで次の値を予測するため、トレード手法でいうテクニカル分析のみに対応しています。そのため、このような大きな変動は今回作成したモデルでは予測不可というのが正しい挙動です。

一方で今回作成したモデルはGoogle Colaboratory(GPU使用)環境で実行した場合、学習の時間を含め5分以下、とかなり高速です。その点を踏まえるとテクニカル分析としては十分に予測できていると思います。

最後に、上述の急激な価格変動を予測できるようにする改善方法をいくつか挙げておきます。その他改善方法等あればコメントいただけると幸いです。

  • 決算情報を入力データに追加する。
  • 自然言語処理を用いて、ニュースなどの情報を取り入れる。(かなり計算量は増えますが)
93
86
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
93
86

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?