以前に検討していた取り組みの続報である。
新規内容
今回のコンセプトでは、以下の点を改めることとした。
- テクニカル指標RSIを直接の予想に使う → 前回はチャートの終値を使用していた
- RSI-BREAKOUTインジケータを追加する → サポートラインとの位置関係も学習させる
- チャート予想をマルチタイム化する → 予測時は、週足、日足、時間足、15分足を併用する
詳細
PIX2PIXを使う発想は前回とは変えていない。
RSI-BREAKOUT 説明書 → https://github.com/chanmoto/rsi_breakout/blob/master/RSI_BREAKOUT%E3%80%80%E5%8F%96%E6%89%B1%E8%AA%AC%E6%98%8E%E6%9B%B8.pdf
(web版) → https://chanmoto.github.io/
(リポジトリ) → https://github.com/chanmoto/chanmoto.github.io/tree/master
データマッピング
PIX2PIXはRGBの3CHにマッピングできるため、終値、サポート(下値支持線)、レジスタンス(上値抵抗線)を使う
黒 → 終値
赤 → サポート(下値支持線)
青 → レジスタンス(上値抵抗線)
PIX2PIXでは重複部分をスライドさせて、各々をマッピングするのは、前回と同様である。
プログラミング
まずはRSI-BREAKOUTのソースコードをpythonに移植する。
def calculate_rsi(prices, period):
# Calculate the initial up/down movements.
up_moves = []
down_moves = []
prev_price = prices[0]
for price in prices[1:]:
if price > prev_price:
up_moves.append(price - prev_price)
down_moves.append(0)
else:
up_moves.append(0)
down_moves.append(prev_price - price)
prev_price = price
# Calculate the initial up/down averages.
up_avg = sum(up_moves[:period]) / period
down_avg = sum(down_moves[:period]) / period
# Calculate the first RSI value.
rs = up_avg / down_avg
rsi = [100 - 100 / (1 + rs)]
# Calculate the remaining RSI values.
for i in range(period, len(prices)):
up_chg = up_moves[i-1]
down_chg = down_moves[i-1]
up_avg = (up_avg * (period - 1) + up_chg) / period
down_avg = (down_avg * (period - 1) + down_chg) / period
rs = up_avg / down_avg
rsi.append(100 - 100 / (1 + rs))
return rsi
def calculate_sma(Price, Period):
Price.reverse()
sma = [0] * len(Price)
sma[Period - 1] = sum(Price[:Period]) / Period
for i in range(Period, len(Price)):
sma[i] = sma[i - 1] + (Price[i] - Price[i - Period]) / Period
Price.reverse()
sma.reverse()
return sma
def cal_kairi(RSI,MovRsi):
kBuf = []
for i in range(len(RSI) - 1):
if MovRsi[i] !=0:
kBuf.append((RSI[i] - MovRsi[i]) / MovRsi[i])
else:
kBuf.append(0)
return kBuf
RSI_BREAKOUT
- RSI-breakoutの関数は、移動平均の乖離率から極大、極小を求めてトレンドラインを引いている
- 実体線は過去の実績から計算されるトレンドライン。最高値や最安値をそれぞれ結ぶことで、乖離率最大のラインを形成する
- 予想線は実体線を延長したもの。トレンドラインは最高値や最安値を結んだ同一ラインであり、区間のトレンドラインの延長上に形成されやすい特徴がある
戻り値
buf1 上側の実体線 nLine=0 の時だけ 予想線
buf2 下側の実体線 nLine=0 の時だけ 予想線
def RSI_BREAKOUT(start,size, RSI_,MovRsi_,kBuf_,nLine):
RSI=RSI_[start:start + size*10]
MovRsi=MovRsi_[start:start + size*10]
kBuf=kBuf_[start:start + size*10]
min_gap = 5
j = 0
ii = []
for i in range(0, len(kBuf)):
print(i)
if kBuf[i] * kBuf[i + 1] <= 0:
ii.append(i)
j += 1
if j > 2:
if ii[j - 1] - ii[j - 3] < min_gap:
j -= 2
HiStack = []
LoStack = []
if kBuf[ii[0]] < 0:
for linel in range(nLine + 3):
a1 = ii[0 + linel * 2] + 1
a2 = ii[1 + linel * 2] + 1
b1 = ii[1 + linel * 2] + 1
b2 = ii[2 + linel * 2] + 1
HiStack.append(RSI.index(max(RSI[a1:a2]), a1))
LoStack.append(RSI.index(min(RSI[b1:b2]), b1))
else:
for linel in range(nLine + 3):
a1 = ii[1 + linel * 2] + 1
a2 = ii[2 + linel * 2] + 1
b1 = ii[0 + linel * 2] + 1
b2 = ii[1 + linel * 2] + 1
HiStack.append(RSI.index(max(RSI[a1:a2]), a1))
LoStack.append(RSI.index(min(RSI[b1:b2]), b1))
buf1 = [float("nan")] * (size*5)
buf2 = [float("nan")] * (size*5)
for linel in range(nLine):
hp1 = RSI[HiStack[linel]]
hp2 = RSI[HiStack[linel + 1]]
hp1n = HiStack[linel]
hp2n = HiStack[linel + 1]
hp3n = HiStack[linel - 1] if linel - 1 >= 0 else None
if hp2n != hp1n:
rh = (hp2 - hp1) / (hp2n - hp1n)
lp1 = RSI[LoStack[linel]]
lp2 = RSI[LoStack[linel + 1]]
lp1n = LoStack[linel]
lp2n = LoStack[linel + 1]
lp3n = LoStack[linel - 1] if linel - 1 >= 0 else None
if lp1n != lp2n:
rl = (lp1 - lp2) / (lp2n - lp1n)
for k in range(hp2n - hp1n):
buf1[hp1n + k] = hp1 + rh * k
for k in range(lp2n - lp1n):
buf2[lp1n + k] = lp1 - rl * k
if linel ==0: #nLine=0 の時だけ 予想線
if hp3n is None:
for k in range(hp1n + 1):
buf1[hp1n - k] = hp1 - rh * k
else:
for k in range(hp1n - hp3n):
buf1[hp1n - k] = hp1 - rh * k
if hp3n is None:
for k in range(lp1n + 1):
buf2[lp1n - k] = lp1 + rl * k
else:
for k in range(lp1n - lp3n):
buf2[lp1n - k] = lp1 + rl * k
return {
"rsi": RSI[0:size],
"rsimov": MovRsi[0:size],
"buf1": buf1[0:size],
"buf2": buf2[0:size],
}
RSI平滑化に関して
これは副二次的なものではあるが、この1週間に調べまくった成果で
RSIのダマシを回避するのに、RSI平滑化を取り入れてみた。
- 平滑化前 → RSI計算に、CLOSEを使用
- 平滑化後 → HI,LOW,CLOSEのRSIで平均化
def pix2image(
db: Session,
count_max: int = 100,
mode: str = "A", # 訓練時はAとする。予測の場合はBとする。
target: str = "train", # train or test
size: int = 64,
slide: int = 32,
Period: int = 20,
nPeriod_MA: int =20,
nLine: int = 2
):
models=[Forex_short] #Forex_middle, Forex_long
start = 0
print("start : {}".format(start))
imgs = []
count = 0
min2pow = int(math.pow(2, int(math.log2(size))))
df_length = min2pow+slide
#pdb.set_trace()
price = forex.get_dataframe_all(db=db,model=Forex_short)
RSI_close = calculate_rsi( [x.close for x in price], Period)
RSI_hi = calculate_rsi( [x.high for x in price], Period)
RSI_lo = calculate_rsi( [x.low for x in price], Period)
RSI_ = []
for x, y ,z in zip(RSI_close,RSI_hi,RSI_lo):
# 各タプルに対して、sum関数とlen関数で平均値を計算します。
average = sum((x, y , z)) / len((x, y , z))
# 平均値を新しいリストに追加します。
RSI_.append(average)
MovRsi_ = calculate_sma(RSI_, nPeriod_MA)
kBuf = cal_kairi(RSI_,MovRsi_)
for i,t in enumerate(price):
print(i,RSI_[i])
df11 = RSI_BREAKOUT(i,size, RSI_ ,MovRsi_,kBuf,nLine)
plt.plot(df11['buf1'],label='kBuf')
plt.plot(df11["rsi"],label= "rsi")
plt.plot(df11["rsimov"],label = "rsimov")
plt.plot(df11["buf1"],label= "buf1")
plt.plot(df11["buf2"],label="buf2")
plt.legend()
ax = plt.gca()
ax.invert_xaxis() # Reverse the x-axis
plt.show()
if len(df11) == df_length: # and len(df22) == df_length and len(df33) == df_length:
img = imagemake(df11, size=min2pow, slide=slide,mode=mode)
fname = MLConfig.paths[target] + \
price.dt.strftime('%Y-%m-%d %H_%M_00') + '.png'
img.save(fname)
imgs.append({"img": img, "fname": fname, "date": dt})
count += 1
if count >= count_max:
break
return count,imgs

