LoginSignup
2
1

More than 1 year has passed since last update.

Pix2Pixを使ったEURUSDのチャート予想(2)

Last updated at Posted at 2022-03-14

続報である

状況

  • 1d足トレードではなく、より短期足(15m,30m,60m)との組み合わせで、3チャンネルを組み合わせた。
  • バックテスト結果から、勝率60%を得た。
  • Trades 170
  • Win Rate [%] 60.0

xxx.png

苦労した部分

  • PIX2PIXでの学習データにおいて、過去値と予測値の時系列スライド(重複部分)は、データ中の3/4とした。
  • データサイズは 過去16+48 未来48+16 重複48 → 64x4倍して256サイズに変換
  • バックテストでは、バイナリオプションを想定したアルゴリズムを実装した。
  • バックテストシグナルは、すべて予測チャート上での変化率で検出した。

実装

Trainデータの作成

main.py
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

# Column names for the header-less rate CSV files.
RATE_COLUMNS = ('date', 'time', 'open', 'high', 'low', 'close', 'volume')


def load_rates(csv_path):
    """Read a header-less rate CSV and index it by its combined date and time.

    Args:
        csv_path: Path of a comma-separated file whose columns are
            date, time, open, high, low, close, volume.

    Returns:
        DataFrame with the columns above and an index parsed from
        ``date + " " + time``.
    """
    frame = pd.read_csv(csv_path, sep=",", names=RATE_COLUMNS)
    frame.index = pd.to_datetime(frame['date'] + " " + frame['time'])
    return frame


def make_windows(frame, size):
    """Cut ``frame`` into overlapping windows of ``size`` consecutive rows.

    Args:
        frame: DataFrame to slice.
        size: Number of rows per window.

    Returns:
        List of dicts, one per start position, with keys ``"df"`` (the window)
        and ``"date"`` (the window's last row as a one-row DataFrame).
        The list is empty when ``frame`` has no more than ``size`` rows.
    """
    windows = []
    # NOTE(review): the range stops one start position short of the window
    # that would end on the very last row; kept as in the original script.
    for start in range(len(frame) - size):
        # Positional slicing; plain slicing cannot raise here, so no
        # exception handling is needed.
        span = frame.iloc[start:start + size]
        windows.append({"df": span, "date": span.tail(1)})
    return windows


# File names end in 15 / 30 / 60 -- presumably the 15m, 30m and 60m bars.
df1 = load_rates(r'C://Users//User//Desktop//EURUSD.oj5k15.csv')
df2 = load_rates(r'C://Users//User//Desktop//EURUSD.oj5k30.csv')
df3 = load_rates(r'C://Users//User//Desktop//EURUSD.oj5k60.csv')

wsize = 80  # sample period: rows per window

alldata1 = make_windows(df1, wsize)
alldata2 = make_windows(df2, wsize)
alldata3 = make_windows(df3, wsize)


# In[2]:


from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import pdb

def min_max(x, axis=None):
    """Rescale ``x`` linearly so that its minimum maps to 0 and its maximum to 1.

    Args:
        x: NumPy array to rescale.
        axis: Axis or axes over which minimum and maximum are taken;
            ``None`` uses the whole array.

    Returns:
        Array ``(x - min) / (max - min)``. Where the maximum equals the
        minimum (constant input) the result is 0 instead of NaN.
    """
    lo = x.min(axis=axis, keepdims=True)
    hi = x.max(axis=axis, keepdims=True)
    span = hi - lo
    # A zero span would give 0/0 = NaN; divide by 1 there so the result is 0.
    safe_span = np.where(span == 0, 1, span)
    return (x - lo) / safe_span

def imagemake(dfspan1,dfspan2,dfspan3):
    """Render three window records as one side-by-side RGB image.

    Each argument is a dict whose ``'df'`` entry has a ``'close'`` column.
    For every record the close series is min-max normalised and multiplied
    with itself as an outer product; the three products become the three
    image channels, in argument order.

    Returns:
        PIL image built by ``transforms.ToPILImage(mode='RGB')``. Its left
        half comes from rows/columns 0-63 of each product and its right half
        from rows/columns 16-79, every pixel enlarged to a 4x4 patch.
    """
    def _self_product(record):
        # Normalised close prices multiplied with themselves (outer product).
        closes = min_max(np.array(list(record['df']['close'])))
        return np.outer(closes, closes).astype(np.float32)

    products = [_self_product(record) for record in (dfspan1, dfspan2, dfspan3)]

    # Each crop is normalised again on its own after slicing.
    left_half = np.stack([min_max(p[0:64, 0:64]) for p in products])
    right_half = np.stack([min_max(p[16:80, 16:80]) for p in products])

    # Join the two halves along the last (width) axis.
    side_by_side = np.concatenate([left_half, right_half], 2)

    # Kronecker product with a 4x4 block of ones repeats every pixel 4x4 times.
    enlarged = np.kron(side_by_side, np.ones((4, 4)))
    tensor = torch.from_numpy(enlarged).clone()
    to_image = transforms.ToPILImage(mode='RGB')
    return to_image(tensor)


