先日作成したプログラムに食わせるインジケータを作成したので。
python 3.5.6
Platform: Windows-10-10.0.17134-SP0
Chainer: 5.0.0
NumPy: 1.15.2
CuPy: Not Available
iDeep: Not Available
ta-lib
使用に際して食わせるデータは以下。
ndarray,float32
close_float->終値
high_float->高値
low_float->安値
length->ローソク足の使用する本数
例)200期間のsmaを使う
sma200 = util.SMA(200,close_float)
とやって終値を渡してやると、以下の処理を済ませた値が返ってきます。
200期間の移動平均を算出
↓
冒頭部分に作られる[nan]が返された部分を削除
↓
残った部分の数値を使って正規化
↓
削除されて寸足らずになった部分をゼロ埋めしてclose_floatと同じ長さにする
↓
正規化されてスキマをゼロ埋めして200期間になったsmaを返す
しかしこれ…全部正規化する必要あるのかな…?
util.py
import numpy as np
import talib
from sklearn import preprocessing
#-------------各種インジケーター-------------
def ADX(length,high_float, low_float, close_float):
#ADX - Average Directional Movement Index
h_f = high_float.copy()
l_f = low_float.copy()
c_f = close_float.copy()
cache = np.c_[c_f, talib.ADX(h_f, l_f, c_f,timeperiod = length)]
adx = []
for i in cache:
if np.isnan(i[1]) != True:
adx.append(i[1])
adx = np.array(adx,dtype='float32')
adx = preprocessing.minmax_scale(adx)
while len(adx) < len(close_float):
adx = np.insert(adx,0,0)
return adx
def PDI(length,high_float, low_float, close_float):
#PLUS_DI - Plus Directional Indicator
h_f = high_float.copy()
l_f = low_float.copy()
c_f = close_float.copy()
cache = np.c_[c_f, talib.PLUS_DI(h_f, l_f, c_f,timeperiod = length)]
pdi = []
for i in cache:
if np.isnan(i[1]) != True:
pdi.append(i[1])
pdi = np.array(pdi,dtype='float32')
pdi = preprocessing.minmax_scale(pdi)
while len(pdi) < len(close_float):
pdi = np.insert(pdi,0,0)
return pdi
def MDI(length,high_float, low_float, close_float):
#MINUS_DI - Minus Directional Indicator
h_f = high_float.copy()
l_f = low_float.copy()
c_f = close_float.copy()
cache = np.c_[c_f, talib.MINUS_DI(h_f, l_f, c_f,timeperiod = length)]
mdi = []
for i in cache:
if np.isnan(i[1]) != True:
mdi.append(i[1])
mdi = np.array(mdi,dtype='float32')
mdi = preprocessing.minmax_scale(mdi)
while len(mdi) < len(close_float):
mdi = np.insert(mdi,0,0)
return mdi
def SMA(length,close_float):
# 単純移動平均(SMA -> Simple Moving Average)
c_f = close_float.copy()
cache = np.c_[c_f, talib.SMA(c_f,timeperiod = length)]
sma = []
for i in cache:
if np.isnan(i[1]) != True:
sma.append(i[1])
sma = np.array(sma,dtype='float32')
sma = preprocessing.minmax_scale(sma)
while len(sma) < len(close_float):
sma = np.insert(sma,0,0)
return sma
def RSI(length, close_float):
# RSI: Relative Strength Index
c_f = close_float.copy()
cache = np.c_[c_f, talib.RSI(c_f,timeperiod = length)]
rsi = []
for i in cache:
if np.isnan(i[1]) != True:
rsi.append(i[1])
rsi = np.array(rsi,dtype='float32')
rsi = preprocessing.minmax_scale(rsi)
while len(rsi) < len(close_float):
rsi = np.insert(rsi,0,0)
return rsi
def MACD(len_short, len_long, len_signal, close_float,):
# MACD: Moving Average Convergence/Divergence
#戻り値
#return_1 -> 短期平均(len_short) - 長期平均(len_long)
#return_2 -> シグナル(len_signal)
#return_3 -> return_1 - return_2
output = c_f = close_float.copy()
cache = talib.MACD(c_f,len_short,len_long,len_signal)
return_1 = []
return_2 = []
return_3 = []
for i in range(len(cache[0])):
if np.isnan(cache[0][i]) != True:
return_1.append(cache[0][i])
return_2.append(cache[1][i])
return_3.append(cache[2][i])
return_1 = np.array(return_1,dtype='float32')
return_1 = preprocessing.minmax_scale(return_1)
return_2 = np.array(return_2,dtype='float32')
return_2 = preprocessing.minmax_scale(return_2)
return_3 = np.array(return_3,dtype='float32')
return_3 = preprocessing.minmax_scale(return_3)
while len(return_1) < len(close_float):
return_1 = np.insert(return_1,0,0)
return_2 = np.insert(return_2,0,0)
return_3 = np.insert(return_3,0,0)
return return_1,return_2,return_3