本記事について
上記のリポジトリのDQNエージェントのコードの一部を解説します。
forex-tradingのDQNエージェントは以下に沿って行動します。
1 データの読み込み
事前に保存したデータを読み込みデータの9割をトレインデータに1割をテストデータにします。
self.x = np.load("data/x.npy")
y = np.load("data/target.npy")
self.low = y[:, :, 2].reshape((self.x.shape[0], -1))
self.high = y[:, :, 1].reshape((self.x.shape[0], -1))
self.y = y[:, :, 0].reshape((self.x.shape[0], -1))
self.atr = np.load("data/atr.npy").reshape((self.x.shape[0], -1)).astype(np.int32)
self.train_step = np.arange(0, int(self.x.shape[1] * 0.9))
self.test_step = np.arange(self.train_step[-1], int(self.x.shape[1]))
2 Modelの作成
TPUを使う場合ストラテジーを作成し、GPUを使う場合mixed_precisionを使い、CPUの場合何もしない。
if self.use_device == "tpu":
try:
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
self.strategy = tf.distribute.TPUStrategy(resolver)
except:
self.use_device = "cpu"
elif self.use_device == "gpu":
from tensorflow.keras.mixed_precision import experimental as mixed_precision
policy = mixed_precision.Policy('mixed_float16')
mixed_precision.set_policy(policy)
フーバー損失関数を作成する
def loss_function(self):
def loss(q_backup, q):
k = 2
error = q_backup - q
loss = tf.where(tf.abs(error) <= k, error ** 2 * 0.5, 0.5 * k ** 2 + k * (tf.abs(error) - k))
loss = tf.reduce_mean(loss)
return loss
return loss
モデルを作成し、要約を表示します。
def _build_model(self, lr):
loss = self.loss_function()
if self.dueling:
dqn_network.dueling = True
model = dqn_network.network.build_model(self.model_name, self.x.shape[-2:], self.action_size)
model.compile(
tf.keras.optimizers.Adam(lr, clipnorm=1.), loss=loss, steps_per_execution=100
)
return model
def build_model(self, lr=1e-4):
if self.use_device == "tpu":
with self.strategy.scope():
self.model = self._build_model(lr)
self.target_model = tf.keras.models.clone_model(self.model)
self.target_model.set_weights(self.model.get_weights())
else:
self.model = self._build_model(lr)
self.target_model = tf.keras.models.clone_model(self.model)
self.target_model.set_weights(self.model.get_weights())
self.model.summary()
3 トレーニングデータを生成する
このコードでは探査を不要にするために全てのアクションに対応する報酬を割り振ります。そして、pip_scaleを1以上にすることでオーバーフィットの度合いを制限することができます。
def train_data(self):
states, returns, new_states, old_actions = [], [], [], []
h, h_ = 0, self.train_step[-1]
n = self.n
s = self.s
for s in range(self.x.shape[0]):
df = self.x[s, h:h_ - n].copy()
trend = self.y[s, h:h_]
buy = np.array([trend[i + n] - trend[i] for i in range(len(trend) - n)]).reshape((-1,))
scale = np.quantile(abs(buy), 0.99) / 1
buy = np.clip(buy / scale, -1, 1)
spread = np.quantile(self.atr[s], 0.25) / scale
spread = np.clip(spread, 0.02, None) * self.pip_scale
spread = np.round(spread, 2)
buy *= self.pip_scale
sell = -buy
pip = np.zeros((len(trend) - n, self.action_size, self.action_size))
b = 0 if self.action_type == 2 else 1
s = 0 if self.action_type == 1 else 1
pip[:, 0, 0] = buy * b
pip[:, 0, 1] = (sell - spread) * s
pip[:, 1, 0] = (buy - spread) * b
pip[:, 1, 1] = sell * s
if self.action_size == 3:
pip[:, 2, 0] = buy - spread
pip[:, 2, 1] = sell - spread
states.append(df[:-self.n])
returns.append(pip[:-self.n])
new_states.append(np.roll(df, -self.n, axis=0)[:-self.n])
concat = np.concatenate
self.states, self.returns, self.new_states = \
np.array(states), np.array(returns), np.array(new_states)
self.returns = np.round(self.returns, 2).astype(np.float32)
3 トレーニングの実行
ターゲットQ値の計算をし、配列にnanが含まれていないか確認します。
def target_q(self, returns, target_q, target_a):
if self.train_loss:
target_a = np.argmax(target_a, -1)
rr = range(len(returns))
returns[:, 0, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]]
returns[:, 0, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]]
returns[:, 1, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]]
returns[:, 1, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]]
if self.action_size == 3:
returns[:, 0, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]]
returns[:, 1, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]]
returns[:, 2, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]]
returns[:, 2, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]]
returns[:, 2, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]]
assert np.mean(np.isnan(returns) == False) == 1
return returns
def _train(self, epoch=100, s=0, batch_size=2056):
assert isinstance(s, int)
ind = self.ind # ランダムなインデックス
# データをシャッフルする。
states, new_states, returns = \
self.states[s][ind].copy(), self.new_states[s][ind].copy(), self.returns[s][ind].copy()
# 最初のトレーニングではgammaをゼロにする。
if self.train_loss:
target_q = self.target_model.predict(new_states, 102800)
else:
target_q = np.zeros((len(returns), self.action_size, self.action_size), np.float32)
for _ in range(epoch):
# 報酬値を初期化する
returns = self.returns[s][ind].copy()
noise = np.random.randn(*states.shape) * 0.1 # ノイズを追加する
target_a = self.model.predict(new_states + noise, 102800)
returns = self.target_q(returns, target_q, target_a)
h = self.model.fit(states + noise, returns, batch_size, validation_split=0.2)
self.train_loss.extend(h.history["loss"])
self.val_loss.extend(h.history["val_loss"])
# train_lossの要素数が200以上の時にのみ評価を実行する
if len(self.train_loss) >= 200:
pips, profits, _, _, _ = self.trade(s, self.test_step[0] - 11513, self.test_step[0], train=True)
self.train_rewards.append(np.sum(pips))
pips, profits, _, _, _ = self.trade(s, self.test_step[0], self.test_step[0] + 960 * 8, train=True)
self.test_rewards.append(np.sum(pips))
# 取引数を重視したパフォーマンス指標の作成
acc = np.mean(pips > 0)
len_pip = len(pips[pips > 0]) * np.clip(acc, 0, 0.75) * 2
total_win = np.sum(pips[pips > 0])
total_lose = np.sum(pips[pips < 0])
ev = \
(np.mean(pips[pips > 0]) * acc + np.mean(pips[pips < 0]) * (1 - acc)) / abs(np.mean(pips[pips < 0]))
ev = np.clip(ev, 0, 0.75) / 0.75
rr = np.clip(total_win / abs(total_lose), 0, 2.5) / 2.5
acc /= 0.7
self.max_profit /= self.account_size
self.max_pip = (rr + ev + acc) * len_pip
self.max_pip = 0 if np.isnan(self.max_pip) else self.max_pip
self.test_pip.append(self.max_pip)
self.test_profit.append(self.max_profit)
if self.max_pips <= self.max_pip:
self.best_w = self.model.get_weights()
self.max_profits = self.max_profit
self.max_profits = np.maximum(self.max_profit, self.max_profits)
self.max_pips = np.maximum(self.max_pip, self.max_pips)
plt.figure(figsize=(20, 5), dpi=100)
plt.subplot(1, 2, 1)
plt.plot(self.train_rewards)
plt.subplot(1, 2, 2)
plt.plot(self.test_rewards)
plt.show()
print(f"profits = {self.max_profit}, max profits = {self.max_profits}\n"
f"pips = {self.max_pip}, max pip = {self.max_pips}")
40回のトレーニングごとにターゲットウエイトをアップデートする。これを15回繰り返す。
def train(self, epoch1=40, epoch2=15, batch_size=2056, save=True, agent_name="dqn", risk=.04):
self.risk = risk
for _ in range(epoch2):
clear_output()
plt.figure(figsize=(10, 5))
plt.plot(self.train_loss)
plt.plot(self.val_loss)
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()
self._train(epoch1, self.s, batch_size)
self.target_model.set_weights(self.model.get_weights())