# In[3]:


# Match windows across the three rate files and write one training image per
# match. alldata3 / alldata2 / alldata1 were built from the "60", "30" and
# "15" CSV files respectively.

# Lists without their last element: zipping reversed(full) with
# reversed(shortened) pairs every window with the window just before it.
alldata3s = alldata3[:-1]
alldata2s = alldata2[:-1]
# Hold the selected predecessor window (a dict) or False when nothing matched.
i1pick=False
i2pick=False

# Walk the alldata3 windows from newest to oldest; i1s is i1's predecessor.
for i1,i1s in tqdm(zip(reversed(alldata3),reversed(alldata3s))):
         
    # Skip windows ending at or after the end of the newest alldata2 window.
    if i1['date'].index >= alldata2[-1]['date'].index:
        continue
    
    # Walk the alldata2 windows from newest to oldest; i2s is i2's predecessor.
    for i2,i2s in zip(reversed(alldata2),reversed(alldata2s)):        

        # Keep i2 only when its end lies in (end of i1s, end of i1];
        # the predecessor i1s is then the window handed to imagemake.
        if i1['date'].index >= i2['date'].index and i2['date'].index>i1s['date'].index:
            i1pick=i1s
        else:
            i1pick=False
            continue
        
        # Skip windows ending at or after the end of the newest alldata1 window.
        if i2['date'].index >= alldata1[-1]['date'].index:
            continue
        
        # Walk the alldata1 windows from newest to oldest.
        for i3 in reversed(alldata1):
  
            # Keep i3 only when its end lies in [end of i2s, end of i2);
            # the predecessor i2s is then the window handed to imagemake.
            if i2['date'].index > i3['date'].index and i3['date'].index>=i2s['date'].index:
                i2pick=i2s
            else:  
                i2pick=False
                continue
                
            if (i1pick and i2pick):
                # Channel order: alldata1 window, previous alldata2 window,
                # previous alldata3 window.
                img=imagemake(i3,i2pick,i1pick)
                # End timestamps of the three windows as one-element string lists.
                fn = i3['df'].tail(1).index.astype(str).values.tolist()
                fn1 = i2pick['df'].tail(1).index.astype(str).values.tolist()
                fn2 = i1pick['df'].tail(1).index.astype(str).values.tolist()
                print(fn,fn1,fn2)
                # The file is named after the alldata1 window's end timestamp,
                # with ":" replaced by "-".
                fname = "datasets/facades1/train/" + fn[0].replace(":","-") + ".png"
                    
                # A timestamp string shorter than 16 characters gets a
                # " 00-00-00" time part appended -- presumably midnight values
                # rendered as a date only; TODO confirm.
                if len(fn[0])<16:
                    fname = fname.replace(".png"," 00-00-00.png")
                img.save(fname)

train

python train.py --dataroot ./datasets/facades1 --name facades_pix2pix1 --model pix2pix --direction AtoB --batch_size 32 --gpu_ids 0,1 --no_flip

未来予測

future.py
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

# Column names for the header-less rate CSV files.
RATE_COLUMNS = ('date', 'time', 'open', 'high', 'low', 'close', 'volume')


def load_rates(csv_path):
    """Read a header-less rate CSV and index it by its combined date and time.

    Args:
        csv_path: Path of a comma-separated file whose columns are
            date, time, open, high, low, close, volume.

    Returns:
        DataFrame with the columns above and an index parsed from
        ``date + " " + time``.
    """
    frame = pd.read_csv(csv_path, sep=",", names=RATE_COLUMNS)
    frame.index = pd.to_datetime(frame['date'] + " " + frame['time'])
    return frame


def make_windows(frame, size):
    """Cut ``frame`` into overlapping windows of ``size`` consecutive rows.

    Args:
        frame: DataFrame to slice.
        size: Number of rows per window.

    Returns:
        List of dicts, one per start position, with keys ``"df"`` (the window)
        and ``"date"`` (the window's last row as a one-row DataFrame).
        The list is empty when ``frame`` has no more than ``size`` rows.
    """
    windows = []
    # NOTE(review): the range stops one start position short of the window
    # that would end on the very last row; kept as in the original script.
    for start in range(len(frame) - size):
        # Positional slicing; plain slicing cannot raise here, so no
        # exception handling is needed.
        span = frame.iloc[start:start + size]
        windows.append({"df": span, "date": span.tail(1)})
    return windows


