続報である
状況
- 1d足でのトレードではなく、より短期の足(15m, 30m, 60m)を3チャンネルとして組み合わせた。
- バックテスト結果から、勝率60%を得た。
- Trades 170
- Win Rate [%] 60.0
苦労した部分
- PIX2PIXでの学習データにおいて、過去値と予測値の時系列スライド(重複部分)は、データ中の3/4とした。
- データサイズは 過去16+48 未来48+16 重複48 → 64x4倍して256サイズに変換
- バックテストでは、バイナリオプションを想定したアルゴリズムを実装した。
- バックテストシグナルは、すべて予測チャート上での変化率で検出した。
実装
Trainデータの作成
main.py
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
def _read_bars(csv_path):
    """Read one OHLCV CSV with explicit column names and a datetime index.

    The index is built by joining the `date` and `time` text columns and
    parsing the result with `pd.to_datetime`.
    """
    bars = pd.read_csv(
        csv_path,
        sep=",",
        names=('date', 'time', 'open', 'high', 'low', 'close', 'volume'),
    )
    bars.index = pd.to_datetime(bars['date'] + " " + bars['time'])
    return bars


def _sliding_windows(bars, size):
    """Return every consecutive `size`-row window of `bars`, oldest first.

    Each entry is a dict with the window itself under "df" and its last row
    (a one-row frame) under "date".
    """
    # NOTE(review): the range stops at len(bars) - size, so the window that
    # ends on the very last row is never produced. Kept as in the original
    # so the generated dataset does not change -- confirm this is intended.
    windows = []
    for start in range(len(bars) - size):
        window = bars.iloc[start:start + size]
        windows.append({"df": window, "date": window.tail(1)})
    return windows


# One CSV per bar length (15m / 30m / 60m according to the file names).
df1 = _read_bars(r'C://Users//User//Desktop//EURUSD.oj5k15.csv')
df2 = _read_bars(r'C://Users//User//Desktop//EURUSD.oj5k30.csv')
df3 = _read_bars(r'C://Users//User//Desktop//EURUSD.oj5k60.csv')

wsize = 80  # sample period: number of bars per window

alldata1 = _sliding_windows(df1, wsize)
alldata2 = _sliding_windows(df2, wsize)
alldata3 = _sliding_windows(df3, wsize)
# In[2]:
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import pdb
def min_max(x, axis=None):
    """Rescale `x` linearly so its minimum maps to 0 and its maximum to 1.

    Args:
        x: NumPy array to normalise.
        axis: Axis (or axes) along which min/max are taken; None uses the
            whole array.

    Returns:
        Array of the same shape as `x`. Where the range is zero (all values
        equal along `axis`) the result is 0 instead of the NaN that a 0/0
        division would produce.
    """
    lowest = x.min(axis=axis, keepdims=True)
    highest = x.max(axis=axis, keepdims=True)
    span = highest - lowest
    # A constant input has span 0; divide by 1 there so the output is 0, not NaN.
    safe_span = np.where(span == 0, 1, span)
    return (x - lowest) / safe_span
def imagemake(dfspan1,dfspan2,dfspan3):
    """Render three close-price windows as one side-by-side RGB image.

    For each window entry the `close` column is min-max normalised and turned
    into its outer product with itself. Two 64x64 tiles are cut from that
    matrix -- rows/columns 0..63 and rows/columns 16..79 -- and each tile is
    min-max normalised again. The three windows become the three channels, in
    argument order; the 0..63 tiles form the left half and the 16..79 tiles
    the right half. Every value is then repeated 4x4 before conversion.

    Args:
        dfspan1, dfspan2, dfspan3: window entries, each a dict whose "df"
            value has a `close` column.

    Returns:
        A PIL image produced by `transforms.ToPILImage(mode='RGB')`.
    """
    def _tiles(span):
        # Normalised closes -> outer product -> the two overlapping tiles.
        closes = min_max(np.array(span['df']['close'].tolist()))
        gram = np.outer(closes, closes).astype(np.float32)
        return min_max(gram[0:64, 0:64]), min_max(gram[16:80, 16:80])

    tiles = [_tiles(span) for span in (dfspan1, dfspan2, dfspan3)]
    left = np.stack([early for early, _ in tiles])
    right = np.stack([late for _, late in tiles])
    side_by_side = np.concatenate([left, right], 2)
    # Kronecker product with a 4x4 block of ones repeats every value 4x4.
    enlarged = np.kron(side_by_side, np.ones((4, 4)))
    to_pil = transforms.ToPILImage(mode='RGB')
    return to_pil(torch.from_numpy(enlarged).clone())
