はじめに
Python、並列処理でググると、multiprocessingというモジュールの記事がいくつも出てくるので、以前書いた記事で試してみました。
[Python]FXシストレパラメータの最適化を遺伝的アルゴリズムでやってみる
遺伝的アルゴリズムは、たくさんの個体を評価してその適応度により選択、交叉などの遺伝的処理を行うのですが、それぞれの個体の評価は完全に独立しているので、並列処理に適しています。今回もその個体評価の部分に並列処理を適応させてみます。
もとのコード
並列化させる前の遺伝的アルゴリズムのコードです。上の記事のシストレパラメータの最適化に使ったものなので、汎用的なものではありません。前後のコードは省略します。
def Optimize(ohlc, Prange):
def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #シフト関数
SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #長期移動平均
for i in range(len(Prange[0])):
SlowMA[i] = ind.iMA(ohlc, Prange[0][i])
FastMA = np.empty([len(Prange[1]), len(ohlc)]) #短期移動平均
for i in range(len(Prange[1])):
FastMA[i] = ind.iMA(ohlc, Prange[1][i])
ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #決済用移動平均
for i in range(len(Prange[2])):
ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
Close = ohlc['Close'].values #終値
M = 20 #個体数
Eval = np.zeros([M, 6]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
gens = 0 #世代数
while gens < 100:
for k in range(M):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#買いエントリーシグナル
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#売りエントリーシグナル
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#買いエグジットシグナル
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#売りエグジットシグナル
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#バックテスト
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
Eval[k] = BacktestReport(Trade, PL)
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
このまま実行させて実行時間を測っておきます。
import time
start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))
elapsed_time = 11.180512751173708 sec
map関数に置き換え
もとのコードのなかで、並列化させるのは、for
文の間です。各個体のパラメータでバックテストを行い、評価を行う部分で、そこそこ時間のかかるところです。multiprocessingを使う場合、map
メソッドを使うのが簡単なそうなので、まずは、for
文をmap
関数に置き換えてみます。
そのために、繰り返す部分を関数化する必要がありますが、ここでちょっと注意するところがあります。ただmap
関数にするだけなら、Optimize
関数のなかで関数を定義した方が便利なのですが、multiprocessingを使う場合、エラーとなってしまいます。なので、Optimize
関数の外側でevaluate
という名前で関数の定義をしました。
map
に渡す都合上、evaluate
関数の引数はk
だけにしたいところです。そこで、SlowMA
、FastMA
などのテクニカル指標の変数はグローバル変数としました。ただし、Param
は関数の引数にしました。
SlowMA = np.empty([len(SlowMAperiod), len(ohlc)]) #長期移動平均
for i in range(len(SlowMAperiod)):
SlowMA[i] = ind.iMA(ohlc, SlowMAperiod[i])
FastMA = np.empty([len(FastMAperiod), len(ohlc)]) #短期移動平均
for i in range(len(FastMAperiod)):
FastMA[i] = ind.iMA(ohlc, FastMAperiod[i])
ExitMA = np.empty([len(ExitMAperiod), len(ohlc)]) #決済用移動平均
for i in range(len(ExitMAperiod)):
ExitMA[i] = ind.iMA(ohlc, ExitMAperiod[i])
Close = ohlc['Close'].values #終値
#シフト関数
def shift(x, n=1):
return np.concatenate((np.zeros(n), x[:-n]))
#並列処理させる関数
def evaluate(k,Param):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#買いエントリーシグナル
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#売りエントリーシグナル
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#買いエグジットシグナル
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#売りエグジットシグナル
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#バックテスト
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
return BacktestReport(Trade, PL)
for
文の代わりにmap
関数で置き換えたのが以下のコードです。
import functools
def Optimize(ohlc, Prange):
M = 20 #個体数
Eval = np.zeros([M, 4]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
gens = 0 #世代数
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(map(functools.partial(evaluate, Param=Param), np.arange(M))))
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
実際にはmap
だけで簡単には書けませんでした。map
関数の最初の引数に、繰り返す関数evaluate
を入れるのですが、evaluate
関数の引数が二つあるので、二つ目の引数Param
はParam
に固定するようfunctools.partial
を使っています。
またmap
の戻り値はNumPyのarrayに変換するのですが、その前にlistに変換しなくてはいけないようです。(Pythonのバージョンによって異なるそうです。今回試したのは、Python 3.5.1です。)
これを実行すると、以下のような結果となりました。
elapsed_time = 11.157917446009389 sec
for
文をmap
に変えても実行時間はたいして変わらないということです。
multiprocessing
map
関数に置き換えられれば、multiprocessingを導入するのは簡単です。
import functools
import multiprocessing as mp
def Optimize(ohlc, Prange):
M = 20 #個体数
Eval = np.zeros([M, 4]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
pool = mp.Pool() #プロセスプールの作成
gens = 0 #世代数
while gens < 100:
#for k in range(M): Eval[k] = evaluate(k,Param)
Eval = np.array(list(pool.map(functools.partial(evaluate, Param=Param), np.arange(M))))
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
#print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
Pool
クラスでプロセスプールを作成して、先ほどのmap
の部分をpool.map
に置き換えるだけです。Pool
の引数でプロセスの個数を指定します。引数を書かなければ、CPUのスレッドをすべて使うようになります。
簡単な処理だけならスレッドをすべて使ってもいいのですが、他にもコードがあるので、今回は半分強のスレッドを使うのが最も高速化できました。
Corei7の8スレッドだったので、Pool(5)
にして走らせた結果が
elapsed_time = 5.766524394366197 sec
です。だいたい2倍くらいの速さになりました。もう少し速くなることを期待したのですが、個体の繰り返し以外にも遺伝的処理があったりしたためでしょう。バックテストにもっと時間のかかるシステムなら、もう少し並列化の効果が上がったかもしれません。