bitflyer FXの価格予測モデルを作ってみました。
現在までの価格推移を学習して60分後の価格を予測します。
精度はこれから追及していきますが、ひとまず動くようになったので公開します。
python 3.5.6
Platform: Windows-10-10.0.17134-SP0
Chainer: 5.0.0
NumPy: 1.15.2
CuPy: Not Available
iDeep: Not Available
参考にさせて頂いたサイト様
bitflyer_deeplearning.py
#-- DEEP LEARNING TEST 1 --
import requests
import matplotlib.pyplot as plt
import time
import numpy as np
import json
from datetime import datetime
from sklearn import preprocessing
import chainer
import chainer.functions as F
import chainer.links as L
from chainer import cuda, Function, gradient_check, report, training, utils, Variable
from chainer import datasets, iterators, optimizers, serializers
from chainer import Link, Chain, ChainList
from chainer.datasets import tuple_dataset
from chainer import report,training
from chainer.training import extensions
#-------------設定項目------------------------
chart_API = "cryptowatch" # 価格の取得元
epoch = 50 # エポック数
#-------------設定項目------------------------
#-------------価格APIの関数--------------
# BTCFXのチャート価格をAPIで取得する関数(初回用)
def get_price(min, before=0, after=0):
# Cryptowatchを使用する場合
if chart_API == "cryptowatch":
price = []
params = {"periods" : min }
if before != 0:
params["before"] = before
if after != 0:
params["after"] = after
response = requests.get("https://api.cryptowat.ch/markets/bitflyer/btcfxjpy/ohlc",params)
data = response.json()
if data["result"][str(min)] is not None:
for i in data["result"][str(min)]:
if i[1] != 0 and i[2] != 0 and i[3] != 0 and i[4] != 0 and i[5] != 0:
price.append({ "close_time" : i[0],
"close_time_dt" : datetime.fromtimestamp(i[0]).strftime('%Y/%m/%d %H:%M'),
"open_price" : i[1],
"high_price" : i[2],
"low_price" : i[3],
"close_price": i[4],
"volume": i[5] })
return price
else:
print_log("データが存在しません")
return None
#-------------その他の補助関数--------------
# ログの出力
def print_log( text ):
print( text )
#------------データ処理--------------
# 必要なローソク足を準備
print_log("---データの準備中---")
price = get_price(3600,after=1451606400)
last_data = price[-1*6000:]
print_log("---実行開始---")
close_float_arr = []
for i in last_data:
# 個別にバラして読み込み
close_float_arr.append(float(i['close_price']))
# 正規化
close_float_arr = preprocessing.minmax_scale(close_float_arr)
# 入力変数xと出力変数tの作成
x, t = [], []
N = len(close_float_arr)
# 終値の配列を加工
for n in range(30, N):
_x = close_float_arr[n-30: n] # 入力変数にはM本分の終値を順に積み込む
_t = close_float_arr[n] # 出力変数には入力変数の一番新しい足より60分先の終値を積み込む
# リスト(x, t)に追加
x.append(_x)
t.append(_t)
# np.arrayに変換する
x = np.array(x,dtype='float32')
t = np.array(t,dtype='float32').reshape(len(t), 1)
# # 訓練用、検証用に分割
N_train = int(N * 0.3)
x_train, x_test = x[:N_train], x[N_train:]
t_train, t_test = t[:N_train], t[N_train:]
#------------Chainerの記述--------------
#------------モデルを定義--------------
class LSTM(Chain):
def __init__(self, n_units, n_output):
super().__init__()
with self.init_scope():
self.l1 = L.LSTM(None, n_units) # LSTM層
self.l2 = L.Linear(None, n_output)
# LSTM内のステートをリセット
def reset_state(self):
self.l1.reset_state()
# 損失関数
def __call__(self, x, y, train=True):
z = self.predict(x, train)
loss = F.mean_squared_error(z, y)
if train:
report({'loss': loss}, self)
return loss
# 順伝播
def predict(self, x, train=False):
# ドロップアウト
if train:
h1 = F.dropout(self.l1(x), ratio=0.05)
else:
h1 = self.l1(x)
h2 = self.l2(h1)
return h2
class LSTMUpdater(training.StandardUpdater):
def __init__(self, data_iter, optimizer, device=None):
super(LSTMUpdater, self).__init__(data_iter, optimizer, device=None)
self.device = device
def update_core(self):
data_iter = self.get_iterator("main")
optimizer = self.get_optimizer("main")
batch = data_iter.__next__()
x_batch, y_batch = chainer.dataset.concat_examples(batch, self.device)
# ステートをリセット
optimizer.target.reset_state()
# その他は時系列系の更新と同じ
optimizer.target.cleargrads()
loss = optimizer.target(x_batch, y_batch)
loss.backward()
# 忘却ゲートunchain_backwardを実装
loss.unchain_backward()
optimizer.update()
#------------データの準備--------------
# chainer用のデータセットをzip化して準備
train = list(zip(x_train, t_train))
test = list(zip(x_test, t_test))
#------------モデルを宣言--------------
# モデルの宣言
model = LSTM(30, 1)
#------------Optimizerの記述--------------
# optimizerの定義
optimizer = optimizers.Adam() # 最適化アルゴリズムにはAdamを採用
optimizer.setup(model)
#------------iteratorの定義--------------
train_iter = chainer.iterators.SerialIterator(train, 20)
test_iter = chainer.iterators.SerialIterator(test, 20, repeat=False, shuffle=False)
#------------updaterの定義------------
updater = LSTMUpdater(train_iter, optimizer)
#------------trainerの定義------------
trainer = training.Trainer(updater, (epoch, 'epoch'), out='result')
# trainerの拡張機能
trainer.extend(extensions.Evaluator(test_iter, model)) # 評価データで評価
trainer.extend(extensions.LogReport(trigger=(1, 'epoch'))) # 学習結果の途中を表示する
# 1エポックごとに、trainデータに対するlossと、testデータに対するlossを出力させる
trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'validation/main/loss', 'elapsed_time']), trigger=(1, 'epoch'))
#------------学習開始------------
trainer.run()
# 予測値の計算
model.reset_state()
y_train = model.predict(Variable(x_train)).data
# プロット
plt.plot(t_train, color='#2980b9') # 実測値は青色
plt.plot(y_train, color='#f39c12') # 予測値はオレンジ
plt.show()
# 予測値の計算
model.reset_state()
y_test = model.predict(Variable(x_test)).data
# プロット
plt.plot(t_test, color='#2980b9') # 実測値は青色
plt.plot(y_test, color='#f39c12') # 予測値はオレンジ
plt.show()