# In[3]:
def _end_time(window):
    """Return the timestamp of the last bar of a window entry."""
    return window['date'].index[0]


def _render_training_images(windows_15m, windows_30m, windows_60m, out_dir):
    """Pair windows across the three bar lengths and save one image per match.

    All three lists are walked newest-first. Matching rule, with `end` being
    the timestamp of a window's last bar and `prev` the window one position
    earlier in the same list:

    * a 60m window is skipped when its end is >= the newest 30m end;
    * a 30m window matches when prev-60m end < its end <= 60m end, and is
      skipped when its end is >= the newest 15m end;
    * a 15m window matches when prev-30m end <= its end < 30m end.

    Every match is rendered with `imagemake(15m window, previous 30m window,
    previous 60m window)` and saved as "<15m end, YYYY-MM-DD HH-MM-SS>.png".

    Returns:
        The number of images written.
    """
    if not (windows_15m and windows_30m and windows_60m):
        return 0
    # Image.save() does not create missing directories.
    os.makedirs(out_dir, exist_ok=True)

    # Resolve each window's end timestamp once instead of comparing
    # one-element DatetimeIndex objects inside the nested loops.
    stamped_15m = [(_end_time(w), w) for w in windows_15m]
    stamped_30m = [(_end_time(w), w) for w in windows_30m]
    stamped_60m = [(_end_time(w), w) for w in windows_60m]
    newest_15m = stamped_15m[-1][0]
    newest_30m = stamped_30m[-1][0]

    saved = 0
    pairs_60m = zip(reversed(stamped_60m), reversed(stamped_60m[:-1]))
    for (end_60m, _), (prev_end_60m, prev_60m) in tqdm(pairs_60m, total=len(stamped_60m) - 1):
        if end_60m >= newest_30m:
            continue
        pairs_30m = zip(reversed(stamped_30m), reversed(stamped_30m[:-1]))
        for (end_30m, _), (prev_end_30m, prev_30m) in pairs_30m:
            if not (end_60m >= end_30m > prev_end_60m):
                continue
            if end_30m >= newest_15m:
                continue
            for end_15m, window_15m in reversed(stamped_15m):
                if not (end_30m > end_15m >= prev_end_30m):
                    continue
                img = imagemake(window_15m, prev_30m, prev_60m)
                print(end_15m, prev_end_30m, prev_end_60m)
                # strftime always emits the time part, so midnight bars need
                # no special-casing of the file name.
                fname = os.path.join(out_dir, end_15m.strftime("%Y-%m-%d %H-%M-%S") + ".png")
                img.save(fname)
                saved += 1
    return saved


alldata3s = alldata3[:-1]
alldata2s = alldata2[:-1]
_render_training_images(alldata1, alldata2, alldata3, "datasets/facades1/train/")
train
python train.py --dataroot ./datasets/facades1 --name facades_pix2pix1 --model pix2pix --direction AtoB --batch_size 32 --gpu_ids 0,1 --no_flip
未来予測
future.py
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
def _read_bars(csv_path):
    """Read one OHLCV CSV with explicit column names and a datetime index.

    The index is built by joining the `date` and `time` text columns and
    parsing the result with `pd.to_datetime`.
    """
    bars = pd.read_csv(
        csv_path,
        sep=",",
        names=('date', 'time', 'open', 'high', 'low', 'close', 'volume'),
    )
    bars.index = pd.to_datetime(bars['date'] + " " + bars['time'])
    return bars


def _sliding_windows(bars, size):
    """Return every consecutive `size`-row window of `bars`, oldest first.

    Each entry is a dict with the window itself under "df" and its last row
    (a one-row frame) under "date".
    """
    # NOTE(review): the range stops at len(bars) - size, so the window that
    # ends on the very last row is never produced. Kept as in the original
    # so the generated images do not change -- confirm this is intended.
    windows = []
    for start in range(len(bars) - size):
        window = bars.iloc[start:start + size]
        windows.append({"df": window, "date": window.tail(1)})
    return windows


# One CSV per bar length (15m / 30m / 60m according to the file names).
df1 = _read_bars(r'C://Users//User//Desktop//EURUSD.oj5k15.csv')
df2 = _read_bars(r'C://Users//User//Desktop//EURUSD.oj5k30.csv')
df3 = _read_bars(r'C://Users//User//Desktop//EURUSD.oj5k60.csv')

wsize = 96  # sample period: number of bars per window

