58
72

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[論文解説] 強化学習による高頻度取引戦略の構築

Last updated at Posted at 2022-09-08

はじめに

このアルゴリズムの最大の強みは、ローソクチャートを観測する予測と指値注文板を観測する執行戦略を分離し、強化学習によって執行戦略を強化させたところです。これは、売買決定から注文メッセージの送信、注文決定の動きに開きがあるからです。

論文の本文は以下のリンクから読めます。

データセットは一般公開されているFI-2010データセットを使用します。また、予想モデルは以下を使用します。

実装を行なった全編は以下より見ることができます。

※環境の構築等自信がないので、修正点など是非ご指摘頂ければ幸いです。

強化学習アルゴリズム

Ape-Xアルゴリズム

Ape-Xは、代表的なoff-poicyであり、以下のモデルを加味したモデルです。

  • Double Q-learning(Double DQN)
  • 優先度付き経験再生(Priority Experience Reply DQN)
  • Dueling Network(Dueling Network DQN)
  • 並列分散処理(GORILA)
  • Multi-Step learning
  • 固定化されたε-greedy法

今回のアルゴリズム

  • Double Q-learning(Double DQN)
  • 優先度付き経験再生(Priority Experience Reply DQN)
  • Dueling Network(Dueling Network DQN)
  • 並列分散処理(GORILA)

apex_image.png

Double Q-learning(Double DQN)

Q-Learningモデル

\tilde{Q}(S_t, a) = R_{t+1}+\gamma \max _{a} Q\left(S_{t+1}, a; \boldsymbol{\theta}_{t}\right)

Double-QLearningでは以下の二つの式を用いる。

学習過程

