0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AIシステムトレード PIX2PIX編 その2

Last updated at Posted at 2023-05-20

以前に検討していた取り組みの続報である。

新規内容

今回のコンセプトでは、以下の点を改めることとした。

  • テクニカル指標RSIを直接の予想に使う → 前回はチャートの終値を使用していた
  • RSI-BREAKOUTインジケータを追加する → サポートラインとの位置関係も学習させる
  • チャート予想をマルチタイム化する → 予測時は、週足、日足、時間足、15分足を併用する

詳細

PIX2PIXを使う発想は前回とは変えていない。
RSI-BREAKOUT 説明書 → https://github.com/chanmoto/rsi_breakout/blob/master/RSI_BREAKOUT%E3%80%80%E5%8F%96%E6%89%B1%E8%AA%AC%E6%98%8E%E6%9B%B8.pdf
(web版)  →  https://chanmoto.github.io/
(リポジトリ) → https://github.com/chanmoto/chanmoto.github.io/tree/master

データマッピング

PIX2PIXはRGBの3CHにマッピングできるため、終値、サポート(下値支持線)、レジスタンス(上値抵抗線)を使う

黒 → 終値
赤 → サポート(下値支持線)
青 → レジスタンス(上値抵抗線)

image.png

PIX2PIXでは重複部分をスライドさせて、各々をマッピングするのは、前回と同様である。

プログラミング

まずはRSI-BREAKOUTのソースコードをpythonに移植する。

indicator.py
def calculate_rsi(prices, period):
    # Calculate the initial up/down movements.
    up_moves = []
    down_moves = []
    prev_price = prices[0]
    for price in prices[1:]:
        if price > prev_price:
            up_moves.append(price - prev_price)
            down_moves.append(0)
        else:
            up_moves.append(0)
            down_moves.append(prev_price - price)
        prev_price = price
    
    # Calculate the initial up/down averages.
    up_avg = sum(up_moves[:period]) / period
    down_avg = sum(down_moves[:period]) / period
    
    # Calculate the first RSI value.
    rs = up_avg / down_avg
    rsi = [100 - 100 / (1 + rs)]
    
    # Calculate the remaining RSI values.
    for i in range(period, len(prices)):
        up_chg = up_moves[i-1]
        down_chg = down_moves[i-1]
        up_avg = (up_avg * (period - 1) + up_chg) / period
        down_avg = (down_avg * (period - 1) + down_chg) / period
        rs = up_avg / down_avg
        rsi.append(100 - 100 / (1 + rs))
    
    return rsi


def calculate_sma(Price, Period):
    Price.reverse()
    sma = [0] * len(Price)
    sma[Period - 1] = sum(Price[:Period]) / Period

    for i in range(Period, len(Price)):
        sma[i] = sma[i - 1] + (Price[i] - Price[i - Period]) / Period

    Price.reverse()
    sma.reverse()
    return sma

def cal_kairi(RSI,MovRsi):

    kBuf = []
    for i in range(len(RSI) - 1):
        if MovRsi[i] !=0:
            kBuf.append((RSI[i] - MovRsi[i]) / MovRsi[i])
        else:
            kBuf.append(0)
    return kBuf
    

RSI_BREAKOUT

  • RSI-breakoutの関数は、移動平均の乖離率から極大、極小を求めてトレンドラインを引いている
  • 実体線は過去の実績から計算されるトレンドライン。最高値や最安値をそれぞれ結ぶことで、乖離率最大のラインを形成する
  • 予想線は実体線を延長したもの。トレンドラインは最高値や最安値を結んだ同一ラインであり、区間のトレンドラインの延長上に形成されやすい特徴がある

戻り値

buf1 上側の実体線 nLine=0 の時だけ 予想線
buf2 下側の実体線 nLine=0 の時だけ 予想線

image.png

rsi_breakout.py

