14
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

PyTorchで画像分析により為替(FX)予測 ①

Last updated at Posted at 2021-03-10

LSTMなどを使って相場の予測など作ってみたのですが、なかなか精度がでず困っていたところ以下の記事を見つけたので、同じようにしてFXの予測をしてみました。

結果として、全然精度が出なかったのですが、日経平均の予想で結構な精度が出るのに為替では出ないというのも腑に落ちないので何か根本的なミスをしている気がします。
(わかりましたら、修正します。)

当日終値までのデータを利用して、翌日の終値の上下を予想していたのですが、どうも前日終値を使って翌日以降の終値を予想するのが正しいようです。
修正したところValデータでも85%程度の精度が出ました。
他の通貨のデータ等を利用して学習データを増やすとより汎化性能を上げられそうです。

基本的に日経平均と為替相場はかなり相関しているはずですね。

為替データの取得

DataReaderを使ってYahooより為替データをDataFrameに入れます。

↓EURJPYの取得

data = pdr.DataReader('EURJPY=X','yahoo',end='2021-03-09',start='2001-01-03')
data.head()

目的関数の取得

今回は、終日終値が目標とする日の終日終値を上回っていれば、「Up」のフラグを立てることとする。

data['Up'] = 0
# future_num日後の終値の上下を予測
future_num = 1
for i in range(len(data)-1):
    if data['Close'][i] < data['Close'][i+future_num]:
        data['Up'][i] = 1

保存するファイル名の準備

保存するファイルを格納するフォルダを作成し、パス及びフォルダを設定(対象日をファイル名として保存)

data['ImageName'] =  ['./DataFolder/' + str(data.index[i].year)+ '-'  + str(data.index[i].month) + '-' + str(data.index[i].day) + '.png' for i in range(len(data))]

チャート画像ファイルの出力

mplfinanceを利用して、ファイルを出力します。
参考サイトは、mpl_financeを利用していましたが、mpl_financeは今後終了するため、mplfinanceを利用した方が良いそうです。
設定値などの元論文は、以下
https://arxiv.org/pdf/1903.12258.pdf

↓画像データのファイル名の位置を修正しました。

img_create = 1
if img_create == 1:
    seq_len = 20

    data.fillna(0)
    #data.reset_index(inplace=True)
    data['Date2'] = data.index
    data['Date2'] = data['Date2'].map(mdates.date2num)
    for i in range(0, len(data)-1):
        c = data.iloc[i:i + int(seq_len) , :]
        if len(c) == int(seq_len):
            # Date,Open,High,Low,Adj Close,Volume
            target = ['Date2','Open','High','Low','Close']
            pngfile = data['ImageName'][i + int(seq_len)-2]
            
            my_dpi = 96
            # Create my own `marketcolors` to use with the `nightclouds` style:
            mc = mpf.make_marketcolors(up='#77d879',down='#db3f3f',inherit=True)

            # Create a new style based on `nightclouds` but with my own `marketcolors`:
            s  = mpf.make_mpf_style(base_mpf_style='nightclouds',marketcolors=mc,gridstyle = '')
            
            fig = mpf.figure(style=s,figsize=(48 / my_dpi, 48 / my_dpi), dpi=my_dpi)
            ax1 = fig.add_subplot(1,1,1)

            ax1.grid(False)
            ax1.set_xticklabels([])
            ax1.set_yticklabels([])
            ax1.xaxis.set_visible(False)
            ax1.yaxis.set_visible(False)
            ax1.axis('off')

            mpf.plot(c[target], ax=ax1,volume=False,type='candle',update_width_config=dict(candle_linewidth=1))
            fig.savefig(pngfile,  pad_inches=0, transparent=False, bbox_inches="tight")

            plt.close(fig)


    print("Converting olhc to candlestik finished.")

データセット準備

PyTorchのデータセット準備

x_train, x_val, y_train, y_val = model_selection.train_test_split(data['ImageName'].values, data['Up'].values, test_size=0.1,random_state=1)

データ読み込み

X = []
Y = []
X_val = []
Y_val = []

transform = transforms.Compose([transforms.ToTensor()])

