LSTMなどを使って相場の予測など作ってみたのですが、なかなか精度がでず困っていたところ以下の記事を見つけたので、同じようにしてFXの予測をしてみました。
結果として、全然精度が出なかったのですが、日経平均の予想で結構な精度が出るのに為替では出ないというのも腑に落ちないので何か根本的なミスをしている気がします。
(わかりましたら、修正します。)
当日終値までのデータを利用して、翌日の終値の上下を予想していたのですが、どうも前日終値を使って翌日以降の終値を予想するのが正しいようです。
修正したところValデータでも85%程度の精度が出ました。
他の通貨のデータ等を利用して学習データを増やすとより汎化性能を上げられそうです。
基本的に日経平均と為替相場はかなり相関しているはずですね。
為替データの取得
DataReaderを使ってYahooより為替データをDataFrameに入れます。
↓EURJPYの取得
data = pdr.DataReader('EURJPY=X','yahoo',end='2021-03-09',start='2001-01-03')
data.head()
目的関数の取得
今回は、終日終値が目標とする日の終日終値を上回っていれば、「Up」のフラグを立てることとする。
data['Up'] = 0
# future_num日後の終値の上下を予測
future_num = 1
for i in range(len(data)-1):
if data['Close'][i] < data['Close'][i+future_num]:
data['Up'][i] = 1
保存するファイル名の準備
保存するファイルを格納するフォルダを作成し、パス及びフォルダを設定(対象日をファイル名として保存)
data['ImageName'] = ['./DataFolder/' + str(data.index[i].year)+ '-' + str(data.index[i].month) + '-' + str(data.index[i].day) + '.png' for i in range(len(data))]
チャート画像ファイルの出力
mplfinanceを利用して、ファイルを出力します。
参考サイトは、mpl_financeを利用していましたが、mpl_financeは今後終了するため、mplfinanceを利用した方が良いそうです。
設定値などの元論文は、以下
https://arxiv.org/pdf/1903.12258.pdf
↓画像データのファイル名の位置を修正しました。
img_create = 1
if img_create == 1:
seq_len = 20
data.fillna(0)
#data.reset_index(inplace=True)
data['Date2'] = data.index
data['Date2'] = data['Date2'].map(mdates.date2num)
for i in range(0, len(data)-1):
c = data.iloc[i:i + int(seq_len) , :]
if len(c) == int(seq_len):
# Date,Open,High,Low,Adj Close,Volume
target = ['Date2','Open','High','Low','Close']
pngfile = data['ImageName'][i + int(seq_len)-2]
my_dpi = 96
# Create my own `marketcolors` to use with the `nightclouds` style:
mc = mpf.make_marketcolors(up='#77d879',down='#db3f3f',inherit=True)
# Create a new style based on `nightclouds` but with my own `marketcolors`:
s = mpf.make_mpf_style(base_mpf_style='nightclouds',marketcolors=mc,gridstyle = '')
fig = mpf.figure(style=s,figsize=(48 / my_dpi, 48 / my_dpi), dpi=my_dpi)
ax1 = fig.add_subplot(1,1,1)
ax1.grid(False)
ax1.set_xticklabels([])
ax1.set_yticklabels([])
ax1.xaxis.set_visible(False)
ax1.yaxis.set_visible(False)
ax1.axis('off')
mpf.plot(c[target], ax=ax1,volume=False,type='candle',update_width_config=dict(candle_linewidth=1))
fig.savefig(pngfile, pad_inches=0, transparent=False, bbox_inches="tight")
plt.close(fig)
print("Converting olhc to candlestik finished.")
データセット準備
PyTorchのデータセット準備
x_train, x_val, y_train, y_val = model_selection.train_test_split(data['ImageName'].values, data['Up'].values, test_size=0.1,random_state=1)
データ読み込み
X = []
Y = []
X_val = []
Y_val = []
transform = transforms.Compose([transforms.ToTensor()])
# for image_file,label in zip(x_train,y_train):
for image_file,label in zip(x_train,y_train):
try:
image = Image.open(image_file)
image = image.convert('RGB')
image = transform(image)
#image = np.asarray(image)
X.append(image)
Y.append(label)
except FileNotFoundError as e:
print(e)
for image_file,label in zip(x_val,y_val):
try:
image = Image.open(image_file)
image = image.convert('RGB')
image = transform(image)
X_val.append(image)
Y_val.append(label)
except FileNotFoundError as e:
print(e)
class EUR_Dataset(Dataset):
def __init__(self, image, label_list, phase=None):
self.transform = transforms.Compose([transforms.ToTensor()])
self.image = image
self.label_list = torch.tensor(label_list,dtype=torch.long)
self.phase = phase
def __getitem__(self,index):
# index番目の画像を読み込み、前処理を行う
image = self.image[index]
# index番目のラベルを取得する
label = self.label_list[index]
return image, label
def __len__(self):
return len(self.image)
train_dataset = EUR_Dataset(X,Y,phase='train')
total_samples = len(train_dataset)
print(total_samples)
val_dataset = EUR_Dataset(X_val,Y_val,phase='val')
val_samples = len(val_dataset)
print(val_samples)
モデルの読み込み(resnet18を利用)
model = models.resnet18(pretrained=False)
num_ftrs = model.fc.in_features
model.fc = nn.Sequential(
nn.Linear(num_ftrs ,2))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer_ft = optim.AdamW(model.parameters())
batchsize = 16
loss_list = []
val_loss_list = []
# DataLoaderのインスタンス作成
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batchsize, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=batchsize, shuffle=True)
# データローダーを辞書として格納する
dataloaders_dict = {
'train' : train_dataloader,
'val' : val_dataloader
}
学習の実施
Train_loss = []
Val_loss = []
Train_acc = []
Val_acc = []
def train_model(model, criterion, optimizer, num_epochs=25):
since = time.time()
best_model_wts = copy.deepcopy(model.state_dict())
#best_acc = 0.0
best_loss = 1000.0
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# Each epoch has a training and validation phase
for phase in ['train', 'val']:
running_loss = 0.0
running_corrects = 0
# 学習モード、検証モードの切替
if phase == 'train':
model.train()
else:
model.eval()
#for i,data in enumerate(trainloader):
i = 0
data_num = 0
for inputs, labels in dataloaders_dict[phase]:
i += 1
#for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward
# track history if only in train
with torch.set_grad_enabled(phase=='train'):
outputs = model(inputs)
preds = torch.argmax(outputs, axis=1)
#print(preds)
loss = criterion(outputs, labels)
# backward + optimize only if in training phase
if phase == 'train':
loss.backward()
optimizer.step()
# statistics
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
data_num += inputs.size(0)
epoch_loss = running_loss/data_num
epoch_acc = running_corrects.double()/data_num
print('{} Loss: {:.4f} Acc: {:.4f}'.format(
phase, epoch_loss, epoch_acc))
if phase == 'train':
Train_loss.append(epoch_loss)
Train_acc.append(epoch_acc)
else:
Val_loss.append(epoch_loss)
Val_acc.append(epoch_acc)
# deep copy the model
if phase == 'val' and epoch_loss < best_loss:
best_loss = epoch_loss
best_model_wts = copy.deepcopy(model.state_dict())
print()
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
print('Best val Loss: {:4f}'.format(best_loss))
# load best model weights
model.load_state_dict(best_model_wts)
return model
model = train_model(model,criterion,optimizer_ft,num_epochs=20)
学習結果
~~
学習の結果、Valデータがほとんど学習できてないので使い物にならないですね。
(ランダムValデータを取っており、Valデータの前後のデータはTrainデータに含まれています。そういう意味でももっと精度が出なければいけません。)
ユーロの他に豪ドルも試してみましたが、同じような結果です。
為替データと株価データのチャートの特徴にそんなに違いはないと思うので、もっと精度は出るはずだと思うので、何かミスをしているように思います。~~
汎化性能はまだ改善の余地がありそうですが、Valデータで80%近くの数値が出ました。
(学習データは以下の比率なので、まずまず予測はできている数値と考えて良さそうです。
up: 2410
down: 2273
)
今後、他の通貨データも利用して汎化性能を高めて、実際の取引にも利用してみようかと思います。
また、株価データにも適用して考察してみたいと思います。
精度が出るように修正できたらアップしていきたいと思います。
(他のデータでも試してみようかと思います。)