alldata1 = _sliding_windows(df1, wsize)
alldata2 = _sliding_windows(df2, wsize)
alldata3 = _sliding_windows(df3, wsize)
# In[33]:
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import pdb
def min_max(x, axis=None):
    """Rescale `x` linearly so its minimum maps to 0 and its maximum to 1.

    Args:
        x: NumPy array to normalise.
        axis: Axis (or axes) along which min/max are taken; None uses the
            whole array.

    Returns:
        Array of the same shape as `x`. Where the range is zero (all values
        equal along `axis`) the result is 0 instead of the NaN that a 0/0
        division would produce.
    """
    lowest = x.min(axis=axis, keepdims=True)
    highest = x.max(axis=axis, keepdims=True)
    span = highest - lowest
    # A constant input has span 0; divide by 1 there so the output is 0, not NaN.
    safe_span = np.where(span == 0, 1, span)
    return (x - lowest) / safe_span
def imagemake(dfspan1,dfspan2,dfspan3):
    """Render three close-price windows as an RGB image for inference.

    For each window entry the `close` column is min-max normalised and turned
    into its outer product with itself; the 64x64 tile at rows/columns 16..79
    is cut out and min-max normalised again. The three windows become the
    three channels, in argument order. The same tile stack is placed on both
    the left and the right half (real = fake), and every value is repeated
    4x4 before conversion.

    The rows/columns 0..63 tiles that the earlier version also computed were
    never used in the output, so they are no longer calculated.

    Args:
        dfspan1, dfspan2, dfspan3: window entries, each a dict whose "df"
            value has a `close` column.

    Returns:
        A PIL image produced by `transforms.ToPILImage(mode='RGB')`.
    """
    def _tile(span):
        closes = min_max(np.array(span['df']['close'].tolist()))
        gram = np.outer(closes, closes).astype(np.float32)
        # NOTE(review): the windows in this script are 96 bars long, so this
        # slice leaves the newest 16 bars out of the image -- confirm intent.
        return min_max(gram[16:80, 16:80])

    channels = np.stack([_tile(span) for span in (dfspan1, dfspan2, dfspan3)])
    # Both halves carry the same content: real = fake.
    side_by_side = np.concatenate([channels, channels], 2)
    # Kronecker product with a 4x4 block of ones repeats every value 4x4.
    enlarged = np.kron(side_by_side, np.ones((4, 4)))
    tensor = torch.from_numpy(enlarged).clone()
    return transforms.ToPILImage(mode='RGB')(tensor)
# In[34]:
def _end_time(window):
    """Return the timestamp of the last bar of a window entry."""
    return window['date'].index[0]


def _render_test_images(windows_15m, windows_30m, windows_60m, out_dir, limit):
    """Pair windows across the three bar lengths and save up to `limit` images.

    All three lists are walked newest-first. Matching rule, with `end` being
    the timestamp of a window's last bar and `prev` the window one position
    earlier in the same list:

    * a 60m window is skipped when its end is >= the newest 30m end;
    * a 30m window matches when prev-60m end < its end <= 60m end, and is
      skipped when its end is >= the newest 15m end;
    * a 15m window matches when prev-30m end <= its end < 30m end.

    Every match is rendered with `imagemake(15m window, previous 30m window,
    previous 60m window)` and saved as "<15m end, YYYY-MM-DD HH_MM_SS>sk.png".

    Returns:
        The number of images written (never more than `limit`).
    """
    if limit <= 0 or not (windows_15m and windows_30m and windows_60m):
        return 0
    # Image.save() does not create missing directories.
    os.makedirs(out_dir, exist_ok=True)

    # Resolve each window's end timestamp once instead of comparing
    # one-element DatetimeIndex objects inside the nested loops.
    stamped_15m = [(_end_time(w), w) for w in windows_15m]
    stamped_30m = [(_end_time(w), w) for w in windows_30m]
    stamped_60m = [(_end_time(w), w) for w in windows_60m]
    newest_15m = stamped_15m[-1][0]
    newest_30m = stamped_30m[-1][0]

    saved = 0
    pairs_60m = zip(reversed(stamped_60m), reversed(stamped_60m[:-1]))
    for (end_60m, _), (prev_end_60m, prev_60m) in tqdm(pairs_60m, total=len(stamped_60m) - 1):
        if end_60m >= newest_30m:
            continue
        pairs_30m = zip(reversed(stamped_30m), reversed(stamped_30m[:-1]))
        for (end_30m, _), (prev_end_30m, prev_30m) in pairs_30m:
            if not (end_60m >= end_30m > prev_end_60m):
                continue
            if end_30m >= newest_15m:
                continue
            for end_15m, window_15m in reversed(stamped_15m):
                if not (end_30m > end_15m >= prev_end_30m):
                    continue
                img = imagemake(window_15m, prev_30m, prev_60m)
                print(end_15m, prev_end_30m, prev_end_60m)
                # strftime always emits the time part, so midnight bars need
                # no special-casing of the file name.
                fname = os.path.join(out_dir, end_15m.strftime("%Y-%m-%d %H_%M_%S") + "sk.png")
                img.save(fname)
                saved += 1
                # Returning leaves all three loops at once. The previous
                # `countloop > COUNT` test let one extra image through.
                if saved >= limit:
                    return saved
    return saved


