インプライド・ボラティリティとかブラックショールズ方程式については、
Black–Scholes model - Wikipedia
使っているデータの読み込みなどについては
日経平均オプション理論価格等情報のPandas読み込み~整形まで - 夜間飛行
を参照のこと
してること
- オプション価格からインプライド・ボラティリティの計算
- 計算速度の比較(より正確には実行時間の比較)
$\sigma $ に関する方程式($ C(K,T)$は $ \sigma $の関数であることに注意)
市場価格 $ =C(K,T) $
を解いて得られる $ \sigma $をインプライド・ボラティリティという。
の通り、C(K,T)が市場価格となるσを求める計算をするわけですが、C(K,T)が単純に解けないので、求根アルゴリズムなどを使って、インプライドボラティリティを求めるわけです。
そのうちの方法の一つは、Bisection method - Wikipediaですね。
計算条件
- 2017/02/10の2017年3月限 日経225オプションデータ
- 残存日数 19.75日
- リスクフリーレート = 0.0%
実施項目
- Scipy使った最適化計算 (数値解) / pandas.apply - Single Process
- Scipy使った最適化計算 (数値解) / ndarrayスカラー関数最適化 x ループ : 関数のベクトル化 - Single Process
- Scipy使った最適化計算 (数値解) / ndarrayベクトル関数最適化 : 関数のベクトル化 - Single Process
- ndarray最適化Cython /ndarrayベクトル関数最適化- Single Process
- ndarray最適化Cython /ndarrayスカラー関数最適化ベクトル- Single Process
- Scipy使った最適化計算 (数値解) / pandas.apply + Multi Process
- ANNを使った逆関数の学習⇒インファレンスエンジンとして利用(Keras)
- Swig利用(C++実装)/シングルループ - Single Process
実行時間結果
Pythonで色々な工夫をするよりC++で実装したものが圧倒的に速くて、Cythonの努力なんだったの…と思えてくる。
しかもC++は未だスレッド並列化などの余力を残しての、このタイム…っ!
PythonのマルチプロセスはWindwosの場合だと毎回Kerasのライブラリを読みに行ってしまうため、ここが悪さしているのだけど、これをなくした個別実行バージョンでも35秒ほどかかってた。
ANNを使った逆関数は速いが精度が悪い。NNの設計でも限界があると思う。
(たぶんWGAN/StackGANみたいなことしないと無理)
項目 | Data Handling | Optimize | BS_Vectorized | Proc | Time[sec] |
---|---|---|---|---|---|
Scipy使った最適化計算 (数値解) / pandas.apply - Single Process | Pd.apply | minimize_scalar(Scipy) | No | Single | 2.8061 |
Scipy使った最適化計算 (数値解) / ndarrayスカラー関数最適化 x ループ : 関数のベクトル化 - Single Process | Pd->np | minimize_scalar(Scipy) | Yes | Single | 3.2068 |
Scipy使った最適化計算 (数値解) / ndarrayベクトル関数最適化 : 関数のベクトル化 - Single Process | Pd->np | root(Scipy) | Yes | Single | 0.6706 |
ndarray最適化Cython /ndarrayベクトル関数最適化- Single Process | Pd->np(CyPy) | root(Scipy) | Yes | Single | 0.6860 |
ndarray最適化Cython /ndarrayスカラー関数最適化ベクトル- Single Process | Pd->np(CyPy) | VectBisection(Cython) | Yes | Single | 0.4848 |
Scipy使った最適化計算 (数値解) / pandas.apply + Multi Process | Pd->Split(Pdx8)->np(CyPy) | VectBisection(Cython) | Yes | MultiProc | 128.3856 |
ANNを使った逆関数の学習⇒インファレンスエンジンとして利用(Keras) | Pd->np->ANN | N/A | Yes | Single | 0.1526 |
Swig利用(C++実装)/シングルループ - Single Process | Pd->np->C++(Swig) | Bisection(C++) | No | Single | 0.0010 |
ソース
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from keras.models import load_model
from datetime import datetime
import dateutil
import scipy.optimize
from scipy.stats import norm
# Cython版
from BS_Cy import *
# Swig版
from _BS import *
import multiprocessing as mp
import time
import matplotlib.pyplot as plt
maturity = 19.75/245.
def strip(text):
try:
return text.strip()
except AttributeError:
return text
def BS_norm(F225, Strike, maturity, vol, call):
volSQM = vol * (maturity ** .5)
d1 = np.log(F225/Strike) / volSQM + volSQM/2
d2 = d1 - volSQM
return (F225 * norm.cdf(d1) - Strike*norm.cdf(d2)) if call else \
(Strike * norm.cdf(-d2) - F225*norm.cdf(-d1))
def BS_norm_vect(F225, Strike, maturity, vol, call):
volSQM = vol * (maturity ** .5)
d1 = np.log(F225/Strike) / volSQM + volSQM/2
d2 = d1 - volSQM
Call = F225 * norm.cdf(d1) - Strike*norm.cdf(d2)
Put = Strike * norm.cdf(-d2) - F225*norm.cdf(-d1)
premium = Call * call + Put * np.logical_not(call)
return premium
def myapply(x):
res = scipy.optimize.minimize_scalar(
lambda vol:
(
BS_norm(x['F225_PRICE'], x['STRIKE'], maturity, vol, x['CALL'])
- x['OP_PRICE']
)**2,
method='Bounded', bounds =(0.01, 0.5),
options={'maxiter': 50, 'xatol': 1e-04})
x['vol1'] = res.x
return x
def myapply_vect(F225, Strike, price, call):
x = []
for i in range(len(F225)):
res = scipy.optimize.minimize_scalar(
lambda vol:
( BS_norm_vect(F225[i], Strike[i], maturity, vol,call[i]) - price[i] )**2,
method='Bounded', bounds =(0.01, 0.5),
options={'maxiter': 50, 'xatol': 1e-04})
x.append(res.x)
return x
def myapply_vect2(F225, Strike, price, call):
res = scipy.optimize.root(
lambda vol:( BS_norm_vect(F225, Strike, maturity, vol,call) - price),
np.ones_like(F225)*.3)
return res.x
def pworker(df):
df['vol6'] = cy_apply2(df['F225_PRICE'].values, df['STRIKE'].values,
df['OP_PRICE'].values, df['CALL'].values)
return df
if __name__ == '__main__':
# # データ準備
# [オプション理論価格等情報 \| 日本取引所グループ](http://www.jpx.co.jp/markets/derivatives/option-price/01.html)から入手した2017年2月10日の終値データ。
# 色々入っているけど2017年3月限の原資産:日経225のオプション取引データだけ使う。
#
# ## Pandasへデータ読み込み
# 1. データ本体と別に配布されているヘッダー情報をもとに自分でヘッダーを起こして名前をつける
# 2. 読み込み中にテキストデータの空白をトリムする
# ヘッダー名 (変数名順)
# 商品コード, 商品タイプ, 限月, 権利行使価格, 予備
# プットオプション: 銘柄コード, 終値, 予備, 理論価格, ボラティリティ
# コールオプション: 銘柄コード, 終値, 予備, 理論価格, ボラティリティ
# 原資産終値, 基準ボラティリティ
colName = ("CODE","TYPE","MATURITY","STRIKE", "RSV",
"PUT_CODE", "PUT_PRICE", "PUT_RSV", "PUT_TPRICE", "PUT_VOLATILITY",
"CALL_CODE","CALL_PRICE","CALL_RSV","CALL_TPRICE","CALL_VOLATILITY",
"F225_PRICE", "Base_VOL")
df = pd.read_csv('./ose20170210tp.csv',names=colName,
converters = {'CODE' : strip,
'TYPE' : strip})
# 2017年3月限の日経225オプションだけ抜き出し。ついでに要らない列を削除してスッキリ。
df = df.query("MATURITY == 201703 & CODE==\"NK225E\"") .drop(['RSV','PUT_RSV','CALL_RSV','PUT_CODE','CALL_CODE','CODE','TYPE','MATURITY'], 1)
# * PUTとCALLが分かれてしまっているので、データを正規化。
# * CALL列がTRUEのときCALLのデータ、FALSEのとき、PUTのデータとする。
# * 行使価格も14000円以上、22000円未満に絞る
df_p = df[["STRIKE","PUT_PRICE","PUT_TPRICE", "PUT_VOLATILITY","F225_PRICE", "Base_VOL"]] .rename(columns={'PUT_PRICE': 'OP_PRICE', 'PUT_TPRICE':'OP_TPRICE', 'PUT_VOLATILITY':'OP_VOL'})
df_p['CALL'] = False
df_c = df[["STRIKE","CALL_PRICE","CALL_TPRICE", "CALL_VOLATILITY","F225_PRICE", "Base_VOL"]] .rename(columns={'CALL_PRICE': 'OP_PRICE', 'CALL_TPRICE':'OP_TPRICE', 'CALL_VOLATILITY':'OP_VOL'})
df_c['CALL'] = True
df = df_p.append(df_c).query("OP_PRICE > 1.0 & STRIKE < 22000 & STRIKE >= 14000")
del (df_p,df_c)
tmp_df = df
loop_num = 10
text = 'Time elapsed: %.2f seconds'
result_time = []
result_Col = np.array([["Data Handling","Optimize","BS_Vectorized","Proc","Time[sec]"]])
result_con = np.array([["Pd.apply",
"Pd->np",
"Pd->np",
"Pd->np(CyPy)",
"Pd->np(CyPy)",
"Pd->Split(Pd x 8)->np(CyPy)",
"Pd->np->ANN",
"Pd->np->C++(Swig)"
]])
result_opt = np.array([["minimize_scalar(Scipy)",
"minimize_scalar(Scipy)",
"root(Scipy)",
"root(Scipy)",
"Vect Bisection(Cython)",
"Vect Bisection(Cython)",
"N/A",
"Bisection(C++)"
]])
result_Vect = np.array([["No",
"Yes",
"Yes",
"Yes",
"Yes",
"Yes",
"Yes",
"No"
]])
result_Proc = np.array([["Single",
"Single",
"Single",
"Single",
"Single",
"Multi Proc",
"Single",
"Single"
]])
# 1. Scipy使った最適化計算 (数値解) / pandas.apply - Single Process
time_start = time.time()
for i in range(loop_num):
tmp_df = df.apply(myapply, axis=1)
result_time.append((time.time() - time_start))
# 2. Scipy使った最適化計算 (数値解) / ndarrayループ : 関数のベクトル化 - Single Process
time_start = time.time()
for i in range(loop_num):
tmp_df['vol2'] = myapply_vect(df['F225_PRICE'].values, df['STRIKE'].values,
df['OP_PRICE'].values, df['CALL'].values)
result_time.append((time.time() - time_start))
# 3. Scipy使った最適化計算 (数値解) / ndarrayベクトル : 関数のベクトル化 - Single Process
time_start = time.time()
for i in range(loop_num):
tmp_df['vol3'] = myapply_vect2(df['F225_PRICE'].values, df['STRIKE'].values,
df['OP_PRICE'].values, df['CALL'].values)
result_time.append((time.time() - time_start))
# 4. Cython - Root
time_start = time.time()
for i in range(loop_num):
tmp_df['vol4'] = cy_apply1(df['F225_PRICE'].values, df['STRIKE'].values,
df['OP_PRICE'].values, df['CALL'].values)
result_time.append((time.time() - time_start))
# 5. Cython - My Bisection
time_start = time.time()
for i in range(loop_num):
tmp_df['vol5'] = cy_apply2(df['F225_PRICE'].values, df['STRIKE'].values,
df['OP_PRICE'].values, df['CALL'].values)
result_time.append((time.time() - time_start))
# 6. Multi Process
time_start = time.time()
for i in range(loop_num):
p = mp.Pool(processes=8)
split_dfs = np.array_split(df,8)
pool_results = p.map(pworker, split_dfs)
p.close()
p.join()
tmp_df = pd.concat(pool_results, axis=0)
result_time.append((time.time() - time_start))
# 7. ANN
model = load_model('./model.h5')
a = np.array([df['STRIKE'].values/df['F225_PRICE'].values])
b = np.array([np.ones_like(df['F225_PRICE'].values)*maturity/(40./245.)])
c = np.array([(df['OP_PRICE'].values/df['F225_PRICE'].values/0.25)**0.25])
X = np.vstack((a,b,c)).transpose()
time_start = time.time()
for i in range(loop_num):
tmp_df['vol7'] = 0.4*model.predict(X)+0.1
result_time.append((time.time() - time_start))
# 8. Swig C++
tmpmpmp=np.ones_like(df['F225_PRICE'].values).astype(np.float32)
time_start = time.time()
for i in range(loop_num):
tmpmpmp = Swig_Apply_PY(df['F225_PRICE'].values, df['STRIKE'].values,
df['OP_PRICE'].values, df['CALL'].values.astype(dtype=np.int32),tmpmpmp.shape[0])
tmp_df['vol8'] = tmpmpmp
result_time.append((time.time() - time_start))
result_time = np.array([result_time])
print(np.vstack((result_con, result_opt, result_Vect, result_Proc,result_time)).transpose())