EURUSD chart prediction using Pix2Pix

Posted at 2022-02-10

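Overview

 I am working on predicting foreign exchange (FOREX) charts. This time I tried an image-generation task: drawing a predicted future chart from past exchange-rate charts. In the end, using the currently popular Pix2Pix, I got as far as drawing a reasonably plausible chart.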

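Concept

 Until now I traded using MetaTrader indicators, but that proved too difficult and I gave it up. This project began on my own blog, where I likened a chart to a mountain and wondered whether a topographic map could be used for forex forecasting. See the following article for details. http://1969681.blog66.fc2.com/blog-entry-637.html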

Method

 The algorithm comes entirely from this GitHub repository:
https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix

All I do myself is create the training data (though I believe the concept itself is original).
 The key points are listed below.

 df = data.DataReader(symbol1,'av-daily',start,end, api_key=apikey)

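  • To turn the time-series data into an image task, map it to two dimensions using the outer product of vectors
     https://deepage.net/features/numpy-outer.html
  • Group the data into 96-day chunks and generate an A image and a B image from each. The two images overlap by 32 days to preserve continuity.
  • Input to Pix2Pix: 64x128 → upscaled 4x → 256x512
  • Place the images here → pytorch-CycleGAN-and-pix2pix\datasets\facades\train
    (Figure: image.png)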
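
The outer-product mapping in the first bullet can be illustrated with a minimal sketch. The four prices below are hypothetical values chosen only for illustration; the point is that the diagonal of the resulting matrix holds the squared prices, so the series can be read back with a square root.

```python
import numpy as np

# Hypothetical closing prices, used only for illustration.
close = np.array([1.10, 1.12, 1.11, 1.15], dtype=np.float32)

# Outer product: element [i, j] is close[i] * close[j].
grid = np.outer(close, close)

# The matrix is symmetric and its diagonal holds the squared prices,
# so the original series can be recovered with a square root.
recovered = np.sqrt(np.diag(grid))

print(grid.shape)
print(np.allclose(grid, grid.T))
print(np.allclose(recovered, close))
```

This is why the later scripts read a price curve from the diagonal of an image channel.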

PIX2PIX

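  • The Pix2Pix run command is copied from the repository site
  • The number of epochs is given by two parameters (--n_epochs and --n_epochs_decay)
  • Two GPUs are used
  • Flipping must be disabled (--no_flip)
  • Batch size is 32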
batch.sh
python train.py --dataroot ./datasets/facades --name facades_pix2pix --model pix2pix --direction AtoB --batch_size 32 --gpu_ids 0,1 --no_flip
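
As a rough illustration of why the epoch count takes two parameters: with the linear learning-rate policy, the rate is held constant for n_epochs and then decays linearly toward zero over n_epochs_decay, so the total number of epochs is their sum. The sketch below is an assumption about how that multiplier can be computed. The function name linear_lr_factor is hypothetical, and the exact formula should be checked against the repository before relying on it.

```python
def linear_lr_factor(epoch, n_epochs=100, n_epochs_decay=100, epoch_count=1):
    """Multiplier applied to the base learning rate at a 0-based epoch index.

    Assumed form: 1.0 during the first n_epochs, then a linear ramp
    toward zero over the following n_epochs_decay epochs.
    """
    return 1.0 - max(0, epoch + epoch_count - n_epochs) / float(n_epochs_decay + 1)


base_lr = 0.0002  # the lr value shown in the options log
for epoch in (0, 99, 150, 199):
    print(epoch, base_lr * linear_lr_factor(epoch))
```

For example, appending --n_epochs 400 --n_epochs_decay 400 to the command above would request 800 epochs in total.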

Incidentally, if you get an argument wrong, train.py tells you with a usage message like the one below.

batch.sh
usage: train.py [-h] --dataroot DATAROOT [--name NAME] [--use_wandb] [--gpu_ids GPU_IDS] [--checkpoints_dir CHECKPOINTS_DIR]
                [--model MODEL] [--input_nc INPUT_NC] [--output_nc OUTPUT_NC] [--ngf NGF] [--ndf NDF] [--netD NETD] [--netG NETG]
                [--n_layers_D N_LAYERS_D] [--norm NORM] [--init_type INIT_TYPE] [--init_gain INIT_GAIN] [--no_dropout]
                [--dataset_mode DATASET_MODE] [--direction DIRECTION] [--serial_batches] [--num_threads NUM_THREADS]
                [--batch_size BATCH_SIZE] [--load_size LOAD_SIZE] [--crop_size CROP_SIZE] [--max_dataset_size MAX_DATASET_SIZE]
                [--preprocess PREPROCESS] [--no_flip] [--display_winsize DISPLAY_WINSIZE] [--epoch EPOCH] [--load_iter LOAD_ITER]
                [--verbose] [--suffix SUFFIX] [--display_freq DISPLAY_FREQ] [--display_ncols DISPLAY_NCOLS] [--display_id DISPLAY_ID]
                [--display_server DISPLAY_SERVER] [--display_env DISPLAY_ENV] [--display_port DISPLAY_PORT]
                [--update_html_freq UPDATE_HTML_FREQ] [--print_freq PRINT_FREQ] [--no_html] [--save_latest_freq SAVE_LATEST_FREQ]
                [--save_epoch_freq SAVE_EPOCH_FREQ] [--save_by_iter] [--continue_train] [--epoch_count EPOCH_COUNT] [--phase PHASE]
                [--n_epochs N_EPOCHS] [--n_epochs_decay N_EPOCHS_DECAY] [--beta1 BETA1] [--lr LR] [--gan_mode GAN_MODE]
                [--pool_size POOL_SIZE] [--lr_policy LR_POLICY] [--lr_decay_iters LR_DECAY_ITERS] [--lambda_L1 LAMBDA_L1]

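If all goes well, training proceeds with the following messages. (The log below comes from a run with n_epochs and n_epochs_decay set to 400 and display_id set to 0, and it shows no_flip: False, so it does not correspond exactly to the command above.)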

batch.sh
----------------- Options ---------------
               batch_size: 32                                   [default: 1]
                    beta1: 0.5
          checkpoints_dir: ./checkpoints
           continue_train: False
                crop_size: 256
                 dataroot: ./datasets/facades                   [default: None]
             dataset_mode: aligned
                direction: AtoB
              display_env: main
             display_freq: 400
               display_id: 0                                    [default: 1]
            display_ncols: 4
             display_port: 8097
           display_server: http://localhost
          display_winsize: 256
                    epoch: latest
              epoch_count: 1
                 gan_mode: vanilla
                  gpu_ids: 0,1                                  [default: 0]
                init_gain: 0.02
                init_type: normal
                 input_nc: 3
                  isTrain: True                                 [default: None]
                lambda_L1: 100.0
                load_iter: 0                                    [default: 0]
                load_size: 286
                       lr: 0.0002
           lr_decay_iters: 50
                lr_policy: linear
         max_dataset_size: inf
                    model: pix2pix                              [default: cycle_gan]
                 n_epochs: 400                                  [default: 100]
           n_epochs_decay: 400                                  [default: 100]
               n_layers_D: 3
                     name: facades_pix2pix                      [default: experiment_name]
                      ndf: 64
                     netD: basic
                     netG: unet_256
                      ngf: 64
               no_dropout: False
                  no_flip: False
                  no_html: False
                     norm: batch
              num_threads: 4
                output_nc: 3
                    phase: train
                pool_size: 0
               preprocess: resize_and_crop
               print_freq: 100
             save_by_iter: False
          save_epoch_freq: 5
         save_latest_freq: 5000
           serial_batches: False
                   suffix:
         update_html_freq: 1000
                use_wandb: False
                  verbose: False
----------------- End -------------------
dataset [AlignedDataset] was created

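Creating the training data

Analysis window: WSIZE = 96 days, split as a (n=32) + b (n=32) + c (n=32). For training, image A uses a and b, and image B uses b and c.
Tensor channels: 0 = close, 1 = high - close, 2 = close - low
Normalization: min-max (A and B are normalized separately)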
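
The window layout above can be sketched for a single channel as follows. This is a simplified illustration, not the full script: it uses a synthetic random-walk series in place of real closes, and the constants BLOCK and SCALE are names introduced here for readability.

```python
import numpy as np

WSIZE = 96   # days per window, as in the article
BLOCK = 32   # length of each of the blocks a, b and c (name assumed here)
SCALE = 4    # upscaling factor toward the 256x512 input (name assumed here)


def min_max(x):
    """Scale an array to the [0, 1] range."""
    lo, hi = x.min(), x.max()
    return (x - lo) / (hi - lo)


# Synthetic price-like series standing in for 96 daily closes.
rng = np.random.default_rng(0)
close = 1.1 + 0.01 * np.cumsum(rng.standard_normal(WSIZE))

grid = np.outer(close, close)  # 96 x 96

# A covers blocks a+b (days 0-63); B covers blocks b+c (days 32-95).
img_a = min_max(grid[0:2 * BLOCK, 0:2 * BLOCK])
img_b = min_max(grid[BLOCK:WSIZE, BLOCK:WSIZE])

# Put A and B side by side, then repeat every pixel in a 4x4 block.
pair = np.concatenate([img_a, img_b], axis=1)      # 64 x 128
pair_up = np.kron(pair, np.ones((SCALE, SCALE)))   # 256 x 512

print(pair.shape, pair_up.shape)
```

Block b appears in both halves (bottom-right of A, top-left of B), which is the 32-day overlap that ties the two images together.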

main.py
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np
import pdb


import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

apikey = 'YOUR_API_KEY'
symbol1 = 'EURUSD'

start = dt.date(2001,8,1)
end = dt.date(2022,2,20)
df1 = data.DataReader(symbol1,'av-daily',start,end, api_key=apikey)

wsize=96 # sample window (days)

df1.index = pd.DatetimeIndex(df1.index)

alldata =[]

# Slide a 96-day window over the data one day at a time
for time in range(len(df1)-wsize):
    dfspan1 = df1[time:time+wsize]
    alldata.append({"df":dfspan1,"date":dfspan1.tail(1)})
    
from PIL import Image
import matplotlib.pyplot as plt

def min_max(x, axis=None):
    # Scale x to the [0, 1] range (local names avoid shadowing the built-ins min/max)
    x_min = x.min(axis=axis, keepdims=True)
    x_max = x.max(axis=axis, keepdims=True)
    return (x-x_min)/(x_max-x_min)

def imagemake(dfspan1):
    a = np.array([x for x in dfspan1['df']['close']])
    b = np.array([x for x in dfspan1['df']['high']])
    c = np.array([x for x in dfspan1['df']['low']])
    
    m = np.outer(a,a).astype(np.float32)
    n = np.outer(b-a,b-a).astype(np.float32)
    o = np.outer(a-c,a-c).astype(np.float32)
    
    m1 = min_max(m[0:64,0:64])
    m2 = min_max(m[32:96,32:96])
    n1 = min_max(n[0:64,0:64])
    n2 = min_max(n[32:96,32:96])
    o1 = min_max(o[0:64,0:64])
    o2 = min_max(o[32:96,32:96])
    
    te1 = np.stack([m1,n1,o1])
    te2 = np.stack([m2,n2,o2])
    te3=np.concatenate([te1,te2], 2)
    
    te4=np.kron(te3, np.ones((4,4)))
    tmp = torch.from_numpy(te4).clone()
    return  transforms.ToPILImage(mode='RGB')(tmp)   

for item in alldata:
    img=imagemake(item)
    fn = item['date'].index.astype(str).values.tolist()
    fname = "datasets/facades/train/" + fn[0] + ".png"
    img.save(fname)

Pix2Pix prediction

Predict using the data in the test folder.

batch.sh
python test.py --dataroot ./datasets/facades --name facades_pix2pix --model pix2pix --direction AtoB
main.py
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pdb

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'

path = os.getcwd()

new_dir_path = "results/facades_pix2pix/test_latest/images"
                
img=[]
image={}
for imageName in sorted(os.listdir(new_dir_path)):  # sorted so each sample's three images are adjacent
    inputPath = os.path.join(path, new_dir_path,imageName)
    if "fake_B" in  imageName : image['fakeB']=inputPath
    if "real_A" in  imageName: image['realA']=inputPath
    if "real_B" in  imageName: image['realB']=inputPath
    if len(image)==3:
        img.append(image)
        image={}
import re 
import datetime

def getprice(img,transform):
    match = re.search(r'\d{4}-\d{2}-\d{2}', img['fakeB'])
    date = datetime.datetime.strptime(match.group(), '%Y-%m-%d').date()
    fake = transform(Image.open(img['fakeB'])) 
    real = transform(Image.open(img['realB']))
    fake=fake.numpy()
    real=real.numpy()
    
    real1 = min_max(np.diag(real[0,:,:]))  
    plt.plot(real1)
 
    fake1 = min_max(np.sqrt(np.diag(fake[0,:,:])))
    fake2 = min_max(fake[0,0,:])
    fake3 = min_max(fake[0,:,0])
    
    plt.plot((fake1+fake2+fake3)/3)   
   
    plt.title(date)
    plt.show()
transform = transforms.PILToTensor()
for item in img:
    price = getprice(item,transform)
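
The script above averages three readings of the generated channel: the square root of the diagonal, the first row, and the first column. The sketch below shows, on a synthetic series and an idealized noise-free image, why all three approximate the same normalized price curve. Real generated images are quantized to 8 bits and noisy, so the agreement will be looser in practice.

```python
import numpy as np


def min_max(x):
    """Scale an array to the [0, 1] range."""
    lo, hi = x.min(), x.max()
    return (x - lo) / (hi - lo)


# Synthetic positive price-like series standing in for 64 daily closes.
rng = np.random.default_rng(1)
close = 1.1 + 0.01 * np.cumsum(rng.standard_normal(64))

# What one image channel stores: the min-max normalized outer product.
channel = min_max(np.outer(close, close))

target = min_max(close)
row_est = min_max(channel[0, :])               # first row: close[0] * close, rescaled
col_est = min_max(channel[:, 0])               # first column: same by symmetry
diag_est = min_max(np.sqrt(np.diag(channel)))  # diagonal: close squared, rescaled

print(np.allclose(row_est, target))
print(np.allclose(col_est, target))
print(np.abs(diag_est - target).max())
```

The row and column readings reproduce the normalized series exactly here, while the diagonal reading is a monotone but slightly distorted version of it, because the normalization is applied before the square root.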

(Figure: image.png, plot output of the script above)

Future prediction

Creating the data

The only change is the following:
te3=np.concatenate([te2,te2], 2)  # give real = fake = te2

The script was also changed to use OANDA daily data:
 EURUSD.oj5k1440.csv

main.py
import pandas as pd
import datetime as dt
from pandas_datareader import data
import mplfinance as mpf
import torch
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
import torch.nn as nn
import numpy as np
import pdb


import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
#

apikey = 'YOUR_API_KEY'
symbol1 = 'EURUSD'

start = dt.date(2021,1,1)
end = dt.date(2021,1,2)
df1 = data.DataReader(symbol1,'av-daily',start,end, api_key=apikey)

df = pd.read_csv(r'C://Users//User//Desktop//EURUSD.oj5k1440.csv', index_col=0)
df=df.drop(df.columns[0], axis=1)
df.columns=df1.columns
df_new=pd.concat([df1,df.tail(100)])
print(df_new)

wsize=96 # sample window (days)

df_new.index = pd.DatetimeIndex(df_new.index)

alldata =[]

# Slide a 96-day window over the data one day at a time (the last window ends on the latest day)
for time in range(len(df_new)-wsize+1):
    dfspan1 = df_new[time:time+wsize]
    alldata.append({"df":dfspan1,"date":dfspan1.tail(1)})
    
from PIL import Image
import matplotlib.pyplot as plt

def min_max(x, axis=None):
    # Scale x to the [0, 1] range (local names avoid shadowing the built-ins min/max)
    x_min = x.min(axis=axis, keepdims=True)
    x_max = x.max(axis=axis, keepdims=True)
    return (x-x_min)/(x_max-x_min)

def imagemake(dfspan1):
    a = np.array([x for x in dfspan1['df']['close']])
    b = np.array([x for x in dfspan1['df']['high']])
    c = np.array([x for x in dfspan1['df']['low']])
    
    m = np.outer(a,a).astype(np.float32)
    n = np.outer(b-a,b-a).astype(np.float32)
    o = np.outer(a-c,a-c).astype(np.float32)
    
    m1 = min_max(m[0:64,0:64])
    m2 = min_max(m[32:96,32:96])
    n1 = min_max(n[0:64,0:64])
    n2 = min_max(n[32:96,32:96])
    o1 = min_max(o[0:64,0:64])
    o2 = min_max(o[32:96,32:96])
    
    #te1 = np.stack([m1,n1,o1])
    te2 = np.stack([m2,n2,o2])

    te3=np.concatenate([te2,te2], 2)  # give real = fake = te2
    
    te4=np.kron(te3, np.ones((4,4)))
    tmp = torch.from_numpy(te4).clone()
    return  transforms.ToPILImage(mode='RGB')(tmp)   


for item in alldata:
    img=imagemake(item)
    fn = item['date'].index.astype(str).values.tolist()
    fname = "datasets/facades/test/" + fn[0] + "sk.png"
    img.save(fname)

Post-processing

Run the test → draw the predicted chart

batch.sh
python test.py --dataroot ./datasets/facades --name facades_pix2pix --model pix2pix --direction AtoB
main.py
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pdb

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'


path = os.getcwd()

new_dir_path = "results/facades_pix2pix/test_latest/images"
                
img=[]
image={}
for imageName in sorted(os.listdir(new_dir_path)):  # sorted so each sample's three images are adjacent
    inputPath = os.path.join(path, new_dir_path,imageName)
    if "fake_B" in  imageName : image['fakeB']=inputPath
    if "real_A" in  imageName: image['realA']=inputPath
    if "real_B" in  imageName: image['realB']=inputPath
    if len(image)==3:
        img.append(image)
        image={}
import re 
import datetime

def getprice(img,transform):
    match = re.search(r'\d{4}-\d{2}-\d{2}', img['fakeB'])
    date = datetime.datetime.strptime(match.group(), '%Y-%m-%d').date()
    fake = transform(Image.open(img['fakeB'])) 
    real = transform(Image.open(img['realA']))
    fake=fake.numpy()
    real=real.numpy()
            
    plt.figure(figsize=(10,4))
    plt.subplot(1,2,1)
    plt.title(date) 
    
    real1 = np.sqrt(np.diag(real[0,:,:]))  # diagonal elements
    real1_avg = np.mean(real1[128:256])
    print(real1_avg)
    real2= real1-real1_avg
    plt.plot(real2,label="real") 

    fake1 = np.sqrt(np.diag(fake[0,:,:]))  # diagonal elements
    fake1_avg = np.mean(fake1[0:128])
    print(fake1_avg)
    fake2= fake1-fake1_avg
    fake3=np.append(real2[0:128],fake2)
    plt.plot(fake3,label="fake")
    
    plt.legend() 
    plt.tight_layout()
    plt.show()
transform = transforms.PILToTensor()
for item in img:
    price = getprice(item,transform)
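
The stitching step in getprice can be isolated as a small sketch. The known curve and the predicted curve share an overlap (the last 128 samples of the real diagonal and the first 128 of the fake one), so each is shifted to have zero mean over that overlap before they are joined. The helper name stitch and the synthetic data are assumptions made for illustration.

```python
import numpy as np


def stitch(real, fake, overlap):
    """Join a known series and a predicted series that share `overlap` samples.

    The last `overlap` samples of `real` are assumed to cover the same days
    as the first `overlap` samples of `fake`. Each series is shifted so its
    mean over the overlap is zero, which removes any constant offset.
    """
    real_c = real - real[-overlap:].mean()
    fake_c = fake - fake[:overlap].mean()
    return np.append(real_c[:-overlap], fake_c)


# Synthetic check: cut one smooth curve into two overlapping pieces and
# give each piece a different constant offset.
full = np.sin(np.linspace(0.0, 3.0, 384))
real = full[0:256] + 5.0
fake = full[128:384] - 3.0

joined = stitch(real, fake, overlap=128)
print(joined.shape)
print(np.allclose(joined, full - full[128:256].mean()))
```

Mean alignment only corrects a constant offset. If the generated image also differs in scale, the join can still show a step or a change in amplitude.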

(Figure: image.png, plot output of the script above)