# File names end in 15 / 30 / 60 -- presumably the 15m, 30m and 60m bars.
df1 = load_rates(r'C://Users//User//Desktop//EURUSD.oj5k15.csv')
df2 = load_rates(r'C://Users//User//Desktop//EURUSD.oj5k30.csv')
df3 = load_rates(r'C://Users//User//Desktop//EURUSD.oj5k60.csv')

wsize = 96  # sample period: rows per window

alldata1 = make_windows(df1, wsize)
alldata2 = make_windows(df2, wsize)
alldata3 = make_windows(df3, wsize)


# In[33]:


from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import pdb

def min_max(x, axis=None):
    """Rescale ``x`` linearly so that its minimum maps to 0 and its maximum to 1.

    Args:
        x: NumPy array to rescale.
        axis: Axis or axes over which minimum and maximum are taken;
            ``None`` uses the whole array.

    Returns:
        Array ``(x - min) / (max - min)``. Where the maximum equals the
        minimum (constant input) the result is 0 instead of NaN.
    """
    lo = x.min(axis=axis, keepdims=True)
    hi = x.max(axis=axis, keepdims=True)
    span = hi - lo
    # A zero span would give 0/0 = NaN; divide by 1 there so the result is 0.
    safe_span = np.where(span == 0, 1, span)
    return (x - lo) / safe_span

def imagemake(dfspan1,dfspan2,dfspan3):
    """Render three window records as an RGB image with two identical halves.

    Each argument is a dict whose ``'df'`` entry has a ``'close'`` column.
    For every record the close series is min-max normalised and multiplied
    with itself as an outer product; rows/columns 16-79 of that product are
    normalised again and become one image channel, in argument order.

    The 0-63 crops that the earlier version computed were never used and are
    no longer calculated.

    Returns:
        PIL image built by ``transforms.ToPILImage(mode='RGB')``; the same
        crop fills both the left and the right half, every pixel enlarged to
        a 4x4 patch.
    """
    channels = []
    for record in (dfspan1, dfspan2, dfspan3):
        closes = min_max(np.array(list(record['df']['close'])))
        product = np.outer(closes, closes).astype(np.float32)
        channels.append(min_max(product[16:80, 16:80]))
    crop = np.stack(channels)

    # The same crop is supplied for both halves (real = fake).
    side_by_side = np.concatenate([crop, crop], 2)

    # Kronecker product with a 4x4 block of ones repeats every pixel 4x4 times.
    enlarged = np.kron(side_by_side, np.ones((4, 4)))
    tensor = torch.from_numpy(enlarged).clone()
    return transforms.ToPILImage(mode='RGB')(tensor)


# In[34]:


# Match windows across the three rate files, newest first, and write test
# images until the COUNT limit is exceeded. alldata3 / alldata2 / alldata1
# were built from the "60", "30" and "15" CSV files respectively.

# Image limit: the loops stop once countloop is greater than this value.
COUNT=5000


# Lists without their last element: zipping reversed(full) with
# reversed(shortened) pairs every window with the window just before it.
alldata3s = alldata3[:-1]
alldata2s = alldata2[:-1]
# Hold the selected predecessor window (a dict) or False when nothing matched.
i1pick=False
i2pick=False

# Number of images written so far.
countloop =0