\tilde{Q}(S_t, a) = R_{t+1}+\gamma Q\left(S_{t+1}, \operatorname{argmax}_{a} Q\left(S_{t+1}, a ; \boldsymbol{\theta}_{t}\right) ; \boldsymbol{\theta}_{t}'\right)

行動選択過程

 a = \operatorname{argmax}_{a}  Q\left(S_{t+1}, (a + a')\right)

Double DQNは二つのネットワークを用いることで、Q-Learningモデルの過剰評価を軽減しています。
target_networkは一定時間のモデルの更新を経た後、更新します。

優先度付き経験再生(Priority Experience Reply DQN)

Experience Replayではすべての経験に対してランダムサンプリングを行うが、Prioritized Experience Replayでは学習する余地の大きくTD誤差が高い経験に対して集中的にサンプリングする。

δ_{t} = R_{t+1} + γ\:max_{a}Q_{target}(S_{t+1}, a')- Q(S_{t},a_{t})

上式で得られたδを下式を用いて、並べ替え、経験を抽出する。

P(i) = \frac{{p_{i}}^{\alpha}}{\Sigma_{k}{p_{k}}^{\alpha}}

Dueling Network(Dueling Network DQN)

Dueling Networkが他のモデルとは、大きく違うのは、学習の構造はDQNのままで、モデルを変化させることで、大きな成果を得た点である。
他のモデルのCNNや全結合層で構成されたQ-Networkでは、ノイズに影響されやすく、不安定。
そこで、一旦アドバンテージ(A)と状態価値関数(V)に分岐し、足し合わせることで、行動価値関数(Q)を求めることで、安定化を図った。

Q^{\pi}(s, a) = A^{\pi}(s, a) + V^{\pi}(s)

DuelingNetwork_image.png

並列分散処理

論文上では、シングルエージェントシミュレーションで行われていますが、今回の実装では、リソースに制限があるため、並列分散処理を行いました。

環境

状態

  1. 現在ポジション株数/最大保有株数
  2. 既存買注文の反対気配との乖離率 (逆数)
  3. 既存売注文の反対気配との乖離率 (逆数)
  4. Bid-Ask スプレッド
  5. 短期(5 秒)リターン予測値
  6. 長期(60 秒)リターン予測値

5,6に関しては、先述の通り別の予測モデルの出力を用います。

行動

  1. 何もしない
  2. 最良気配への新規買注文送出/既存買注文訂正
  3. 最良気配への新規売注文送出/既存売注文訂正
  4. 反対気配への新規買注文送出/既存買注文訂正
  5. 反対気配への新規売注文送出/既存売注文訂正
  6. 既存買注文の取消
  7. 既存売注文の取消

報酬

r(t) = ∆PnL(t + 1) − α(∆Pos(t + 1))^2 − βFo(t)\\ 
∆PnL(t + 1) : (累積) 損益額の変化幅(円)\\ 
∆Pos(t + 1) : ポジション金額の変化幅 (円)\\ 
Fo(t) : 1 (時刻 t でキャンセルが発生した場合に追加を行う。)\\ 
: 0 (その他)

実装

class Environment:
    def __init__(self, df, initial_money=100000):

        self.df = df
        self.df_total_steps  = len(self.df)-1
        self.initial_money   = initial_money
        self.cash_in_hand    = None
        # hold, good_buy, good_sell, bad_buy, bad_sell, buy_cancel, sell_cancel
        self.action_space    = np.array([0, 1, 2, 3, 4, 5, 6])
        self.now_step  = None
        self.alpha = 0.001
        self.beta = 0.9
        self.minimum_number_of_shares = 10000 # 最低取得株数

        self.is_cancel = False
        self.profit_and_loss = None
        self.reserve_price = None
        self.reserve_type = None
        self.have_a_position = None
        self.have_a_position_type = None
        self.price_old = None
        self.Money_old = None

        self.reset()

    def reset(self):
        self.now_step  = 0
        self.now_price = (self.df['PRICE_ASK_0'][self.now_step] + self.df['PRICE_BID_0'][self.now_step]) / 2
        self.cash_in_hand = self.initial_money / self.minimum_number_of_shares
        self.hold_a_position = 0
        self.end_step        = self.df_total_steps

        self.is_cancel = False
        self.profit_and_loss = []
        self.reserve_price = 0
        self.reserve_type = 'none'
        self.have_a_position = 0
        self.have_a_position_type = None

        self.price_old = 0
        self.Money_old = 0

        return self._get_now_state()

    def step(self, action):
        '''
        状態の内4まで出力する。
        1. 現在ポジション株数/最大保有株数
        2. 既存買注文の反対気配との乖離率 (逆数)
        3. 既存売注文の反対気配との乖離率 (逆数)
        4. Bid-Ask スプレッド
        5. 短期(5 秒)リターン予測値
        6. 長期(60 秒)リターン予測値

        最大保有株数は 100 万円相当の株数(最小取引単位の
        100 株に満たない場合は 100 株)で, 
        ショートポジションも取り得るものとしている.

        取引エージェントの行動タイミングは各銘柄のティッ ク更新時に, 
        前回行動から 1 秒以上経過している場合に
        発注判断を行うものとする (各発注判断を 1 ステップと する).
        ある時点において市場に出すことのできる注 文は買い,
        売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
        各時点で選ぶことのできない行動は選択肢から排除している.

        報酬
        r(t) = ∆PnL(t + 1) − α(∆Pos(t + 1))^2 − βFo(t)
        ∆PnL(t + 1) : (累積) 損益額の変化幅(円)
        ∆Pos(t + 1) : ポジション金額の変化幅 (円)
        Fo(t) : 1 (時刻 t でキャンセルが発生した場合に追加を行う。)
              : 0 (その他)
        損益額のみを報酬とする場合, ポジションが必要以上
        に増加する傾向にあり [3], また不必要な注文の送信, 
        取消を繰り返してしまう. そこで過度なポジショニングと
        注文取消を抑制するために調整項を加えている.
        '''
        before_profit_and_loss = copy.deepcopy(self._get_revenue())
        before_hold_a_position = copy.deepcopy(self.hold_a_position)
        before_now_price = copy.deepcopy(self._get_now_price())

        self.now_step += 1
        self.now_price = self._get_now_price()

        self._judge_the_execution() # 約定しているかを調べる。
        self._order(action) # 注文を出す。


        state = self._get_now_state()
        reward = self._get_reward(before_profit_and_loss, before_hold_a_position, before_now_price)
        done = (self.end_step == self.now_step)
        info = {'now_step': self.now_step}

        return state, reward, done, info

    def _get_now_state(self):
        state = np.empty(6)
        state[0] = self.hold_a_position
        state[1] = self.df['PRICE_ASK_0'][self.now_step] / self.df['PRICE_ASK_9'][self.now_step]
        state[2] = self.df['PRICE_BID_0'][self.now_step] / self.df['PRICE_BID_9'][self.now_step]
        state[3] = self.df['PRICE_ASK_0'][self.now_step] - self.df['PRICE_BID_0'][self.now_step]
        state[4] = self.df['SHORT_PREDICT'][self.now_step]
        state[5] = self.df['LONG_PREDICT'][self.now_step]
        return state

    def _get_reward(self, before_profit_and_loss, before_hold_a_position, before_now_price):
        pnl = self._get_revenue() - before_profit_and_loss
        # pnl = self._get_revenue()
        pos = self.hold_a_position * self.now_price - before_hold_a_position * before_now_price
        f0  = 1 if self.is_cancel else 0

        reward = pnl - self.alpha * (pos ** 2) - self.beta * f0
        return reward

    def _order(self, action):
        '''
        取引エージェントの行動タイミングは各銘柄のティック更新時に, 
        前回行動から 1 秒以上経過している場合に
        発注判断を行うものとする (各発注判断を 1 ステップとする).
        ある時点において市場に出すことのできる注文は買い,
        売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
        各時点で選ぶことのできない行動は選択肢から排除している.
        '''
        self.is_cancel = False

        if self.action_space[0] == action: # 何もしない
            pass
        elif self.action_space[1] == action: # 最良気配への新規買注文送出/既存買注文訂正
            price = self.df['PRICE_ASK_0'][self.now_step]
            order_type = 'buy'
        elif self.action_space[2] == action: # 最良気配への新規売注文送出/既存売注文訂正
            price = self.df['PRICE_BID_0'][self.now_step]
            order_type = 'sell'
        elif self.action_space[3] == action: # 反対気配への新規買注文送出/既存買注文訂正
            price = self.df['PRICE_ASK_9'][self.now_step]
            order_type = 'buy'
        elif self.action_space[4] == action: # 反対気配への新規売注文送出/既存売注文訂正
            price = self.df['PRICE_BID_9'][self.now_step]
            order_type = 'sell'
        elif self.action_space[5] == action: # 既存買注文の取消
            price = 0
            order_type = 'buy'
            self.is_cancel = True
        else: # 既存売注文の取消
            price = 0
            order_type = 'sell'
            self.is_cancel = True

        if self.action_space[0] != action:
            self._reservation_change(price, order_type)

    def _reservation_change(self, price, order_type):
        if not self.is_cancel:
            self.reserve_price = price
            self.reserve_type = order_type    
        else:
            self.reserve_price = 0
            self.reserve_type = 'none'

    def _judge_the_execution(self):
        '''
        約定しているかを調べる。

        取引エージェントの行動タイミングは各銘柄のティック更新時に, 
        前回行動から 1 秒以上経過している場合に
        発注判断を行うものとする (各発注判断を 1 ステップとする).
        ある時点において市場に出すことのできる注文は買い,
        売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
        各時点で選ぶことのできない行動は選択肢から排除している.

        self.minimum_number_of_shares = 10000 # 最低取得株数
        '''

        if self.have_a_position_type == 'buy':
            '''買いのポジションを持っている場合'''
            if self.reserve_price <= self.df['PRICE_ASK_0'][self.now_step] and self.reserve_type == 'sell':
                self.cash_in_hand += self._bear_position_calc()
                self._reserve_and_position_reset(True)
                self.price_old = 0
                self.Money_old = 0
        elif self.have_a_position_type == 'sell':
            '''売りのポジションを持っている場合'''
            if self.reserve_price >= self.df['PRICE_BID_0'][self.now_step] and self.reserve_type == 'buy':
                self.cash_in_hand += self.reserve_price * self.hold_a_position
                self._reserve_and_position_reset(True)
        else:
            '''ポジションを持っていない場合'''
            if self.reserve_type == 'buy':
                if self.reserve_price >= self.df['PRICE_BID_0'][self.now_step] and self.cash_in_hand >= 0:
                    buy_flag = True
                    while buy_flag:
                        if self.cash_in_hand > self.reserve_price:
                            self.hold_a_position += 1
                            self.cash_in_hand -= self.now_price
                        else:
                            buy_flag = False
                    self.have_a_position_type = 'buy'
                    self._reserve_and_position_reset()
            if self.reserve_type == 'sell':
                if self.reserve_price <= self.df['PRICE_ASK_0'][self.now_step] and self.cash_in_hand >= 0:
                    sell_flag = True
                    self.price_old = self.reserve_price
                    self.Money_old = self.cash_in_hand
                    while sell_flag:
                        if self.cash_in_hand > self.reserve_price:
                            self.hold_a_position += 1
                            self.cash_in_hand -= self.now_price
                        else:
                            sell_flag = False
                    self.have_a_position_type = 'sell'
                    self._reserve_and_position_reset()

    def _reserve_and_position_reset(self, position_reset = False):
            self.reserve_price = 0
            self.reserve_type == 'none'
            if position_reset:
                self.hold_a_position = 0
                self.have_a_position_type = 'none'

    def _get_revenue(self):
        if self.have_a_position_type == 'sell':
            return self._bear_position_calc() + self.cash_in_hand
        else:
            return self.hold_a_position * self.now_price + self.cash_in_hand

    def _bear_position_calc(self):
        return ((self.price_old - self.now_price) * self.hold_a_position
                + self.Money_old)
        
    def _get_now_price(self):
        return (self.df['PRICE_ASK_0'][self.now_step] + self.df['PRICE_BID_0'][self.now_step]) / 2

ニューラルネットワーク

今回は、TensorFlow2.xのSubclassing APIを採用しました。

DuelingNetwork.png

class NeuralNetwork(tf.keras.Model):
    def __init__(self, *args, **kwargs):
        super(NeuralNetwork, self).__init__(*args, **kwargs)

        self.action_space = 7

        self.input_layer = kl.Dense(20, activation="tanh", kernel_initializer="he_normal")
        self.dense1 = kl.Dense(10, activation="tanh", kernel_initializer="he_normal")
        self.value = kl.Dense(1, kernel_initializer="he_normal")
        self.dense2 = kl.Dense(10, activation="tanh", kernel_initializer="he_normal")
        self.advantages = kl.Dense(self.action_space, kernel_initializer="he_normal")

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    @tf.function
    def call(self, x, training=None):
        x = self.input_layer(x)
        x1 = self.dense1(x)
        value = self.value(x1)
        x2 = self.dense2(x)
        advantages = self.advantages(x2)
        q = value + advantages - tf.reduce_mean(advantages, axis=1, keepdims=True)
        return q

    def save_model(self, name):
        self.save_weights(name)

    def load_model(self, name):
        self.load_weights(name)

    def update_model(self, value):
        self.set_weights(value)

    def get_model(self):
        return self.get_weights()
58
72
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
58
72

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?