COUNT = 5000  # maximum number of test images to render
alldata3s = alldata3[:-1]
alldata2s = alldata2[:-1]
countloop = _render_test_images(alldata1, alldata2, alldata3, "datasets/facades1/test/", COUNT)
Test
python test.py --dataroot ./datasets/facades1 --name facades_pix2pix1 --model pix2pix --direction AtoB
BackTest.py
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
import numpy as np
import pdb
import re
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
path = os.getcwd()
new_dir_path = "results/facades_pix2pix1/test_latest/images"

# Timestamp embedded in each result file name, e.g. "2021-01-04 12_15_00".
_STAMP_IN_NAME = re.compile(r"\d\d\d\d-\d\d-\d\d \d\d_\d\d_\d\d")
# File-name marker -> key under which the path is stored.
_ROLE_BY_MARKER = {"fake_B": "fakeB", "real_A": "realA", "real_B": "realB"}

# Group files by their embedded timestamp rather than by directory order:
# os.listdir() returns entries in arbitrary order, so the three images of one
# sample are not guaranteed to be adjacent. The timestamp is looked up in the
# file name only, so digits in the working-directory path cannot match.
_by_stamp = {}
for imageName in os.listdir(new_dir_path):
    found = _STAMP_IN_NAME.search(imageName)
    if found is None:
        continue  # no timestamp in the name: not a sample image
    inputPath = os.path.join(path, new_dir_path, imageName)
    for marker, role in _ROLE_BY_MARKER.items():
        if marker in imageName:
            _by_stamp.setdefault(found.group(), {})[role] = inputPath

# Keep only complete triples; each entry has fakeB / realA / realB paths plus
# the timestamp as "YYYY-MM-DD HH:MM:SS" under "date".
img = []
for stamp, image in sorted(_by_stamp.items()):
    if len(image) == 3:
        image['date'] = stamp.replace("_", ":")
        img.append(image)
# In[36]:
import re
import datetime
def min_max(x, axis=None):
    """Rescale `x` linearly so its minimum maps to 0 and its maximum to 1.

    Args:
        x: NumPy array to normalise.
        axis: Axis (or axes) along which min/max are taken; None uses the
            whole array.

    Returns:
        Array of the same shape as `x`. Where the range is zero (all values
        equal along `axis`) the result is 0 instead of the NaN that a 0/0
        division would produce.
    """
    lowest = x.min(axis=axis, keepdims=True)
    highest = x.max(axis=axis, keepdims=True)
    span = highest - lowest
    # A constant input has span 0; divide by 1 there so the output is 0, not NaN.
    safe_span = np.where(span == 0, 1, span)
    return (x - lowest) / safe_span
def getprice(img,transform,info):
    """Extract a normalised 1-D series from one channel of a predicted image.

    The image at `img['fakeB']` is opened and converted with `transform`.
    From channel `info` the first column and the first row are averaged
    element-wise, and the result is min-max normalised.

    Args:
        img: dict with the image path under 'fakeB' and its timestamp string
            under 'date'.
        transform: callable turning a PIL image into a tensor that supports
            `.numpy()`.
        info: channel index to read.

    Returns:
        Tuple of (normalised series, img['date']).
    """
    # Close the file handle as soon as the pixels have been converted.
    with Image.open(img['fakeB']) as picture:
        fake = transform(picture).numpy()
    # Widen before adding: the transform used in this file (PILToTensor)
    # keeps the image's integer dtype, and adding two uint8 arrays wraps
    # around at 256, which corrupted the average for bright pixels.
    # NOTE(review): thresholds tuned on the wrapped values may need re-tuning.
    fake = fake.astype(np.float64)
    first_column = fake[info, :, 0]
    first_row = fake[info, 0, :]
    averaged = (first_column + first_row) / 2
    return min_max(averaged), img['date']