# Walk the alldata3 windows from newest to oldest; i1s is i1's predecessor.
for i1,i1s in tqdm(zip(reversed(alldata3),reversed(alldata3s))):
         
    # Skip windows ending at or after the end of the newest alldata2 window.
    if i1['date'].index >= alldata2[-1]['date'].index:
        continue
    
    # Walk the alldata2 windows from newest to oldest; i2s is i2's predecessor.
    for i2,i2s in zip(reversed(alldata2),reversed(alldata2s)):        

        # Keep i2 only when its end lies in (end of i1s, end of i1];
        # the predecessor i1s is then the window handed to imagemake.
        if i1['date'].index >= i2['date'].index and i2['date'].index>i1s['date'].index:
            i1pick=i1s
        else:
            i1pick=False
            continue
        
        # Skip windows ending at or after the end of the newest alldata1 window.
        if i2['date'].index >= alldata1[-1]['date'].index:
            continue
        
        # Walk the alldata1 windows from newest to oldest.
        for i3 in reversed(alldata1):
  
            # Keep i3 only when its end lies in [end of i2s, end of i2);
            # the predecessor i2s is then the window handed to imagemake.
            if i2['date'].index > i3['date'].index and i3['date'].index>=i2s['date'].index:
                i2pick=i2s
            else:  
                i2pick=False
                continue
                
            if (i1pick and i2pick):
                # Channel order: alldata1 window, previous alldata2 window,
                # previous alldata3 window.
                img=imagemake(i3,i2pick,i1pick)
                # End timestamps of the three windows as one-element string lists.
                fn = i3['df'].tail(1).index.astype(str).values.tolist()
                fn1 = i2pick['df'].tail(1).index.astype(str).values.tolist()
                fn2 = i1pick['df'].tail(1).index.astype(str).values.tolist()
                print(fn,fn1,fn2)
                # The file is named after the alldata1 window's end timestamp,
                # with ":" replaced by "_" and an "sk" suffix.
                fname = "datasets/facades1/test/" + fn[0].replace(":","_") + "sk.png"
                    
                # A timestamp string shorter than 16 characters gets a
                # " 00_00_00" time part appended -- presumably midnight values
                # rendered as a date only; TODO confirm.
                if len(fn[0])<16:
                    fname = fname.replace("sk.png"," 00_00_00sk.png")
                img.save(fname)
                
                # Stop once more than COUNT images exist, i.e. after the
                # (COUNT + 1)-th image has been saved.
                countloop=countloop+1
                if countloop> COUNT:
                        break
        # for/else: the else branch runs only when the inner loop ended
        # without break, so the trailing break propagates the stop outwards.
        else:
            continue
        break
    else:
        continue
    break

Test

python test.py --dataroot ./datasets/facades1 --name facades_pix2pix1 --model pix2pix --direction AtoB
BackTest.py
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
get_ipython().run_line_magic('matplotlib', 'inline')
import numpy as np
import pdb
import re

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

path = os.getcwd()

new_dir_path = "results/facades_pix2pix1/test_latest/images"

# Timestamp embedded in a result file name, e.g. "2022-03-14 12_30_00".
_STAMP_PATTERN = re.compile(r"\d\d\d\d-\d\d-\d\d \d\d_\d\d_\d\d")
# File-name marker -> key used in the per-timestamp dict.
_IMAGE_KINDS = {"fake_B": "fakeB", "real_A": "realA", "real_B": "realB"}


def collect_image_sets(base_dir, image_dir):
    """Group the result images in ``image_dir`` by the timestamp in their names.

    os.listdir returns entries in arbitrary order, so files are grouped by
    their own timestamp instead of by their position in the listing.

    Args:
        base_dir: Directory that ``image_dir`` is joined onto.
        image_dir: Directory holding the ``fake_B`` / ``real_A`` / ``real_B``
            images.

    Returns:
        List of dicts sorted by timestamp, each with the keys ``fakeB``,
        ``realA``, ``realB`` (file paths) and ``date`` (the timestamp with
        ``_`` replaced by ``:``). Timestamps lacking any of the three images,
        and files without a timestamp in their name, are left out.
    """
    groups = {}
    for file_name in os.listdir(image_dir):
        # Search the file name only, so digits in the directory path can
        # never be mistaken for the timestamp.
        found = _STAMP_PATTERN.search(file_name)
        if found is None:
            continue
        for marker, key in _IMAGE_KINDS.items():
            if marker in file_name:
                full_path = os.path.join(base_dir, image_dir, file_name)
                groups.setdefault(found.group(), {})[key] = full_path

    return [
        dict(paths, date=stamp.replace("_", ":"))
        for stamp, paths in sorted(groups.items())
        if len(paths) == len(_IMAGE_KINDS)
    ]


img = collect_image_sets(path, new_dir_path)


# In[36]:


import re 
import datetime

def min_max(x, axis=None):
    """Rescale ``x`` linearly so that its minimum maps to 0 and its maximum to 1.

    Args:
        x: NumPy array to rescale.
        axis: Axis or axes over which minimum and maximum are taken;
            ``None`` uses the whole array.

    Returns:
        Array ``(x - min) / (max - min)``. Where the maximum equals the
        minimum (constant input) the result is 0 instead of NaN.
    """
    lo = x.min(axis=axis, keepdims=True)
    hi = x.max(axis=axis, keepdims=True)
    span = hi - lo
    # A zero span would give 0/0 = NaN; divide by 1 there so the result is 0.
    safe_span = np.where(span == 0, 1, span)
    return (x - lo) / safe_span

