1. なぜ実装したか
その一環として、以前より株式や為替市場の予測モデル関する論文を読んだり実装をしてきました。正確な予測モデルが実装できれば、EAなどに組み込むことでユーザの金銭的利益につながると考えます。スタンダードなモデルとしては LSTM などの時系列モデルや LightGBM 等の勾配ブースティング木などが使用されることが多いです。これらのモデルを利用した論文では、実際に高い精度を叩き出しているものもあります。
しかし、これらのStock Market、Forex系統の論文で以下のように異常に高い予測精度を示しているモデルには注意しなければなりません。
lag-model という現象が発生している危険性があるためです。
これは一般に lag-model (遅延モデル) と呼ばれ、予測値が特定ステップ数前のデータを強く参照してしまい、実測値の推移が予測値の推移を追いかけるようなモデルとなってしまう現象です。
他にもこの現象の特徴として、RMSE や 決定係数 等の評価指標が異常に高い傾向があると確認されています。
2. STDCとは
STDCとは Single Threshold Directional Change の略称です。この手法を用いると、株式や為替の時系列データを上昇・下降トレンドに区分することが出来ます。
STDCでは DC(Directional Change)イベント と OS(OverShoot)イベント の2つに区分されます。
ユーザが任意に設定した Threshold(閾値) を基に高値と底値から一定の変動があった場合トレンドが転換していると定義しています。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
class DCEventDetector:
def __init__(self, dataframe, delta):
self.eventsDC = []
self.eventsOS = []
self.delta = delta
self.indexDC = [0, 0]
self.indexOS = [0, 0]
self.dataframe = dataframe
def generateDC(self):
countDC = 0
countOS = 0
delta = self.delta
df = self.dataframe
for index in range(len(df)):
price = df.iloc[index]
if (index == 0): #初期設定
type = "Upturn"
high = df.iloc[index]
low = df.iloc[index]
isExistOS = False
if (type == "Upturn"):
if (price <= (high * (1 - delta))): #上昇トレンドから下降トレンドに転換
if (self.indexOS[0] >= self.indexOS[1]):
isExistOS = False
if (isExistOS is False):
if (countDC == 0):
countDC += 1
self.eventsDC.append((0, df.iloc[0], index, price, "DC Upturn", countDC))
#print(f"Upturn DCC with not OS at 0, DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {index}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
countDC += 1
self.eventsDC.append((self.indexDC[0], df.iloc[self.indexDC[0]], index, price, "DC Upturn", countDC))
self.indexDC[0] = self.indexDC[1] + 1
#print(f"Upturn DCC with not OS, DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {index}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
countDC += 1
countOS = countDC - 1
self.eventsDC.append((self.indexDC[0], df.iloc[self.indexDC[0]], index, price, "DC Upturn", countDC))
self.eventsOS.append((self.indexOS[0], df.iloc[self.indexOS[0]], self.indexOS[1] , df.iloc[self.indexOS[1]] , "OS Upturn", countOS))
#print(f"Upturn DCC with OS , DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {index}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
self.indexDC[1] = index
self.indexOS[0] = index + 1
low = price
isExistOS = False
type = "Downturn"
#print(f"Change Index indexDC[0] {self.indexDC[0]}, indexDC[1] {self.indexDC[1]}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
elif (high < price):
high = price
self.indexDC[0] = index
self.indexOS[1] = index - 1
isExistOS = True
#print(f"Upturn OS is valid, DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {self.indexDC[1]}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
elif (type == "Downturn"):
if (price >= (low * (1 + delta))): #下降トレンドから上昇トレンドに転換
if (self.indexOS[0] >= self.indexOS[1]):
isExistOS = False
if (isExistOS is False):
#print(f"Downturn DCC with not OS, DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {index}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
countDC += 1
self.eventsDC.append((self.indexDC[0], df.iloc[self.indexDC[0]], index, price, "DC Downturn", countDC))
self.indexDC[0] = self.indexDC[1] + 1
countDC += 1
countOS = countDC -1
self.eventsDC.append((self.indexDC[0], df.iloc[self.indexDC[0]], index, price, "DC Downturn", countDC))
self.eventsOS.append((self.indexOS[0], df.iloc[self.indexOS[0]], self.indexOS[1] , df.iloc[self.indexOS[1]] , "OS Downturn", countOS))
#print(f"Downturn DCC with OS, DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {index}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
self.indexDC[1] = index
self.indexOS[0] = index + 1
high = price
isExistOS = False
type = "Upturn"
#print(f"Change Index indexDC[0] {self.indexDC[0]}, indexDC[1] {index}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
elif (low > price):
low = price
self.indexDC[0] = index
self.indexOS[1] = index - 1
isExistOS = True
#print(f"Downturn OS is valid, DC count {countDC}, indexDC[0] {self.indexDC[0]}, indexDC[1] {self.indexDC[1]}, indexOS[0] {self.indexOS[0]}, indexOS[1] {self.indexOS[1]}")
df_DC = pd.DataFrame(self.eventsDC, columns=["DC_Start_Index", "Price_DC_Start", "DC_End_Index", "Price_DC_End", "DC_Type", "Number"])
df_OS = pd.DataFrame(self.eventsOS, columns=["OS_Start_Index", "Price_OS_Start", "OS_End_Index", "Price_OS_End", "OS_Type", "Number"])
df_DC = df_DC.iloc[:-1]
df_OS = df_OS.iloc[1:]
return df_DC, df_OS
phyisical_time_df = pd.read_csv('./USDJPY_10 Mins_Ask_2003.05.04_2024.03.29.csv')
price_df = phyisical_time_df["Close"]
dc_generator = DCEventDetector(price_df, 0.1)
df_dc, df_os = dc_generator.generateDC()
def plot_events_with_lines(price_df, df_dc, df_os):
plt.figure(figsize=(30, 10))
plt.scatter(df_dc["DC_Start_Index"], df_dc["Price_DC_Start"], color="orange", label='DC Start')
plt.scatter(df_dc["DC_End_Index"], df_dc["Price_DC_End"], color="red", label='DC End')
plt.scatter(df_os["OS_Start_Index"], df_os["Price_OS_Start"], color="cyan", label='OS Start')
plt.scatter(df_os["OS_End_Index"], df_os["Price_OS_End"], color="blue", label='OS End')
plot_events_with_lines(price_df, df_dc, df_os)
OS ≃2 × DC
※全データから 第三四分位数+1.5×IQR
また、比率の分布は χ二乗分布(以下は自由度1.5、増分は0.1) で近似できると考えられるので、この法則を有用に使えば機械学習・深層学習を利用せず統計的に自動売買が行える可能性があります。
3. 遺伝的プログラミングとは
遺伝的プログラミングとは、簡単に言ってしまえば 変数間の関係式を逐次的に生成するアルゴリズムです。構造には主に木構造が用いられています。
名前にもある通り、生物の遺伝的性質 をアルゴリズムに応用しています。
キーワード | 意味 |
初期化 | 比較元となる関係式を生成する方法論。grow 法、full 法、half and half 法などがある。 |
適応度 | 生成した関係式を評価する関数。機械学習や深層学習における損失関数と同じ位置付け。MSE や RMSE、決定係数 を使用することが多い。 |
選択 | 生成した関係式から優秀なものを抽出する方法論。 ルーレット選択 や トーナメント選択 など様々な選択方法がある。 |
交叉 | 生成した関係式から新たな関係式を生成する方法。2つの関係式を示す木構造からランダムにサブツリーを選択し、それを新たな関係式とする |
突然変異 | 生成した関係式の構造をランダムに変更する方法論。関係式の進化の早期収束を防止できる。サブツリー突然変異 や ホイスト突然変異、ポイント突然変異 などがある。 |
ここでは、説明変数の分布から目的変数を表現する最適なモデルを構築する、深層学習でいうところの 回帰モデル を生成しているという認識で構いません。
- OSイベントの長さはDCイベントの長さの平均2倍になる という2値間の関係性が判明していたため、比較的簡易な関係式で表現できると考えられた。
- 為替取引に使用する以上、リアルタイム性が求められるため、LSTMや等の時系列モデルより高速である必要がある。
4. モデル構造
- 任意の閾値範囲から一つ閾値を選択し、各閾値ごとで訓練データからDC-OSデータセットを作成する。
- 遺伝的プログラミングを用いて、各DC-OSデータセットごとの最適な関係式を生成する。
- 各閾値ごとの最適な関係式を比較し、最も適合度が高い(閾値、DC-OSデータセット、関係式)を探索する。
- 3.で獲得したDC-OSデータセットを用いて、DCイベントがOSイベントを持つかどうかを判別する2値分類モデルを作成する。
- 1.~ 4.で得た(閾値、関係式、判別モデル)を利用して、まず評価データをDC-OSデータセットを作成する。
- 判別モデルでDCイベントがOSイベントを持つと判断したときのDCイベントの長さを関係式に入力しOSイベントの長さを予測、評価する。
5. 学習
5.1. データセットの作成
2. STDCとは の項目でご説明したコードですが、これは描画用のコードです。実際にデータセットとして使用するには適しておりません。
class Event:
def __init__(self, start, start_price, end, end_price, event_type, percentage_displacement=0):
self.start = start
self.start_price = start_price
self.end = end
self.end_price = end_price
self.event_type = event_type
self.overshoot = None
self.percentage_displacement = percentage_displacement
class Type:
Upturn = "Upturn"
Downturn = "Downturn"
UpwardOvershoot = "UpwardOvershoot"
DownwardOvershoot = "DownwardOvershoot"
def generate_events(price_df, delta):
events = []
event = Type.Upturn
last = Event(-1, 0, -1, 0, Type.Upturn)
p_high = 0
p_low = 0
index_dc = [-1, -1] # DC event indexes
index_os = [-1, -1] # OS event indexes
index = 1
for index, price in enumerate(price_df):
if index == 0:
p_high = price
p_low = price
index_dc = [index] * 2
index_os = [index] * 2
elif event == Type.Upturn:
if price <= (p_high * (1 - delta)):
last.overshoot = detect(price_df, Type.UpwardOvershoot, index_dc, index_os)
adjust(last.overshoot if last.overshoot else last, index_dc, index_os)
event = Type.Downturn
p_low = price
index_dc[1] = index
index_os[0] = index + 1
last = Event(index_dc[0], price_df.iloc[index_dc[0]], index_dc[1], price_df.iloc[index_dc[1]], Type.Downturn)
elif p_high < price:
p_high = price
index_dc[0] = index
index_os[1] = index - 1
if price >= (p_low * (1 + delta)):
last.overshoot = detect(price_df, Type.DownwardOvershoot, index_dc, index_os)
adjust(last.overshoot if last.overshoot else last, index_dc, index_os)
event = Type.Upturn
p_high = price
index_dc[1] = index
index_os[0] = index + 1
last = Event(index_dc[0], price_df.iloc[index_dc[0]], index_dc[1], price_df.iloc[index_dc[1]], Type.Upturn)
elif p_low > price:
p_low = price
index_dc[0] = index
index_os[1] = index - 1
return events
def detect(price_df, event_type, index_dc, index_os):
if index_os[0] < index_os[1] and index_os[0] < index_dc[0]:
return Event(index_os[0], price_df.iloc[index_os[0]], index_os[1], price_df.iloc[index_os[1]], event_type)
return None
def adjust(last, index_dc, index_os):
if index_dc[0] == last.start:
index_dc[0] = last.end + 1
elif index_dc[0] > (last.end + 1):
index_dc[0] = (last.end + 1)
def output_dataframe_dc_os(price_df, threshold):
events = generate_events(price_df, threshold)
data = []
for event in events:
add_data = {
"startIndexDC": int(event.start),
"startPriceDC": event.start_price,
"endIndexDC": int(event.end),
"endPriceDC": event.end_price,
"DC_type": event.event_type,
"startIndexOS": int(event.overshoot.start) if event.overshoot else None,
"startPriceOS": event.overshoot.start_price if event.overshoot else None,
"endIndexOS": int(event.overshoot.end) if event.overshoot else None,
"endPriceOS": event.overshoot.end_price if event.overshoot else None,
"OS_type": event.overshoot.event_type if event.overshoot else None
df_dc_os = pd.DataFrame(data)
df_dc_os = df_dc_os.iloc[1:]
return df_dc_os
def output_datafraome_classification(price_df, threshold):
df_dc_os = output_dataframe_dc_os(price_df, threshold)
df_classification = df_dc_os.copy()
df_classification["PreviousOS"] = df_classification["OS_type"].shift(1)
df_classification["PreviousOS"] = df_classification["PreviousOS"].apply(lambda x: 1 if x !=None else 0)
df_classification["TimeDifference"] = (df_classification["endIndexDC"] - df_classification["startIndexDC"]) * 10
df_classification["CurrentTmv"] = ((df_classification["endPriceDC"] - df_classification["startPriceDC"]) / df_classification["startPriceDC"]) / threshold
df_classification["PreviousDCPrice"] = df_classification["endPriceDC"].shift(1)
df_classification["Sigma"] = df_classification.apply(lambda x: (x["CurrentTmv"] * threshold) / x["TimeDifference"] if x["TimeDifference"] != 0 else -1, axis=1)
df_classification["FlashEvent"] = df_classification.apply(lambda x: 1 if (x["endIndexDC"] == x["startIndexDC"]) else 0, axis=1)
df_classification["IsOS"] = df_classification.apply(lambda x: 1 if x["OS_type"] != None else 0, axis=1)
df_classification = df_classification.drop(columns=["startIndexDC", "startPriceDC", "endIndexDC", "endPriceDC", "DC_type", "startIndexOS", "startPriceOS", "endIndexOS", "endPriceOS", "OS_type"])
df_classification = df_classification[1:]
return df_classification
phyisical_time_df = pd.read_csv('./USDJPY_10 Mins_Ask_2003.05.04_2024.03.29.csv')
price_df = phyisical_time_df["Close"]
threshold = 0.0025
df_classification = output_datafraome_classification(price_df, threshold)
変数名 | 意味 |
PreviousOS | DCイベントの前にOSイベントがあったかどうか。 |
TimeDifference | DCイベントの長さ。 |
CurrentTmv | (DCイベントの終了地点の価格-開始地点の価格)/閾値 |
PreviousDCPrice | 前のDCイベントの終了地点の価格 |
Sigma | CurrentTmv × 閾値 / TimeDifference |
FlashEvent | 価格の急変動後に発生したDCイベントがどうか。 |
IsOS | OSイベントを持つかどうか。目的変数。 |
5.2. 遺伝的プログラミングの学習
遺伝的プログラミングの実装について、gplearn というpythonライブラリを使用しました。以下のリンクから使用方法などを確認できますので、よろしければ参考にしてください。
pip install gplearn
from gplearn.genetic import SymbolicRegressor
def gpTuning(X, y, population_size, generations,
tournament_size, stopping_criteria, const_range, init_Depth, init_method,
function_set, metric, parsimony_coefficient, p_crossover, p_subtree_mutation,
p_hoist_mutation, p_point_mutation, p_point_replace, n_jobs, verbose, random_state):
model = SymbolicRegressor(population_size=population_size, generations=generations, stopping_criteria=stopping_criteria, const_range=const_range,
init_depth=init_Depth, init_method=init_method, function_set=function_set, metric=metric, parsimony_coefficient=parsimony_coefficient,
p_crossover=p_crossover, p_subtree_mutation=p_subtree_mutation, p_hoist_mutation=p_hoist_mutation, p_point_mutation=p_point_mutation,
p_point_replace=p_point_replace, tournament_size=tournament_size, n_jobs=n_jobs, verbose=verbose, random_state=random_state)
optimized_model = model.fit(X, y)
return optimized_model
def outputBestFitnessAndModel(optimized_model_list):
tmp_fitness = np.inf
for model in optimized_model_list:
fitness = model._best_fitness
if (fitness < tmp_fitness):
best_fitness = fitness
best_model = model
return best_fitness, best_model
def outputBestThresholdAndModel(price_df, min_threshold, max_threshold, population_size, generations, tournament_size,
stopping_criteria, const_range, init_Depth, init_method, function_set, metric,
parsimony_coefficient, p_crossover, p_subtree_mutation, p_hoist_mutation,
p_point_mutation, p_point_replace, n_jobs, verbose, random_state):
optimized_model_list = []
for threshold in np.arange(min_threshold, max_threshold, 0.0005):
df = price_df.copy()
df_dc_os = output_dataframe_dc_os(price_df, threshold)
X = (df_dc_os["endIndexDC"] - df_dc_os["startIndexDC"]).values.reshape(-1, 1)
y = (df_dc_os["startIndexOS"] - df_dc_os["endIndexOS"]).values.reshape(-1, 1)
optimized_model = gpTuning(X, y, population_size=population_size, generations=generations, tournament_size=tournament_size, stopping_criteria=stopping_criteria, const_range=const_range,
init_Depth=init_Depth, init_method=init_method, function_set=function_set, metric=metric, parsimony_coefficient=parsimony_coefficient,
p_crossover=p_crossover, p_subtree_mutation=p_subtree_mutation, p_hoist_mutation=p_hoist_mutation, p_point_mutation=p_point_mutation,
p_point_replace=p_point_replace, n_jobs=n_jobs, verbose=verbose, random_state=random_state)
return optimized_model_list
optimized_model_list = outputBestThresholdAndModel(price_df, min_threshold=0.005, max_threshold=0.01, population_size=1000, generations=100, tournament_size=20,
stopping_criteria=3, const_range=(0, 10), init_Depth=(3, 9), init_method='half and half',
function_set=('add', 'sub', 'mul', 'div', 'log', 'sin', 'cos'),
metric='rmse', parsimony_coefficient='auto', p_crossover=0.96, p_subtree_mutation=0.01, p_hoist_mutation=0.01,
p_point_mutation=0.01, p_point_replace=0.01, n_jobs=4, verbose=1, random_state=1234)
5.3 判別モデルの学習
判別モデルの作成には、原論文でAutoWeka という AutoML が使用されていたため、比較的有名なAutoMLライブラリである Pycaret を用いて代用しました。
pip install pycaret
import numpy as np
from pycaret.classification import *
def output_multi_thresholds_dataframe_classifiation(price_df, initial_threshold, max_threshold, num_threshold, search_step, min_ratio_is_os, max_ratio_is_os):
min_max_threshold_list =[]
thresholds_list = []
dataframe_classification_list = []
for threshold in np.arange(initial_threshold, max_threshold, search_step):
df_classification = output_datafraome_classification(price_df, threshold)
ratio_is_os = df_classification["IsOS"].sum() / len(df_classification)
if min_ratio_is_os <= ratio_is_os:
threshold_min_ration_is_os = threshold
print(f"Find the threshold {threshold_min_ration_is_os} for min_ratio_is_os")
for threshold in np.arange(max_threshold, initial_threshold, -search_step):
df_classification = output_datafraome_classification(price_df, threshold)
ratio_is_os = df_classification["IsOS"].sum() / len(df_classification)
if ratio_is_os <= max_ratio_is_os:
threshold_max_ration_is_os = threshold
print(f"Find the threshold {threshold_max_ration_is_os} for max_ratio_is_os")
print(f"Start the process from min_ratio_is_os to max_ratio_is_os")
print(f"threshold_min_ration_is_os: {min_max_threshold_list[0]} to threshold_max_ration_is_os: {min_max_threshold_list[1]} by {num_threshold} steps")
for step, threshold in enumerate(np.arange(min_max_threshold_list[0], min_max_threshold_list[1], (min_max_threshold_list[1] - min_max_threshold_list[0]) / num_threshold)):
df_classification = output_datafraome_classification(price_df, threshold)
if (step + 1)%5 == 0:
print(f"step: {step + 1} / {num_threshold}")
return thresholds_list, dataframe_classification_list
thresholds_list, dataframe_classification_list = output_multi_thresholds_dataframe_classifiation(price_df=price_df,
setup_data = setup(data=dataframe_classification_list[49], categorical_features=["PreviousOS", "FlashEvent"],
numeric_features= ["PreviousDCPrice", "TimeDifference", "CurrentTmv", "Sigma"],
target="IsOS", use_gpu=True, normalize_method="zscore", session_id=123)
compare_result = compare_models(sort="precision", verbose=True)
gbc = create_model("gbc")
gbc_tuned = tune_model(gbc)
以下は setup関数
でデータの前処理を行った結果です。しっかりと、目標変数が IsOS
で各モデルの学習を行った結果です。評価指標には precision
次は、パラメータチューニングを行います。現状最適なモデルは Accuracy
共に最高値を叩き出している Gradient Boosting Classifier であるため、このモデルを調整します。
6. 議論
しかし、原論文ではデータセットの生成に使用する閾値を0.010%, 0.013%, 0.015%, 0.018% and 0.020%に限定しており、そこから最適な閾値を選択していました。
0.010%と0.02%の閾値を用いたデータセットにおける 目的変数の均衡性 を、今回作者が作成したデータセットと比較するため棒グラフで説明します。
論文で示されている閾値を用いると、平均して 20~25% 程度しかOSイベントを持っていないことが分かります。そのため、不均衡データを用いた判別モデルによる評価結果ということが分かりました。
このことから、作者が作成した判別モデルの精度と論文の判別モデルの精度に差が生じた要因の一つとして、データセットの均衡性 が原因と推測します。
また遺伝的プログラミングを用いた予測に関しても同様だと考えています。DC-OSデータセットはその性質上、閾値を小さくするほどDCイベントやOSイベントの長さが短くなります。そのため、閾値が小さいほどイベントの長さのボラリティが小さい ことから予測が容易になると考えられます。
