23
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Python]multiprocessingを使って遺伝的アルゴリズムを高速化したときのメモ

Posted at

はじめに

Python、並列処理でググると、multiprocessingというモジュールの記事がいくつも出てくるので、以前書いた記事で試してみました。

[Python]FXシストレパラメータの最適化を遺伝的アルゴリズムでやってみる

遺伝的アルゴリズムは、たくさんの個体を評価してその適応度により選択、交叉などの遺伝的処理を行うのですが、それぞれの個体の評価は完全に独立しているので、並列処理に適しています。今回もその個体評価の部分に並列処理を適応させてみます。

もとのコード

並列化させる前の遺伝的アルゴリズムのコードです。上の記事のシストレパラメータの最適化に使ったものなので、汎用的なものではありません。前後のコードは省略します。

def Optimize(ohlc, Prange):
    def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #シフト関数

    SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #長期移動平均
    for i in range(len(Prange[0])):
        SlowMA[i] = ind.iMA(ohlc, Prange[0][i])

    FastMA = np.empty([len(Prange[1]), len(ohlc)]) #短期移動平均
    for i in range(len(Prange[1])):
        FastMA[i] = ind.iMA(ohlc, Prange[1][i])
    
    ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #決済用移動平均
    for i in range(len(Prange[2])):
        ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
    
    Close = ohlc['Close'].values #終値
    
    M = 20 #個体数
    Eval = np.zeros([M, 6])  #評価項目
    Param = InitParam(Prange, M) #パラメータ初期化
    gens = 0 #世代数
    while gens < 100:
        for k in range(M):
            i0 = Param[k,0]
            i1 = Param[k,1]
            i2 = Param[k,2]
            #買いエントリーシグナル
            BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
            #売りエントリーシグナル
            SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
            #買いエグジットシグナル
            BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
            #売りエグジットシグナル
            SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
            #バックテスト
            Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit) 
            Eval[k] = BacktestReport(Trade, PL)
        # 世代の交代
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                         columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])

このまま実行させて実行時間を測っておきます。

import time

start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))
elapsed_time = 11.180512751173708 sec

map関数に置き換え

もとのコードのなかで、並列化させるのは、for文の間です。各個体のパラメータでバックテストを行い、評価を行う部分で、そこそこ時間のかかるところです。multiprocessingを使う場合、mapメソッドを使うのが簡単なそうなので、まずは、for文をmap関数に置き換えてみます。

そのために、繰り返す部分を関数化する必要がありますが、ここでちょっと注意するところがあります。ただmap関数にするだけなら、Optimize関数のなかで関数を定義した方が便利なのですが、multiprocessingを使う場合、エラーとなってしまいます。なので、Optimize関数の外側でevaluateという名前で関数の定義をしました。

mapに渡す都合上、evaluate関数の引数はkだけにしたいところです。そこで、SlowMAFastMAなどのテクニカル指標の変数はグローバル変数としました。ただし、Paramは関数の引数にしました。

SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) #長期移動平均
for i in range(len(SlowMAperiod)):
    SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])

FastMA = np.empty([len(FastMAperiod), len(ohlc)]) #短期移動平均
for i in range(len(FastMAperiod)):
    FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])

ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) #決済用移動平均
for i in range(len(ExitMAperiod)):
    ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])

Close = ohlc['Close'].values #終値

#シフト関数
def shift(x, n=1):
    return np.concatenate((np.zeros(n), x[:-n]))

#並列処理させる関数
def evaluate(k,Param):
    i0 = Param[k,0]
    i1 = Param[k,1]
    i2 = Param[k,2]
    #買いエントリーシグナル
    BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
    #売りエントリーシグナル
    SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
    #買いエグジットシグナル
    BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
    #売りエグジットシグナル
    SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
    #バックテスト
    Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit) 
    return BacktestReport(Trade, PL)

for文の代わりにmap関数で置き換えたのが以下のコードです。

import functools

def Optimize(ohlc, Prange):
    M = 20 #個体数
    Eval = np.zeros([M, 4])  #評価項目
    Param = InitParam(Prange, M) #パラメータ初期化
    gens = 0 #世代数
    while gens < 100:
        #for k in range(M): Eval[k] = evaluate(k,Param)
        Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
        # 世代の交代
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                         columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])

実際にはmapだけで簡単には書けませんでした。map関数の最初の引数に、繰り返す関数evaluateを入れるのですが、evaluate関数の引数が二つあるので、二つ目の引数ParamParamに固定するようfunctools.partialを使っています。

またmapの戻り値はNumPyのarrayに変換するのですが、その前にlistに変換しなくてはいけないようです。(Pythonのバージョンによって異なるそうです。今回試したのは、Python 3.5.1です。)

これを実行すると、以下のような結果となりました。

elapsed_time = 11.157917446009389 sec

for文をmapに変えても実行時間はたいして変わらないということです。

multiprocessing

map関数に置き換えられれば、multiprocessingを導入するのは簡単です。

import functools
import multiprocessing as mp

def Optimize(ohlc, Prange):
    M = 20 #個体数
    Eval = np.zeros([M, 4])  #評価項目
    Param = InitParam(Prange, M) #パラメータ初期化
    pool = mp.Pool() #プロセスプールの作成
    gens = 0 #世代数
    while gens < 100:
        #for k in range(M): Eval[k] = evaluate(k,Param)
        Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
        # 世代の交代
        Param = Evolution(Param, Eval[:,0], Prange)
        gens += 1
        #print(gens, Eval[0,0])
    Slow = Prange[0][Param[:,0]]
    Fast = Prange[1][Param[:,1]]
    Exit = Prange[2][Param[:,2]]
    return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
                         'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
                         columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])

Poolクラスでプロセスプールを作成して、先ほどのmapの部分をpool.mapに置き換えるだけです。Poolの引数でプロセスの個数を指定します。引数を書かなければ、CPUのスレッドをすべて使うようになります。

簡単な処理だけならスレッドをすべて使ってもいいのですが、他にもコードがあるので、今回は半分強のスレッドを使うのが最も高速化できました。

Corei7の8スレッドだったので、Pool(5)にして走らせた結果が

elapsed_time = 5.766524394366197 sec

です。だいたい2倍くらいの速さになりました。もう少し速くなることを期待したのですが、個体の繰り返し以外にも遺伝的処理があったりしたためでしょう。バックテストにもっと時間のかかるシステムなら、もう少し並列化の効果が上がったかもしれません。

23
28
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
28

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?