# for image_file,label in zip(x_train,y_train):
for image_file,label in zip(x_train,y_train):
    try:
        image = Image.open(image_file)
        image = image.convert('RGB')
        image = transform(image)
        #image = np.asarray(image)
        X.append(image)
        Y.append(label)
    except FileNotFoundError as e:
        print(e)
    
    
for image_file,label in zip(x_val,y_val):
    try:
        image = Image.open(image_file)
        image = image.convert('RGB')
        image = transform(image)
        X_val.append(image)
        Y_val.append(label)
    except FileNotFoundError as e:
        print(e)
class EUR_Dataset(Dataset):
    def __init__(self, image, label_list, phase=None):
        
        self.transform = transforms.Compose([transforms.ToTensor()])
        self.image = image
        self.label_list = torch.tensor(label_list,dtype=torch.long)
        self.phase = phase
        
        
    def __getitem__(self,index):
        # index番目の画像を読み込み、前処理を行う
        image = self.image[index]
        # index番目のラベルを取得する
        label = self.label_list[index]
        
        return image, label

    def __len__(self):
        return len(self.image)
train_dataset = EUR_Dataset(X,Y,phase='train')
total_samples = len(train_dataset)
print(total_samples)
val_dataset = EUR_Dataset(X_val,Y_val,phase='val')
val_samples = len(val_dataset)
print(val_samples)

モデルの読み込み(resnet18を利用)

model = models.resnet18(pretrained=False)

num_ftrs = model.fc.in_features

model.fc  = nn.Sequential(
                      nn.Linear(num_ftrs ,2))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = model.to(device)

criterion = nn.CrossEntropyLoss()


# Observe that all parameters are being optimized
optimizer_ft = optim.AdamW(model.parameters())

batchsize = 16

loss_list = []
val_loss_list = []
# DataLoaderのインスタンス作成
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batchsize, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=batchsize, shuffle=True)

# データローダーを辞書として格納する
dataloaders_dict = {
    'train' : train_dataloader,
    'val' : val_dataloader
}

学習の実施

Train_loss = []
Val_loss = []
Train_acc = []
Val_acc = []


def train_model(model, criterion, optimizer,  num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    #best_acc = 0.0
    best_loss = 1000.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
    
        for phase in ['train', 'val']:
            running_loss = 0.0
            running_corrects = 0
            
            # 学習モード、検証モードの切替
            if phase == 'train':
                model.train()
            else:
                model.eval()

            #for i,data in enumerate(trainloader):
            i = 0
            data_num = 0
            for inputs, labels in dataloaders_dict[phase]:
                i += 1
                #for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase=='train'):
                    outputs = model(inputs)
                    preds = torch.argmax(outputs, axis=1)

                    #print(preds)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                    data_num += inputs.size(0)
                    
            epoch_loss = running_loss/data_num
            epoch_acc = running_corrects.double()/data_num
                    

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))
            
            if phase == 'train':
                Train_loss.append(epoch_loss)
                Train_acc.append(epoch_acc)
            else:
                Val_loss.append(epoch_loss)
                Val_acc.append(epoch_acc)
            
            # deep copy the model
            if phase == 'val' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Loss: {:4f}'.format(best_loss))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
model = train_model(model,criterion,optimizer_ft,num_epochs=20)

学習結果

~~
学習の結果、Valデータがほとんど学習できてないので使い物にならないですね。
(ランダムValデータを取っており、Valデータの前後のデータはTrainデータに含まれています。そういう意味でももっと精度が出なければいけません。)
ユーロの他に豪ドルも試してみましたが、同じような結果です。
為替データと株価データのチャートの特徴にそんなに違いはないと思うので、もっと精度は出るはずだと思うので、何かミスをしているように思います。~~

汎化性能はまだ改善の余地がありそうですが、Valデータで80%近くの数値が出ました。
(学習データは以下の比率なので、まずまず予測はできている数値と考えて良さそうです。
up: 2410
down: 2273
)

image.png

今後、他の通貨データも利用して汎化性能を高めて、実際の取引にも利用してみようかと思います。
また、株価データにも適用して考察してみたいと思います。

精度が出るように修正できたらアップしていきたいと思います。
(他のデータでも試してみようかと思います。)

14
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?