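Overview
I have been working on forecasting foreign exchange (FOREX) charts. This time I treated the problem as an image-generation task: given past exchange-rate charts, draw a predicted future chart. In the end I used Pix2Pix, which is popular at the moment, and got as far as drawing a reasonably plausible chart.
Concept
Until now I traded using MetaTrader indicators, but that proved too difficult and I gave up on it. The starting point, described on my own blog, was the idea of treating a chart as a mountain and using a topographic map for exchange-rate forecasting. See the article below for details. http://1969681.blog66.fc2.com/blog-entry-637.html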
Method
The algorithm comes entirely from the following GitHub repository.
https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix
All I do myself is create the training data (although I do think the concept itself is original).
The key points are listed below.
- Chart data is fetched with pandas_datareader https://pandas-datareader.readthedocs.io/en/latest/remote_data.html
- The data server is Alpha Vantage https://pandas-datareader.readthedocs.io/en/latest/remote_data.html#remote-data-alphavantage
df = data.DataReader(symbol1,'av-daily',start,end, api_key=apikey)
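- To turn the time series into an image task, it is mapped to two dimensions using the outer product of vectors
https://deepage.net/features/numpy-outer.html
- Each sample is a 96-day block, from which image A and image B are generated. The two images overlap by 32 days to preserve continuity.
- Input to Pix2Pix: 64x128 → enlarged 4x → 256x512
- Images go here → pytorch-CycleGAN-and-pix2pix\datasets\facades\train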
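As a minimal sketch of the outer-product mapping mentioned in the list above: the toy prices below are made up for illustration, not real data. It shows that the outer product of a series with itself is a symmetric 2-D grid, and that, before any normalization is applied, the square root of its diagonal gives back the original (positive) series.

```python
import numpy as np

# Hypothetical toy series standing in for 6 daily closing prices.
close = np.array([1.10, 1.12, 1.11, 1.15, 1.14, 1.16])

# Outer product: grid[i, j] = close[i] * close[j], a symmetric 2-D map.
grid = np.outer(close, close)

# The diagonal holds close[i] ** 2, so its square root returns the series
# (this relies on the prices being positive).
recovered = np.sqrt(np.diag(grid))

print(grid.shape)                     # (6, 6)
print(np.allclose(recovered, close))  # True
```

This is why the diagonal of a generated image can be read back as a price curve later on. Once min-max normalization is applied to the grid, the recovered curve only reflects the shape of the series, not its absolute level.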
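PIX2PIX
- The PIX2PIX command is copied from the repository site
- The number of epochs is set by two parameters (--n_epochs and --n_epochs_decay)
- Two GPUs are used
- Flipping must be disabled
- The batch size is 32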
python train.py --dataroot ./datasets/facades --name facades_pix2pix --model pix2pix --direction AtoB --batch_size 32 --gpu_ids 0,1 --no_flip
Incidentally, if you get an argument wrong, the script tells you with a usage message like the one below.
usage: train.py [-h] --dataroot DATAROOT [--name NAME] [--use_wandb] [--gpu_ids GPU_IDS] [--checkpoints_dir CHECKPOINTS_DIR]
[--model MODEL] [--input_nc INPUT_NC] [--output_nc OUTPUT_NC] [--ngf NGF] [--ndf NDF] [--netD NETD] [--netG NETG]
[--n_layers_D N_LAYERS_D] [--norm NORM] [--init_type INIT_TYPE] [--init_gain INIT_GAIN] [--no_dropout]
[--dataset_mode DATASET_MODE] [--direction DIRECTION] [--serial_batches] [--num_threads NUM_THREADS]
[--batch_size BATCH_SIZE] [--load_size LOAD_SIZE] [--crop_size CROP_SIZE] [--max_dataset_size MAX_DATASET_SIZE]
[--preprocess PREPROCESS] [--no_flip] [--display_winsize DISPLAY_WINSIZE] [--epoch EPOCH] [--load_iter LOAD_ITER]
[--verbose] [--suffix SUFFIX] [--display_freq DISPLAY_FREQ] [--display_ncols DISPLAY_NCOLS] [--display_id DISPLAY_ID]
[--display_server DISPLAY_SERVER] [--display_env DISPLAY_ENV] [--display_port DISPLAY_PORT]
[--update_html_freq UPDATE_HTML_FREQ] [--print_freq PRINT_FREQ] [--no_html] [--save_latest_freq SAVE_LATEST_FREQ]
[--save_epoch_freq SAVE_EPOCH_FREQ] [--save_by_iter] [--continue_train] [--epoch_count EPOCH_COUNT] [--phase PHASE]
[--n_epochs N_EPOCHS] [--n_epochs_decay N_EPOCHS_DECAY] [--beta1 BETA1] [--lr LR] [--gan_mode GAN_MODE]
[--pool_size POOL_SIZE] [--lr_policy LR_POLICY] [--lr_decay_iters LR_DECAY_ITERS] [--lambda_L1 LAMBDA_L1]
If everything is set up correctly, the run proceeds with output like the following.
----------------- Options ---------------
batch_size: 32 [default: 1]
beta1: 0.5
checkpoints_dir: ./checkpoints
continue_train: False
crop_size: 256
dataroot: ./datasets/facades [default: None]
dataset_mode: aligned
direction: AtoB
display_env: main
display_freq: 400
display_id: 0 [default: 1]
display_ncols: 4
display_port: 8097
display_server: http://localhost
display_winsize: 256
epoch: latest
epoch_count: 1
gan_mode: vanilla
gpu_ids: 0,1 [default: 0]
init_gain: 0.02
init_type: normal
input_nc: 3
isTrain: True [default: None]
lambda_L1: 100.0
load_iter: 0 [default: 0]
load_size: 286
lr: 0.0002
lr_decay_iters: 50
lr_policy: linear
max_dataset_size: inf
model: pix2pix [default: cycle_gan]
n_epochs: 400 [default: 100]
n_epochs_decay: 400 [default: 100]
n_layers_D: 3
name: facades_pix2pix [default: experiment_name]
ndf: 64
netD: basic
netG: unet_256
ngf: 64
no_dropout: False
no_flip: False
no_html: False
norm: batch
num_threads: 4
output_nc: 3
phase: train
pool_size: 0
preprocess: resize_and_crop
print_freq: 100
save_by_iter: False
save_epoch_freq: 5
save_latest_freq: 5000
serial_batches: False
suffix:
update_html_freq: 1000
use_wandb: False
verbose: False
----------------- End -------------------
dataset [AlignedDataset] was created
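Creating the training data
Analysis period: WSIZE = 96 days --> a (n=32) + b (n=32) + c (n=32). For training, A uses a and b; B uses b and c.
Tensor channels: 0: close, 1: high - close, 2: close - low
Normalization: min-max (A and B are normalized separately)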
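The window layout described above can be sketched for a single channel as follows. This is an illustrative sketch only: the random-walk prices are hypothetical stand-ins, and only the close channel is built, whereas the actual images stack three channels.

```python
import numpy as np

WSIZE, STEP, SCALE = 96, 32, 4  # values taken from the text

def min_max(x):
    """Scale an array to the range [0, 1]."""
    return (x - x.min()) / (x.max() - x.min())

# Hypothetical stand-in for 96 days of closing prices.
rng = np.random.default_rng(0)
close = 1.1 + 0.01 * rng.standard_normal(WSIZE).cumsum()

m = np.outer(close, close)
img_a = min_max(m[0:2 * STEP, 0:2 * STEP])      # days 0-63  (a, b)
img_b = min_max(m[STEP:WSIZE, STEP:WSIZE])      # days 32-95 (b, c)

pair = np.concatenate([img_a, img_b], axis=1)   # A | B side by side: 64 x 128
pair = np.kron(pair, np.ones((SCALE, SCALE)))   # repeat each pixel 4x4

print(pair.shape)  # (256, 512)
```

Because A and B are normalized separately, the shared 32 days (b) do not have identical pixel values in the two halves, which matters when the curves are joined again after prediction.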
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np
import pdb
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
apikey = 'YOUR_API_KEY'  # replace with the API key you obtained
symbol1 = 'EURUSD'
start = dt.date(2001,8,1)
end = dt.date(2022,2,20)
df1 = data.DataReader(symbol1,'av-daily',start,end, api_key=apikey)
wsize = 96  # sample period in days
df1.index = pd.DatetimeIndex(df1.index)
alldata = []
# Slide a 96-day window over the data; +1 so the final window is included
for i in range(len(df1) - wsize + 1):
    dfspan1 = df1[i:i + wsize]
    alldata.append({"df": dfspan1, "date": dfspan1.tail(1)})
from PIL import Image
import matplotlib.pyplot as plt
def min_max(x, axis=None):
    # Scale x to the range [0, 1]
    lo = x.min(axis=axis, keepdims=True)
    hi = x.max(axis=axis, keepdims=True)
    return (x - lo) / (hi - lo)

def imagemake(dfspan1):
    a = dfspan1['df']['close'].to_numpy()
    b = dfspan1['df']['high'].to_numpy()
    c = dfspan1['df']['low'].to_numpy()
    # Outer products map each 96-day series onto a 96x96 grid
    m = np.outer(a, a).astype(np.float32)          # close
    n = np.outer(b-a, b-a).astype(np.float32)      # high - close
    o = np.outer(a-c, a-c).astype(np.float32)      # close - low
    # Image A covers days 0-63, image B covers days 32-95
    m1 = min_max(m[0:64, 0:64])
    m2 = min_max(m[32:96, 32:96])
    n1 = min_max(n[0:64, 0:64])
    n2 = min_max(n[32:96, 32:96])
    o1 = min_max(o[0:64, 0:64])
    o2 = min_max(o[32:96, 32:96])
    te1 = np.stack([m1, n1, o1])
    te2 = np.stack([m2, n2, o2])
    te3 = np.concatenate([te1, te2], 2)            # A | B side by side: 3x64x128
    te4 = np.kron(te3, np.ones((4, 4)))            # enlarge 4x: 3x256x512
    tmp = torch.from_numpy(te4).clone()
    return transforms.ToPILImage(mode='RGB')(tmp)

for item in alldata:
    img = imagemake(item)
    # Name each image after the last date in its window
    fn = item['date'].index.astype(str).values.tolist()
    fname = "datasets/facades/train/" + fn[0] + ".png"
    img.save(fname)
Pix2Pix prediction
Run prediction on the data in the test folder.
python test.py --dataroot ./datasets/facades --name facades_pix2pix --model pix2pix --direction AtoB
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pdb
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
path = os.getcwd()
new_dir_path = "results/facades_pix2pix/test_latest/images"
img=[]
image={}
# Collect each fake_B / real_A / real_B triplet from the results folder
for imageName in os.listdir(new_dir_path):
    inputPath = os.path.join(path, new_dir_path, imageName)
    if "fake_B" in imageName: image['fakeB'] = inputPath
    if "real_A" in imageName: image['realA'] = inputPath
    if "real_B" in imageName: image['realB'] = inputPath
    if len(image) == 3:
        img.append(image)
        image = {}

import re
import datetime

def getprice(img, transform):
    # The date is embedded in the file name
    match = re.search(r'\d{4}-\d{2}-\d{2}', img['fakeB'])
    date = datetime.datetime.strptime(match.group(), '%Y-%m-%d').date()
    fake = transform(Image.open(img['fakeB']))
    real = transform(Image.open(img['realB']))
    fake = fake.numpy()
    real = real.numpy()
    # min_max is the helper defined in the training-data code above
    real1 = min_max(np.diag(real[0, :, :]))
    plt.plot(real1)
    # Average three estimates of the curve: diagonal, first row, first column
    fake1 = min_max(np.sqrt(np.diag(fake[0, :, :])))
    fake2 = min_max(fake[0, 0, :])
    fake3 = min_max(fake[0, :, 0])
    plt.plot((fake1+fake2+fake3)/3)
    plt.title(date)
    plt.show()

transform = transforms.PILToTensor()
for item in img:
    getprice(item, transform)
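The file-collecting loop above depends on the order in which `os.listdir` returns the names. As a hedged alternative, the sketch below groups result files by the part of the name before the suffix. The `<key>_fake_B.png` naming pattern is an assumption inferred from the substrings matched in the loop, and `group_results` is a hypothetical helper, not part of the repository.

```python
import re
from collections import defaultdict

# Suffix pattern assumed from the substrings matched in the loop above.
PATTERN = re.compile(r"^(?P<key>.+)_(?P<kind>fake_B|real_A|real_B)\.png$")

def group_results(file_names):
    """Group file names into {'fakeB': ..., 'realA': ..., 'realB': ...} dicts."""
    groups = defaultdict(dict)
    for name in file_names:
        m = PATTERN.match(name)
        if m:
            # 'fake_B' -> 'fakeB', matching the keys used in the loop above
            groups[m["key"]][m["kind"].replace("_", "")] = name
    # Keep only complete triplets, sorted by key (the date prefix).
    return [groups[k] for k in sorted(groups) if len(groups[k]) == 3]

names = [
    "2022-02-18_real_A.png", "2022-02-17_fake_B.png", "2022-02-18_fake_B.png",
    "2022-02-17_real_B.png", "2022-02-18_real_B.png", "2022-02-17_real_A.png",
    "2022-02-16_fake_B.png",  # incomplete triplet, dropped
]
triplets = group_results(names)
print(len(triplets))  # 2
```

The function works on bare file names, so joining each one with the results directory is still needed before opening the images.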
Future prediction
Creating the input data
The only change is the following:
te3=np.concatenate([te2,te2], 2)  # feed te2 as both real and fake
The data source was also switched to OANDA daily bars:
EURUSD.oj5k1440.csv
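One way to read this setup, offered as an interpretation rather than a statement from the repository: the model was trained to map days 0-63 of a window to days 32-95, so feeding it the latest 64 days should yield an image whose second half lies 32 days beyond the available data. The sketch below only works out the day indices; `prediction_ranges` is a hypothetical helper.

```python
STEP = 32  # length of each of the blocks a, b, c from the text

def prediction_ranges(n_days):
    """Day-index ranges for the latest window, given n_days of history."""
    input_days = range(n_days - 2 * STEP, n_days)      # latest 64 days, used as A
    output_days = range(n_days - STEP, n_days + STEP)  # span the generated B stands for
    future_days = range(n_days, n_days + STEP)         # part of B beyond the data
    return input_days, output_days, future_days

inp, out, fut = prediction_ranges(100)
print(len(inp), len(out), len(fut))  # 64 64 32
```

With 100 days of history, the input covers days 36-99, and the last 32 days of the output (days 100-131) are the actual forecast.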
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np
import pdb
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
#
apikey = 'YOUR_API_KEY'  # replace with the API key you obtained
symbol1 = 'EURUSD'
start = dt.date(2021,1,1)
end = dt.date(2021,1,2)
df1 = data.DataReader(symbol1,'av-daily',start,end, api_key=apikey)
df = pd.read_csv(r'C://Users//User//Desktop//EURUSD.oj5k1440.csv', index_col=0)
df=df.drop(df.columns[0], axis=1)
df.columns=df1.columns
df_new=pd.concat([df1,df.tail(100)])
print(df_new)
wsize = 96  # sample period in days
df_new.index = pd.DatetimeIndex(df_new.index)
alldata = []
# Slide a 96-day window over the data, including the most recent window
for i in range(len(df_new) - wsize + 1):
    dfspan1 = df_new[i:i + wsize]
    alldata.append({"df": dfspan1, "date": dfspan1.tail(1)})
from PIL import Image
import matplotlib.pyplot as plt
def min_max(x, axis=None):
    # Scale x to the range [0, 1]
    lo = x.min(axis=axis, keepdims=True)
    hi = x.max(axis=axis, keepdims=True)
    return (x - lo) / (hi - lo)

def imagemake(dfspan1):
    a = dfspan1['df']['close'].to_numpy()
    b = dfspan1['df']['high'].to_numpy()
    c = dfspan1['df']['low'].to_numpy()
    m = np.outer(a, a).astype(np.float32)          # close
    n = np.outer(b-a, b-a).astype(np.float32)      # high - close
    o = np.outer(a-c, a-c).astype(np.float32)      # close - low
    # Only the latest 64 days (days 32-95) are used here
    m2 = min_max(m[32:96, 32:96])
    n2 = min_max(n[32:96, 32:96])
    o2 = min_max(o[32:96, 32:96])
    te2 = np.stack([m2, n2, o2])
    te3 = np.concatenate([te2, te2], 2)            # feed te2 as both real and fake
    te4 = np.kron(te3, np.ones((4, 4)))            # enlarge 4x: 3x256x512
    tmp = torch.from_numpy(te4).clone()
    return transforms.ToPILImage(mode='RGB')(tmp)

for item in alldata:
    img = imagemake(item)
    # Name each image after the last date in its window
    fn = item['date'].index.astype(str).values.tolist()
    fname = "datasets/facades/test/" + fn[0] + "sk.png"
    img.save(fname)
Post-processing
Run the test → draw the predicted chart
python test.py --dataroot ./datasets/facades --name facades_pix2pix --model pix2pix --direction AtoB
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pdb
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
path = os.getcwd()
new_dir_path = "results/facades_pix2pix/test_latest/images"
img=[]
image={}
# Collect each fake_B / real_A / real_B triplet from the results folder
for imageName in os.listdir(new_dir_path):
    inputPath = os.path.join(path, new_dir_path, imageName)
    if "fake_B" in imageName: image['fakeB'] = inputPath
    if "real_A" in imageName: image['realA'] = inputPath
    if "real_B" in imageName: image['realB'] = inputPath
    if len(image) == 3:
        img.append(image)
        image = {}

import re
import datetime

def getprice(img, transform):
    # The date is embedded in the file name
    match = re.search(r'\d{4}-\d{2}-\d{2}', img['fakeB'])
    date = datetime.datetime.strptime(match.group(), '%Y-%m-%d').date()
    fake = transform(Image.open(img['fakeB']))
    real = transform(Image.open(img['realA']))
    fake = fake.numpy()
    real = real.numpy()
    plt.figure(figsize=(10, 4))
    plt.subplot(1, 2, 1)
    plt.title(date)
    real1 = np.sqrt(np.diag(real[0, :, :]))  # diagonal elements
    real1_avg = np.mean(real1[128:256])      # mean over the latest 32 days
    print(real1_avg)
    real2 = real1 - real1_avg
    plt.plot(real2, label="real")
    fake1 = np.sqrt(np.diag(fake[0, :, :]))  # diagonal elements
    fake1_avg = np.mean(fake1[0:128])        # mean over the overlapping 32 days
    print(fake1_avg)
    fake2 = fake1 - fake1_avg
    # First half of the real curve followed by the whole generated curve
    fake3 = np.append(real2[0:128], fake2)
    plt.plot(fake3, label="fake")
    plt.legend()
    plt.tight_layout()
    plt.show()

transform = transforms.PILToTensor()
for item in img:
    getprice(item, transform)
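The joining step in `getprice` can be isolated as follows. This is a sketch on synthetic data: `stitch` is a hypothetical helper, and the sine curve and offsets are invented to mimic two curves that were normalized separately. Both curves are centred on the mean of the span they share, then the non-overlapping part of the real curve is followed by the whole predicted curve.

```python
import numpy as np

def stitch(real_diag, fake_diag, overlap=128):
    """Join a real curve and a predicted curve that share `overlap` samples."""
    real = real_diag - real_diag[-overlap:].mean()  # centre on the shared span
    fake = fake_diag - fake_diag[:overlap].mean()
    return np.append(real[:-overlap], fake)

# Hypothetical curves: 256 samples each (64 days x 4 pixels per day).
t = np.linspace(0.0, 1.0, 384)
truth = np.sin(2 * np.pi * t)
real_diag = truth[:256] + 5.0   # arbitrary offsets mimic separate normalization
fake_diag = truth[128:] - 3.0

joined = stitch(real_diag, fake_diag)
print(joined.shape)  # (384,)
```

Mean alignment removes a constant offset between the two curves but not a difference in scale, so a visible step at the join is still possible with real model output.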