はじめに
Python、並列処理でググると、multiprocessingというモジュールの記事がいくつも出てくるので、以前書いた記事で試してみました。
[Python]FXシストレパラメータの最適化を遺伝的アルゴリズムでやってみる
遺伝的アルゴリズムは、たくさんの個体を評価してその適応度により選択、交叉などの遺伝的処理を行うのですが、それぞれの個体の評価は完全に独立しているので、並列処理に適しています。今回もその個体評価の部分に並列処理を適応させてみます。
もとのコード
並列化させる前の遺伝的アルゴリズムのコードです。上の記事のシストレパラメータの最適化に使ったものなので、汎用的なものではありません。前後のコードは省略します。
def Optimize(ohlc, Prange):
def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #シフト関数
SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #長期移動平均
for i in range(len(Prange[0])):
SlowMA[i] = ind.iMA(ohlc, Prange[0][i])
FastMA = np.empty([len(Prange[1]), len(ohlc)]) #短期移動平均
for i in range(len(Prange[1])):
FastMA[i] = ind.iMA(ohlc, Prange[1][i])
ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #決済用移動平均
for i in range(len(Prange[2])):
ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
Close = ohlc['Close'].values #終値
M = 20 #個体数
Eval = np.zeros([M, 6]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
gens = 0 #世代数
while gens < 100:
for k in range(M):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#買いエントリーシグナル
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#売りエントリーシグナル
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#買いエグジットシグナル
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#売りエグジットシグナル
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#バックテスト
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
Eval[k] = BacktestReport(Trade, PL)
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
このまま実行させて実行時間を測っておきます。
import time
start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))
elapsed_time = 11.180512751173708 sec
map関数に置き換え
もとのコードのなかで、並列化させるのは、for文の間です。各個体のパラメータでバックテストを行い、評価を行う部分で、そこそこ時間のかかるところです。multiprocessingを使う場合、mapメソッドを使うのが簡単なそうなので、まずは、for文をmap関数に置き換えてみます。
そのために、繰り返す部分を関数化する必要がありますが、ここでちょっと注意するところがあります。ただmap関数にするだけなら、Optimize関数のなかで関数を定義した方が便利なのですが、multiprocessingを使う場合、エラーとなってしまいます。なので、Optimize関数の外側でevaluateという名前で関数の定義をしました。
mapに渡す都合上、evaluate関数の引数はkだけにしたいところです。そこで、SlowMA、FastMAなどのテクニカル指標の変数はグローバル変数としました。ただし、Paramは関数の引数にしました。
SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) #長期移動平均
for i in range(len(SlowMAperiod)):
SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])
FastMA = np.empty([len(FastMAperiod), len(ohlc)]) #短期移動平均
for i in range(len(FastMAperiod)):
FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])
ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) #決済用移動平均
for i in range(len(ExitMAperiod)):
ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])
Close = ohlc['Close'].values #終値
# シフト関数
def shift(x, n=1):
return np.concatenate((np.zeros(n), x[:-n]))
# 並列処理させる関数
def evaluate(k,Param):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#買いエントリーシグナル
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#売りエントリーシグナル
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#買いエグジットシグナル
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#売りエグジットシグナル
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#バックテスト
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
return BacktestReport(Trade, PL)
for文の代わりにmap関数で置き換えたのが以下のコードです。
import functools
def Optimize(ohlc, Prange):
M = 20 #個体数
Eval = np.zeros([M, 4]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
gens = 0 #世代数
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
実際にはmapだけで簡単には書けませんでした。map関数の最初の引数に、繰り返す関数evaluateを入れるのですが、evaluate関数の引数が二つあるので、二つ目の引数ParamはParamに固定するようfunctools.partialを使っています。
またmapの戻り値はNumPyのarrayに変換するのですが、その前にlistに変換しなくてはいけないようです。(Pythonのバージョンによって異なるそうです。今回試したのは、Python 3.5.1です。)
これを実行すると、以下のような結果となりました。
elapsed_time = 11.157917446009389 sec
for文をmapに変えても実行時間はたいして変わらないということです。
multiprocessing
map関数に置き換えられれば、multiprocessingを導入するのは簡単です。
import functools
import multiprocessing as mp
def Optimize(ohlc, Prange):
M = 20 #個体数
Eval = np.zeros([M, 4]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
pool = mp.Pool() #プロセスプールの作成
gens = 0 #世代数
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Poolクラスでプロセスプールを作成して、先ほどのmapの部分をpool.mapに置き換えるだけです。Poolの引数でプロセスの個数を指定します。引数を書かなければ、CPUのスレッドをすべて使うようになります。
簡単な処理だけならスレッドをすべて使ってもいいのですが、他にもコードがあるので、今回は半分強のスレッドを使うのが最も高速化できました。
Corei7の8スレッドだったので、Pool(5)にして走らせた結果が
elapsed_time = 5.766524394366197 sec
です。だいたい2倍くらいの速さになりました。もう少し速くなることを期待したのですが、個体の繰り返し以外にも遺伝的処理があったりしたためでしょう。バックテストにもっと時間のかかるシステムなら、もう少し並列化の効果が上がったかもしれません。