# In[165]:
def GetSignal(item):
    """Return the ratio of two adjacent segment means of `item`.

    The numerator is the mean of elements 192..255 and the denominator the
    mean of elements 128..191.
    """
    earlier, later = item[128:192], item[192:256]
    return np.mean(later) / np.mean(earlier)
# In[166]:
import matplotlib.animation as animation
get_ipython().run_line_magic('matplotlib', 'nbagg')
import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
# Converter from a PIL image to a tensor; handed to getprice() when the
# signal column is filled in further down.
transform = transforms.PILToTensor()
# NOTE(review): fig and ims are not referenced anywhere else in the visible
# source -- presumably left over from an animation experiment; confirm
# before removing.
fig = plt.figure()
ims = []
# In[167]:
import matplotlib.pyplot as plt
import pandas as pd
get_ipython().run_line_magic('matplotlib', 'inline')
from tqdm import tqdm
# Price bars for the backtest, with capitalised OHLCV column names.
realdf = pd.read_csv(r'C://Users//User//Desktop//EURUSD.oj5k15.csv', sep=",",names=('date', 'time', 'Open', 'High', 'Low', 'Close', 'Volume'))
realdf.index = pd.to_datetime(realdf['date']+" "+realdf['time'])

# -1 marks "no prediction for this bar". Stored as a float so that writing
# float ratios into the column does not change its dtype.
realdf['signal'] = -1.0

for item in img:
    v2, date = getprice(item, transform, 0)
    stamp = pd.Timestamp(date)
    # Assigning to a label that is missing from the index can append a new
    # row, so predictions without a matching bar are skipped instead.
    if stamp not in realdf.index:
        continue
    realdf.at[stamp, 'signal'] = GetSignal(v2)
BackTest
シグナルは過去の平均値と、未来の平均値を割り算して閾値判定した。
6倍ぐらい(逆数は約0.17。コードでは0.18を使用)の差がついたところを、シグナルに使うのがよさそう・・・
買い 6 < signal1 < 80
売り 0.0125 < signal1 <0.18
本当は、正規分布で二山ぐらいに分離した状態をエントリー判定に使うのがベターなのであるが、ここでは単純な閾値判定にとどめた。
BackTest.py
from backtesting import Backtest, Strategy
from backtesting.lib import SignalStrategy, TrailingStrategy
import backtesting
def SIGNAL():
    """Return the `signal` column of the module-level `realdf` frame."""
    return realdf["signal"]
class pix2pix(Strategy):
    """Threshold strategy driven by the precomputed `signal` series.

    On each bar the latest signal value is compared with two open intervals:
    a value inside (buy_low, buy_high) opens a long, a value inside
    (sell_low, sell_high) opens a short. Any trade that has been open for
    longer than `hold_minutes` is closed.
    """

    # Class-level parameters (defaults are the values previously hard-coded
    # in next()), so they can be overridden per run without editing the code.
    buy_low = 6
    buy_high = 80
    sell_low = 0.0125
    sell_high = 0.18
    hold_minutes = 30

    def init(self):
        """Register the signal series as an indicator."""
        self.signal1 = self.I(SIGNAL)

    def next(self):
        """Run once per bar: place entries, then expire trades held too long."""
        super().next()
        current_time = self.data.index[-1]
        # Read the current bar's value explicitly rather than relying on the
        # truth value of the whole indicator array.
        latest = self.signal1[-1]
        if self.buy_low < latest < self.buy_high:
            self.buy()
        elif self.sell_low < latest < self.sell_high:
            self.sell()
        # Time-based exit: close each trade that has been open for more than
        # hold_minutes (30 by default).
        max_hold = pd.Timedelta(self.hold_minutes, "m")
        for trade in self.trades:
            if current_time - trade.entry_time > max_hold:
                # Close only the expired trade, not the whole position.
                trade.close()
# Engine settings, kept together so they can be read at a glance.
_backtest_settings = dict(
    cash=100000,            # starting cash
    commission=0.000,       # trading commission
    margin=0.5,             # reciprocal of the leverage (0.5 -> 2x leverage)
    trade_on_close=True,    # True: trade at the current close; False: at the next open
    exclusive_orders=True,  # automatically close the existing position
)

# Chart data first, trading strategy second.
bt = Backtest(realdf, pix2pix, **_backtest_settings)

output = bt.run()  # run the backtest
print(output)      # result statistics
bt.plot()          # result chart