def getprice(img,transform,info):
    """Extract a normalised 1-D profile from a generated image.

    Args:
        img: Dict with the image path under ``'fakeB'`` and a timestamp
            string under ``'date'``.
        transform: Callable turning the opened PIL image into a tensor.
        info: Index of the channel to read.

    Returns:
        Tuple ``(profile, date)``: the average of the channel's first column
        and first row, min-max normalised, plus ``img['date']`` unchanged.
    """
    fake = transform(Image.open(img['fakeB']))
    # PILToTensor produces uint8 values; widen to float before adding so that
    # sums above 255 no longer wrap around.
    fake = fake.numpy().astype(np.float64)

    first_column = fake[info,:,0]
    first_row = fake[info,0,:]
    profile = (first_column + first_row)/2

    return min_max(profile),img['date']


# In[165]:


def GetSignal(item):
    """Return mean(item[192:256]) divided by mean(item[128:192])."""
    earlier, later = item[128:192], item[192:256]
    return np.mean(later) / np.mean(earlier)


# In[166]:


import matplotlib.animation as animation
get_ipython().run_line_magic('matplotlib', 'nbagg')

import random
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

# Converts an opened PIL image to a tensor; handed to getprice() further down.
transform = transforms.PILToTensor()
# NOTE(review): fig and ims are not referenced by any code visible here --
# presumably leftovers from an animation experiment; confirm before removing.
fig = plt.figure()

ims = []


# In[167]:


import matplotlib.pyplot as plt
import pandas as pd

get_ipython().run_line_magic('matplotlib', 'inline')
from tqdm import tqdm

# Load the "15" rate file again, this time with capitalised OHLCV column
# names (presumably the naming the backtesting library expects -- confirm).
realdf = pd.read_csv(r'C://Users//User//Desktop//EURUSD.oj5k15.csv', sep=",",names=('date', 'time', 'Open', 'High', 'Low', 'Close', 'Volume'))
realdf.index = pd.to_datetime(realdf['date']+" "+realdf['time'])
# -1.0 marks rows without a prediction. The column is created as float so
# that the ratio signals can be stored without changing its dtype.
realdf['signal']=-1.0


for item in img:
    v2,date = getprice(item,transform,0)
    stamp = pd.Timestamp(date)
    # Assigning to a timestamp that is not in the index would append a new,
    # otherwise empty row; such images are skipped.
    if stamp not in realdf.index:
        continue
    realdf.at[stamp, 'signal'] = GetSignal(v2)

BackTest

シグナルは過去の平均値と、未来の平均値を割り算して閾値判定した。
6倍ぐらい(逆数側の閾値は0.18とした。厳密には1/6≈0.167)の差がついたところを、シグナルに使うのがよさそう・・・

 買い  6 < signal1 < 80
 売り  0.0125 < signal1 <0.18

本当は正規分布で二山ぐらいに分離した状態をエントリー判定に使うのがベターなのであるが

BackTest.py
from backtesting import Backtest, Strategy
from backtesting.lib import SignalStrategy, TrailingStrategy
import backtesting


def SIGNAL():
    """Return the ``signal`` column of the module-level ``realdf`` frame."""
    return realdf["signal"]

class pix2pix(Strategy):
    """Threshold strategy driven by the precomputed ``signal`` series.

    Buys when the latest signal lies strictly between 6 and 80, sells when it
    lies strictly between 0.0125 and 0.18, and closes the position once a
    trade has been open for more than 30 minutes.
    """

    def init(self):
        # Register the signal series as an indicator.
        self.signal1=self.I(SIGNAL)

    def next(self):
        """Evaluate the newest signal value and the time-based exit."""
        super().next()

        current_time = self.data.index[-1]
        # Read the newest value explicitly instead of relying on the
        # indicator array's truthiness.
        latest = self.signal1[-1]

        if 6 < latest < 80:
            self.buy()   # buy
        elif 0.0125 < latest < 0.18:
            self.sell()  # sell

        # Time-based exit: close the position when any open trade is older
        # than 30 minutes. One close call covers the whole position.
        max_age = pd.Timedelta(minutes=30)
        if any(current_time - trade.entry_time > max_age for trade in self.trades):
            self.position.close()
                    
                    
# Engine settings for the backtest.
backtest_options = dict(
    cash=100000,            # starting cash
    commission=0.000,       # trading commission
    margin=0.5,             # inverse of the leverage (0.5 gives 2x leverage)
    trade_on_close=True,    # True: trade at the current close, False: at the next bar's open
    exclusive_orders=True,  # close the existing position automatically
)

# Arguments: chart data, trading strategy, engine settings.
bt = Backtest(realdf, pix2pix, **backtest_options)

output = bt.run()  # run the backtest
print(output)      # results as data
bt.plot()          # results as a chart
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1