def RSI_BREAKOUT(start,size, RSI_,MovRsi_,kBuf_,nLine):

    RSI=RSI_[start:start + size*10]
    MovRsi=MovRsi_[start:start + size*10]
    kBuf=kBuf_[start:start + size*10]

    min_gap = 5
    j = 0
    ii = []
    for i in range(0, len(kBuf)):
        print(i)
        if kBuf[i] * kBuf[i + 1] <= 0:
            ii.append(i)
            j += 1
            if j > 2:
                if ii[j - 1] - ii[j - 3] < min_gap:
                    j -= 2

    HiStack = []
    LoStack = []

    if kBuf[ii[0]] < 0:
        for linel in range(nLine + 3):
            a1 = ii[0 + linel * 2] + 1
            a2 = ii[1 + linel * 2] + 1
            b1 = ii[1 + linel * 2] + 1
            b2 = ii[2 + linel * 2] + 1

            HiStack.append(RSI.index(max(RSI[a1:a2]), a1))
            LoStack.append(RSI.index(min(RSI[b1:b2]), b1))
    else:
        for linel in range(nLine + 3):
            a1 = ii[1 + linel * 2] + 1
            a2 = ii[2 + linel * 2] + 1
            b1 = ii[0 + linel * 2] + 1
            b2 = ii[1 + linel * 2] + 1

            HiStack.append(RSI.index(max(RSI[a1:a2]), a1))
            LoStack.append(RSI.index(min(RSI[b1:b2]), b1))

    buf1 = [float("nan")] * (size*5)
    buf2 = [float("nan")] * (size*5)

    for linel in range(nLine):
        hp1 = RSI[HiStack[linel]]
        hp2 = RSI[HiStack[linel + 1]]
        hp1n = HiStack[linel]
        hp2n = HiStack[linel + 1]
        hp3n = HiStack[linel - 1] if linel - 1 >= 0 else None

        if hp2n != hp1n:
            rh = (hp2 - hp1) / (hp2n - hp1n)

        lp1 = RSI[LoStack[linel]]
        lp2 = RSI[LoStack[linel + 1]]
        lp1n = LoStack[linel]
        lp2n = LoStack[linel + 1]
        lp3n = LoStack[linel - 1] if linel - 1 >= 0 else None

        if lp1n != lp2n:
            rl = (lp1 - lp2) / (lp2n - lp1n)

        for k in range(hp2n - hp1n):
            buf1[hp1n + k] = hp1 + rh * k
        for k in range(lp2n - lp1n):
            buf2[lp1n + k] = lp1 - rl * k

        if linel ==0: #nLine=0 の時だけ 予想線
            if hp3n is None:
                for k in range(hp1n + 1):
                    buf1[hp1n - k] = hp1 - rh * k
            else:
                for k in range(hp1n - hp3n):
                    buf1[hp1n - k] = hp1 - rh * k

            if hp3n is None:
                for k in range(lp1n + 1):
                    buf2[lp1n - k] = lp1 + rl * k
            else:
                for k in range(lp1n - lp3n):
                    buf2[lp1n - k] = lp1 + rl * k

    return {
        "rsi": RSI[0:size],
        "rsimov": MovRsi[0:size],
        "buf1": buf1[0:size],
        "buf2": buf2[0:size],
    }

RSI平滑化に関して

これは副二次的なものではあるが、この1週間に調べまくった成果で
RSIのダマシを回避するのに、RSI平滑化を取り入れてみた。

  • 平滑化前 → RSI計算に、CLOSEを使用
  • 平滑化後 → HI,LOW,CLOSEのRSIで平均化
pix2image.py

def pix2image(
    db: Session,
    count_max: int = 100,
    mode: str = "A",  # 訓練時はAとする。予測の場合はBとする。
    target: str = "train",  # train or test
    size: int = 64,
    slide: int = 32,
    Period: int = 20,
    nPeriod_MA: int =20,
    nLine: int = 2
):

    models=[Forex_short] #Forex_middle, Forex_long
    start = 0
    print("start : {}".format(start))

    imgs = []
    count = 0
    min2pow = int(math.pow(2, int(math.log2(size))))
    df_length = min2pow+slide
    #pdb.set_trace()

    price = forex.get_dataframe_all(db=db,model=Forex_short)
    RSI_close = calculate_rsi( [x.close for x in price], Period)
    RSI_hi = calculate_rsi( [x.high for x in price], Period)
    RSI_lo = calculate_rsi( [x.low for x in price], Period)
    RSI_ = []
    for x, y ,z in zip(RSI_close,RSI_hi,RSI_lo):
        # 各タプルに対して、sum関数とlen関数で平均値を計算します。
        average = sum((x, y , z)) / len((x, y , z))
        # 平均値を新しいリストに追加します。
        RSI_.append(average)

    MovRsi_ = calculate_sma(RSI_, nPeriod_MA)
    kBuf = cal_kairi(RSI_,MovRsi_)
    
    for i,t in enumerate(price):
        print(i,RSI_[i])
        df11 = RSI_BREAKOUT(i,size, RSI_ ,MovRsi_,kBuf,nLine)
        plt.plot(df11['buf1'],label='kBuf')
        plt.plot(df11["rsi"],label= "rsi")
        plt.plot(df11["rsimov"],label = "rsimov")
        plt.plot(df11["buf1"],label= "buf1")
        plt.plot(df11["buf2"],label="buf2")
        plt.legend()
        ax = plt.gca()
        ax.invert_xaxis() # Reverse the x-axis
        plt.show()

        if len(df11) == df_length: # and len(df22) == df_length and len(df33) == df_length:
            img = imagemake(df11, size=min2pow, slide=slide,mode=mode)
            fname = MLConfig.paths[target] + \
            price.dt.strftime('%Y-%m-%d %H_%M_00') + '.png'

            img.save(fname)
            imgs.append({"img": img, "fname": fname, "date": dt})
            count += 1
            if count >= count_max:
                break
        
    return count,imgs
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?