
Explanation of FX trading code using DQN

Posted at 2022-02-24

About this article

This article explains part of the DQN agent code in the repository above.

The forex-trading DQN agent operates according to the following steps.

1 Loading the data

Load the data saved in advance and use 90% of it as training data and 10% as test data.

self.x = np.load("data/x.npy")

y = np.load("data/target.npy")
self.low = y[:, :, 2].reshape((self.x.shape[0], -1))
self.high = y[:, :, 1].reshape((self.x.shape[0], -1))
self.y = y[:, :, 0].reshape((self.x.shape[0], -1))

self.atr = np.load("data/atr.npy").reshape((self.x.shape[0], -1)).astype(np.int32)

self.train_step = np.arange(0, int(self.x.shape[1] * 0.9))
self.test_step = np.arange(self.train_step[-1], int(self.x.shape[1]))
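
As a quick sanity check on the split, here is a toy illustration (the 1000-step length is made up for the example, not taken from the saved data):

import numpy as np

n_steps = 1000  # illustrative number of time steps
train_step = np.arange(0, int(n_steps * 0.9))        # indices 0 .. 899
test_step = np.arange(train_step[-1], int(n_steps))  # indices 899 .. 999

print(len(train_step), len(test_step))  # 900 101 (the two ranges share the boundary step)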

2 Creating the model

If using a TPU, create a distribution strategy; if using a GPU, use mixed_precision; if using a CPU, do nothing.

if self.use_device == "tpu":
    try:
        resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
        tf.config.experimental_connect_to_cluster(resolver)
        # This is the TPU initialization code that has to be at the beginning.
        tf.tpu.experimental.initialize_tpu_system(resolver)
        self.strategy = tf.distribute.TPUStrategy(resolver)
    except:
        self.use_device = "cpu"
elif self.use_device == "gpu":
    from tensorflow.keras.mixed_precision import experimental as mixed_precision

    policy = mixed_precision.Policy('mixed_float16')
    mixed_precision.set_policy(policy)
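
The mixed_precision import above is the older experimental API. On TensorFlow 2.4 and later the same effect should be achievable with the non-experimental global policy; a sketch, not part of the repository:

from tensorflow.keras import mixed_precision

# float16 compute with float32 variables, applied globally (TF 2.4+)
mixed_precision.set_global_policy('mixed_float16')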

Create the Huber loss function.

def loss_function(self):
    def loss(q_backup, q):
        k = 2

        error = q_backup - q
        loss = tf.where(tf.abs(error) <= k, error ** 2 * 0.5, 0.5 * k ** 2 + k * (tf.abs(error) - k))
        loss = tf.reduce_mean(loss)

        return loss

    return loss
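
With k = 2 this is the standard Huber loss with delta = 2, so Keras' built-in loss should behave essentially the same way (shown for comparison only; the repository uses the custom function above):

import tensorflow as tf

# Built-in Huber loss with the same threshold; the reduction is a mean over the batch
huber = tf.keras.losses.Huber(delta=2.0)

y_true = tf.constant([[0.0, 3.0]])
y_pred = tf.constant([[0.5, 0.0]])
print(huber(y_true, y_pred).numpy())  # averages the element-wise Huber values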

Build the model and display its summary.

def _build_model(self, lr):
    loss = self.loss_function()

    if self.dueling:
        dqn_network.dueling = True
    model = dqn_network.network.build_model(self.model_name, self.x.shape[-2:], self.action_size)

    model.compile(
        tf.keras.optimizers.Adam(lr, clipnorm=1.), loss=loss, steps_per_execution=100
    )

    return model


def build_model(self, lr=1e-4):
    if self.use_device == "tpu":
        with self.strategy.scope():
            self.model = self._build_model(lr)
            # clone_model copies only the architecture, so the weights are copied explicitly
            self.target_model = tf.keras.models.clone_model(self.model)
            self.target_model.set_weights(self.model.get_weights())
    else:
        self.model = self._build_model(lr)
        self.target_model = tf.keras.models.clone_model(self.model)
        self.target_model.set_weights(self.model.get_weights())

    self.model.summary()

3 Generating the training data

This code assigns a reward to every possible action so that exploration is unnecessary. In addition, setting pip_scale to 1 or greater limits the degree of overfitting.

def train_data(self):
    states, returns, new_states, old_actions = [], [], [], []
    h, h_ = 0, self.train_step[-1]
    n = self.n
    s = self.s
    for s in range(self.x.shape[0]):
        df = self.x[s, h:h_ - n].copy()
        trend = self.y[s, h:h_]

        buy = np.array([trend[i + n] - trend[i] for i in range(len(trend) - n)]).reshape((-1,))
        scale = np.quantile(abs(buy), 0.99) / 1
        buy = np.clip(buy / scale, -1, 1)
        spread = np.quantile(self.atr[s], 0.25) / scale
        spread = np.clip(spread, 0.02, None) * self.pip_scale
        spread = np.round(spread, 2)

        buy *= self.pip_scale
        sell = -buy

        # Reward array indexed as pip[t, previous_action, new_action]
        pip = np.zeros((len(trend) - n, self.action_size, self.action_size))

        # Flags that disable the buy or sell leg depending on action_type
        b = 0 if self.action_type == 2 else 1
        s = 0 if self.action_type == 1 else 1

        pip[:, 0, 0] = buy * b
        pip[:, 0, 1] = (sell - spread) * s
        pip[:, 1, 0] = (buy - spread) * b
        pip[:, 1, 1] = sell * s
        if self.action_size == 3:
            pip[:, 2, 0] = buy - spread
            pip[:, 2, 1] = sell - spread

        states.append(df[:-self.n])
        returns.append(pip[:-self.n])
        new_states.append(np.roll(df, -self.n, axis=0)[:-self.n])

    concat = np.concatenate
    self.states, self.returns, self.new_states = \
        np.array(states), np.array(returns), np.array(new_states)
    self.returns = np.round(self.returns, 2).astype(np.float32)
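
Read together with target_q below, the reward array can be understood as pip[t, previous_action, new_action], with the spread charged only when the position changes. A toy two-action illustration (buy = 0, sell = 1; not code from the repository):

import numpy as np

buy = np.array([0.5, -0.3])   # scaled price change over the next n steps
sell = -buy
spread = 0.1                  # transaction cost, paid only when the position flips

pip = np.zeros((len(buy), 2, 2))   # pip[t, previous_action, new_action]
pip[:, 0, 0] = buy                 # stay in a buy: no spread
pip[:, 0, 1] = sell - spread       # flip buy -> sell: pay the spread
pip[:, 1, 0] = buy - spread        # flip sell -> buy: pay the spread
pip[:, 1, 1] = sell                # stay in a sell: no spread

print(pip[0])  # [[ 0.5 -0.6]
               #  [ 0.4 -0.5]]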

4 Running the training

Compute the target Q values and verify that the array contains no NaNs.

def target_q(self, returns, target_q, target_a):
    if self.train_loss:
        target_a = np.argmax(target_a, -1)
        rr = range(len(returns))
        returns[:, 0, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]]
        returns[:, 0, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]]
        returns[:, 1, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]]
        returns[:, 1, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]]
        if self.action_size == 3:
            returns[:, 0, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]]
            returns[:, 1, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]]
            returns[:, 2, 0] += self.gamma * target_q[rr, 0, target_a[rr, 0]]
            returns[:, 2, 1] += self.gamma * target_q[rr, 1, target_a[rr, 1]]
            returns[:, 2, 2] += self.gamma * target_q[rr, 2, target_a[rr, 2]]

    assert not np.isnan(returns).any()  # the targets must not contain NaNs

    return returns
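
For a single transition this amounts to letting the online model choose the next action and the target model evaluate it, which is a Double-DQN-style backup applied per previous position. A toy two-action sketch (not repository code):

import numpy as np

gamma = 0.99
returns  = np.array([[ 0.5, -0.6],   # immediate reward, indexed [previous_action, action]
                     [ 0.4, -0.5]])
online_q = np.array([[ 1.8,  0.9],   # online model's Q at the next state (target_a above)
                     [ 0.7,  1.2]])
target_q = np.array([[ 2.0,  1.0],   # target model's Q at the next state
                     [ 0.5,  1.5]])

next_a    = np.argmax(online_q, axis=-1)          # online model picks the next action per row
bootstrap = target_q[np.arange(2), next_a]        # target model evaluates that action
q_backup  = returns + gamma * bootstrap[None, :]  # added column-wise, as in target_q() above
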
def _train(self, epoch=100, s=0, batch_size=2056):
    assert isinstance(s, int)
    
    ind = self.ind  # random indices used to shuffle the data
    
    # Shuffle the data.
    states, new_states, returns = \
        self.states[s][ind].copy(), self.new_states[s][ind].copy(), self.returns[s][ind].copy()
    
    # On the first training run, gamma is effectively zero (no bootstrapping yet).
    if self.train_loss:
        target_q = self.target_model.predict(new_states, 102800)
    else:
        target_q = np.zeros((len(returns), self.action_size, self.action_size), np.float32)

    for _ in range(epoch):
        # Reset the reward values
        returns = self.returns[s][ind].copy()
        noise = np.random.randn(*states.shape) * 0.1  # add noise to the states

        target_a = self.model.predict(new_states + noise, 102800)
        returns = self.target_q(returns, target_q, target_a)

        h = self.model.fit(states + noise, returns, batch_size, validation_split=0.2)
        self.train_loss.extend(h.history["loss"])
        self.val_loss.extend(h.history["val_loss"])
        
        # Run the evaluation only once train_loss has 200 or more entries
        if len(self.train_loss) >= 200:
        
            pips, profits, _, _, _ = self.trade(s, self.test_step[0] - 11513, self.test_step[0], train=True)
            self.train_rewards.append(np.sum(pips))
            pips, profits, _, _, _ = self.trade(s, self.test_step[0], self.test_step[0] + 960 * 8, train=True)
            self.test_rewards.append(np.sum(pips))
            
            # Build a performance metric that emphasizes the number of trades
            acc = np.mean(pips > 0)
            len_pip = len(pips[pips > 0]) * np.clip(acc, 0, 0.75) * 2

            total_win = np.sum(pips[pips > 0])
            total_lose = np.sum(pips[pips < 0])
            ev = \
                (np.mean(pips[pips > 0]) * acc + np.mean(pips[pips < 0]) * (1 - acc)) / abs(np.mean(pips[pips < 0]))
            ev = np.clip(ev, 0, 0.75) / 0.75
            rr = np.clip(total_win / abs(total_lose), 0, 2.5) / 2.5
            acc /= 0.7

            self.max_profit /= self.account_size
            self.max_pip = (rr + ev + acc) * len_pip
            self.max_pip = 0 if np.isnan(self.max_pip) else self.max_pip

            self.test_pip.append(self.max_pip)
            self.test_profit.append(self.max_profit)

            if self.max_pips <= self.max_pip:
                self.best_w = self.model.get_weights()
                self.max_profits = self.max_profit

            self.max_profits = np.maximum(self.max_profit, self.max_profits)
            self.max_pips = np.maximum(self.max_pip, self.max_pips)

            plt.figure(figsize=(20, 5), dpi=100)
            plt.subplot(1, 2, 1)
            plt.plot(self.train_rewards)
            plt.subplot(1, 2, 2)
            plt.plot(self.test_rewards)
            plt.show()

            print(f"profits = {self.max_profit}, max profits = {self.max_profits}\n"
                  f"pips = {self.max_pip}, max pip = {self.max_pips}")

Update the target weights after every 40 training epochs, and repeat this 15 times.

def train(self, epoch1=40, epoch2=15, batch_size=2056, save=True, agent_name="dqn", risk=.04):
    self.risk = risk
    for _ in range(epoch2):
        clear_output()
        plt.figure(figsize=(10, 5))
        plt.plot(self.train_loss)
        plt.plot(self.val_loss)
        plt.title('Model loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper left')
        plt.show()
        self._train(epoch1, self.s, batch_size)
        self.target_model.set_weights(self.model.get_weights())
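
Putting the steps together, a call sequence would look roughly like the sketch below. The agent class and constructor are not shown in this article, so they are left as hypothetical comments; only the method names come from the code above.

# Hypothetical usage sketch -- class name and constructor arguments are illustrative
# agent = Agent(...)                                  # hypothetical constructor
# agent.build_model(lr=1e-4)                          # 2. build the online and target models
# agent.train_data()                                  # 3. generate states / rewards / next states
# agent.train(epoch1=40, epoch2=15, batch_size=2056)  # 4. train, syncing target weights each cycle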