はじめに
このアルゴリズムの最大の強みは、ローソクチャートを観測する予測と指値注文板を観測する執行戦略を分離し、強化学習によって執行戦略を強化させたところです。これは、売買決定から注文メッセージの送信、注文決定の動きに開きがあるからです。
論文の本文は以下のリンクから読めます。
データセットは一般公開されているFI-2010データセットを使用します。また、予想モデルは以下を使用します。
実装を行なった全編は以下より見ることができます。
※環境の構築等自信がないので、修正点など是非ご指摘頂ければ幸いです。
強化学習アルゴリズム
Ape-Xアルゴリズム
Ape-Xは、代表的なoff-poicyであり、以下のモデルを加味したモデルです。
- Double Q-learning(Double DQN)
- 優先度付き経験再生(Priority Experience Reply DQN)
- Dueling Network(Dueling Network DQN)
- 並列分散処理(GORILA)
- Multi-Step learning
- 固定化されたε-greedy法
今回のアルゴリズム
- Double Q-learning(Double DQN)
- 優先度付き経験再生(Priority Experience Reply DQN)
- Dueling Network(Dueling Network DQN)
- 並列分散処理(GORILA)
Double Q-learning(Double DQN)
Q-Learningモデル
\tilde{Q}(S_t, a) = R_{t+1}+\gamma \max _{a} Q\left(S_{t+1}, a; \boldsymbol{\theta}_{t}\right)
Double-QLearningでは以下の二つの式を用いる。
学習過程
\tilde{Q}(S_t, a) = R_{t+1}+\gamma Q\left(S_{t+1}, \operatorname{argmax}_{a} Q\left(S_{t+1}, a ; \boldsymbol{\theta}_{t}\right) ; \boldsymbol{\theta}_{t}'\right)
行動選択過程
a = \operatorname{argmax}_{a} Q\left(S_{t+1}, (a + a')\right)
Double DQNは二つのネットワークを用いることで、Q-Learningモデルの過剰評価を軽減しています。
target_networkは一定時間のモデルの更新を経た後、更新します。
優先度付き経験再生(Priority Experience Reply DQN)
Experience Replayではすべての経験に対してランダムサンプリングを行うが、Prioritized Experience Replayでは学習する余地の大きくTD誤差が高い経験に対して集中的にサンプリングする。
δ_{t} = R_{t+1} + γ\:max_{a}Q_{target}(S_{t+1}, a')- Q(S_{t},a_{t})
上式で得られたδを下式を用いて、並べ替え、経験を抽出する。
P(i) = \frac{{p_{i}}^{\alpha}}{\Sigma_{k}{p_{k}}^{\alpha}}
Dueling Network(Dueling Network DQN)
Dueling Networkが他のモデルとは、大きく違うのは、学習の構造はDQNのままで、モデルを変化させることで、大きな成果を得た点である。
他のモデルのCNNや全結合層で構成されたQ-Networkでは、ノイズに影響されやすく、不安定。
そこで、一旦アドバンテージ(A)と状態価値関数(V)に分岐し、足し合わせることで、行動価値関数(Q)を求めることで、安定化を図った。
Q^{\pi}(s, a) = A^{\pi}(s, a) + V^{\pi}(s)
並列分散処理
論文上では、シングルエージェントシミュレーションで行われていますが、今回の実装では、リソースに制限があるため、並列分散処理を行いました。
環境
状態
- 現在ポジション株数/最大保有株数
- 既存買注文の反対気配との乖離率 (逆数)
- 既存売注文の反対気配との乖離率 (逆数)
- Bid-Ask スプレッド
- 短期(5 秒)リターン予測値
- 長期(60 秒)リターン予測値
5,6に関しては、先述の通り別の予測モデルの出力を用います。
行動
- 何もしない
- 最良気配への新規買注文送出/既存買注文訂正
- 最良気配への新規売注文送出/既存売注文訂正
- 反対気配への新規買注文送出/既存買注文訂正
- 反対気配への新規売注文送出/既存売注文訂正
- 既存買注文の取消
- 既存売注文の取消
報酬
r(t) = ∆PnL(t + 1) − α(∆Pos(t + 1))^2 − βFo(t)\\
∆PnL(t + 1) : (累積) 損益額の変化幅(円)\\
∆Pos(t + 1) : ポジション金額の変化幅 (円)\\
Fo(t) : 1 (時刻 t でキャンセルが発生した場合に追加を行う。)\\
: 0 (その他)
実装
class Environment:
def __init__(self, df, initial_money=100000):
self.df = df
self.df_total_steps = len(self.df)-1
self.initial_money = initial_money
self.cash_in_hand = None
# hold, good_buy, good_sell, bad_buy, bad_sell, buy_cancel, sell_cancel
self.action_space = np.array([0, 1, 2, 3, 4, 5, 6])
self.now_step = None
self.alpha = 0.001
self.beta = 0.9
self.minimum_number_of_shares = 10000 # 最低取得株数
self.is_cancel = False
self.profit_and_loss = None
self.reserve_price = None
self.reserve_type = None
self.have_a_position = None
self.have_a_position_type = None
self.price_old = None
self.Money_old = None
self.reset()
def reset(self):
self.now_step = 0
self.now_price = (self.df['PRICE_ASK_0'][self.now_step] + self.df['PRICE_BID_0'][self.now_step]) / 2
self.cash_in_hand = self.initial_money / self.minimum_number_of_shares
self.hold_a_position = 0
self.end_step = self.df_total_steps
self.is_cancel = False
self.profit_and_loss = []
self.reserve_price = 0
self.reserve_type = 'none'
self.have_a_position = 0
self.have_a_position_type = None
self.price_old = 0
self.Money_old = 0
return self._get_now_state()
def step(self, action):
'''
状態の内4まで出力する。
1. 現在ポジション株数/最大保有株数
2. 既存買注文の反対気配との乖離率 (逆数)
3. 既存売注文の反対気配との乖離率 (逆数)
4. Bid-Ask スプレッド
5. 短期(5 秒)リターン予測値
6. 長期(60 秒)リターン予測値
最大保有株数は 100 万円相当の株数(最小取引単位の
100 株に満たない場合は 100 株)で,
ショートポジションも取り得るものとしている.
取引エージェントの行動タイミングは各銘柄のティッ ク更新時に,
前回行動から 1 秒以上経過している場合に
発注判断を行うものとする (各発注判断を 1 ステップと する).
ある時点において市場に出すことのできる注 文は買い,
売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
各時点で選ぶことのできない行動は選択肢から排除している.
報酬
r(t) = ∆PnL(t + 1) − α(∆Pos(t + 1))^2 − βFo(t)
∆PnL(t + 1) : (累積) 損益額の変化幅(円)
∆Pos(t + 1) : ポジション金額の変化幅 (円)
Fo(t) : 1 (時刻 t でキャンセルが発生した場合に追加を行う。)
: 0 (その他)
損益額のみを報酬とする場合, ポジションが必要以上
に増加する傾向にあり [3], また不必要な注文の送信,
取消を繰り返してしまう. そこで過度なポジショニングと
注文取消を抑制するために調整項を加えている.
'''
before_profit_and_loss = copy.deepcopy(self._get_revenue())
before_hold_a_position = copy.deepcopy(self.hold_a_position)
before_now_price = copy.deepcopy(self._get_now_price())
self.now_step += 1
self.now_price = self._get_now_price()
self._judge_the_execution() # 約定しているかを調べる。
self._order(action) # 注文を出す。
state = self._get_now_state()
reward = self._get_reward(before_profit_and_loss, before_hold_a_position, before_now_price)
done = (self.end_step == self.now_step)
info = {'now_step': self.now_step}
return state, reward, done, info
def _get_now_state(self):
state = np.empty(6)
state[0] = self.hold_a_position
state[1] = self.df['PRICE_ASK_0'][self.now_step] / self.df['PRICE_ASK_9'][self.now_step]
state[2] = self.df['PRICE_BID_0'][self.now_step] / self.df['PRICE_BID_9'][self.now_step]
state[3] = self.df['PRICE_ASK_0'][self.now_step] - self.df['PRICE_BID_0'][self.now_step]
state[4] = self.df['SHORT_PREDICT'][self.now_step]
state[5] = self.df['LONG_PREDICT'][self.now_step]
return state
def _get_reward(self, before_profit_and_loss, before_hold_a_position, before_now_price):
pnl = self._get_revenue() - before_profit_and_loss
# pnl = self._get_revenue()
pos = self.hold_a_position * self.now_price - before_hold_a_position * before_now_price
f0 = 1 if self.is_cancel else 0
reward = pnl - self.alpha * (pos ** 2) - self.beta * f0
return reward
def _order(self, action):
'''
取引エージェントの行動タイミングは各銘柄のティック更新時に,
前回行動から 1 秒以上経過している場合に
発注判断を行うものとする (各発注判断を 1 ステップとする).
ある時点において市場に出すことのできる注文は買い,
売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
各時点で選ぶことのできない行動は選択肢から排除している.
'''
self.is_cancel = False
if self.action_space[0] == action: # 何もしない
pass
elif self.action_space[1] == action: # 最良気配への新規買注文送出/既存買注文訂正
price = self.df['PRICE_ASK_0'][self.now_step]
order_type = 'buy'
elif self.action_space[2] == action: # 最良気配への新規売注文送出/既存売注文訂正
price = self.df['PRICE_BID_0'][self.now_step]
order_type = 'sell'
elif self.action_space[3] == action: # 反対気配への新規買注文送出/既存買注文訂正
price = self.df['PRICE_ASK_9'][self.now_step]
order_type = 'buy'
elif self.action_space[4] == action: # 反対気配への新規売注文送出/既存売注文訂正
price = self.df['PRICE_BID_9'][self.now_step]
order_type = 'sell'
elif self.action_space[5] == action: # 既存買注文の取消
price = 0
order_type = 'buy'
self.is_cancel = True
else: # 既存売注文の取消
price = 0
order_type = 'sell'
self.is_cancel = True
if self.action_space[0] != action:
self._reservation_change(price, order_type)
def _reservation_change(self, price, order_type):
if not self.is_cancel:
self.reserve_price = price
self.reserve_type = order_type
else:
self.reserve_price = 0
self.reserve_type = 'none'
def _judge_the_execution(self):
'''
約定しているかを調べる。
取引エージェントの行動タイミングは各銘柄のティック更新時に,
前回行動から 1 秒以上経過している場合に
発注判断を行うものとする (各発注判断を 1 ステップとする).
ある時点において市場に出すことのできる注文は買い,
売りのそれぞれに対して1注文のみとし,最大ポジションを超える新規注文は行わない.
各時点で選ぶことのできない行動は選択肢から排除している.
self.minimum_number_of_shares = 10000 # 最低取得株数
'''
if self.have_a_position_type == 'buy':
'''買いのポジションを持っている場合'''
if self.reserve_price <= self.df['PRICE_ASK_0'][self.now_step] and self.reserve_type == 'sell':
self.cash_in_hand += self._bear_position_calc()
self._reserve_and_position_reset(True)
self.price_old = 0
self.Money_old = 0
elif self.have_a_position_type == 'sell':
'''売りのポジションを持っている場合'''
if self.reserve_price >= self.df['PRICE_BID_0'][self.now_step] and self.reserve_type == 'buy':
self.cash_in_hand += self.reserve_price * self.hold_a_position
self._reserve_and_position_reset(True)
else:
'''ポジションを持っていない場合'''
if self.reserve_type == 'buy':
if self.reserve_price >= self.df['PRICE_BID_0'][self.now_step] and self.cash_in_hand >= 0:
buy_flag = True
while buy_flag:
if self.cash_in_hand > self.reserve_price:
self.hold_a_position += 1
self.cash_in_hand -= self.now_price
else:
buy_flag = False
self.have_a_position_type = 'buy'
self._reserve_and_position_reset()
if self.reserve_type == 'sell':
if self.reserve_price <= self.df['PRICE_ASK_0'][self.now_step] and self.cash_in_hand >= 0:
sell_flag = True
self.price_old = self.reserve_price
self.Money_old = self.cash_in_hand
while sell_flag:
if self.cash_in_hand > self.reserve_price:
self.hold_a_position += 1
self.cash_in_hand -= self.now_price
else:
sell_flag = False
self.have_a_position_type = 'sell'
self._reserve_and_position_reset()
def _reserve_and_position_reset(self, position_reset = False):
self.reserve_price = 0
self.reserve_type == 'none'
if position_reset:
self.hold_a_position = 0
self.have_a_position_type = 'none'
def _get_revenue(self):
if self.have_a_position_type == 'sell':
return self._bear_position_calc() + self.cash_in_hand
else:
return self.hold_a_position * self.now_price + self.cash_in_hand
def _bear_position_calc(self):
return ((self.price_old - self.now_price) * self.hold_a_position
+ self.Money_old)
def _get_now_price(self):
return (self.df['PRICE_ASK_0'][self.now_step] + self.df['PRICE_BID_0'][self.now_step]) / 2
ニューラルネットワーク
今回は、TensorFlow2.xのSubclassing APIを採用しました。
class NeuralNetwork(tf.keras.Model):
def __init__(self, *args, **kwargs):
super(NeuralNetwork, self).__init__(*args, **kwargs)
self.action_space = 7
self.input_layer = kl.Dense(20, activation="tanh", kernel_initializer="he_normal")
self.dense1 = kl.Dense(10, activation="tanh", kernel_initializer="he_normal")
self.value = kl.Dense(1, kernel_initializer="he_normal")
self.dense2 = kl.Dense(10, activation="tanh", kernel_initializer="he_normal")
self.advantages = kl.Dense(self.action_space, kernel_initializer="he_normal")
self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
@tf.function
def call(self, x, training=None):
x = self.input_layer(x)
x1 = self.dense1(x)
value = self.value(x1)
x2 = self.dense2(x)
advantages = self.advantages(x2)
q = value + advantages - tf.reduce_mean(advantages, axis=1, keepdims=True)
return q
def save_model(self, name):
self.save_weights(name)
def load_model(self, name):
self.load_weights(name)
def update_model(self, value):
self.set_weights(value)
def get_model(self):
return self.get_weights()