14
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

インプライドボラティリティ計算を高速で行う (市場データ処理)

Last updated at Posted at 2017-02-15

インプライド・ボラティリティとかブラックショールズ方程式については、
Black–Scholes model - Wikipedia

使っているデータの読み込みなどについては
日経平均オプション理論価格等情報のPandas読み込み~整形まで - 夜間飛行

を参照のこと

してること

  • オプション価格からインプライド・ボラティリティの計算
  • 計算速度の比較(より正確には実行時間の比較)

ボラティリティ - Wikipedia

$\sigma $ に関する方程式($ C(K,T)$は $ \sigma $の関数であることに注意)
市場価格 $ =C(K,T) $
を解いて得られる $ \sigma $をインプライド・ボラティリティという。

の通り、C(K,T)が市場価格となるσを求める計算をするわけですが、C(K,T)が単純に解けないので、求根アルゴリズムなどを使って、インプライドボラティリティを求めるわけです。

そのうちの方法の一つは、Bisection method - Wikipediaですね。

計算条件

  • 2017/02/10の2017年3月限 日経225オプションデータ
  • 残存日数 19.75日
  • リスクフリーレート = 0.0%

実施項目

  • Scipy使った最適化計算 (数値解) / pandas.apply - Single Process
  • Scipy使った最適化計算 (数値解) / ndarrayスカラー関数最適化 x ループ : 関数のベクトル化 - Single Process
  • Scipy使った最適化計算 (数値解) / ndarrayベクトル関数最適化 : 関数のベクトル化 - Single Process
  • ndarray最適化Cython /ndarrayベクトル関数最適化- Single Process
  • ndarray最適化Cython /ndarrayスカラー関数最適化ベクトル- Single Process
  • Scipy使った最適化計算 (数値解) / pandas.apply + Multi Process
  • ANNを使った逆関数の学習⇒インファレンスエンジンとして利用(Keras)
  • Swig利用(C++実装)/シングルループ - Single Process

実行時間結果

Pythonで色々な工夫をするよりC++で実装したものが圧倒的に速くて、Cythonの努力なんだったの…と思えてくる。
しかもC++は未だスレッド並列化などの余力を残しての、このタイム…っ!

PythonのマルチプロセスはWindwosの場合だと毎回Kerasのライブラリを読みに行ってしまうため、ここが悪さしているのだけど、これをなくした個別実行バージョンでも35秒ほどかかってた。

ANNを使った逆関数は速いが精度が悪い。NNの設計でも限界があると思う。
(たぶんWGAN/StackGANみたいなことしないと無理)

項目 Data Handling Optimize BS_Vectorized Proc Time[sec]
Scipy使った最適化計算 (数値解) / pandas.apply - Single Process Pd.apply minimize_scalar(Scipy) No Single 2.8061
Scipy使った最適化計算 (数値解) / ndarrayスカラー関数最適化 x ループ : 関数のベクトル化 - Single Process Pd->np minimize_scalar(Scipy) Yes Single 3.2068
Scipy使った最適化計算 (数値解) / ndarrayベクトル関数最適化 : 関数のベクトル化 - Single Process Pd->np root(Scipy) Yes Single 0.6706
ndarray最適化Cython /ndarrayベクトル関数最適化- Single Process Pd->np(CyPy) root(Scipy) Yes Single 0.6860
ndarray最適化Cython /ndarrayスカラー関数最適化ベクトル- Single Process Pd->np(CyPy) VectBisection(Cython) Yes Single 0.4848
Scipy使った最適化計算 (数値解) / pandas.apply + Multi Process Pd->Split(Pdx8)->np(CyPy) VectBisection(Cython) Yes MultiProc 128.3856
ANNを使った逆関数の学習⇒インファレンスエンジンとして利用(Keras) Pd->np->ANN N/A Yes Single 0.1526
Swig利用(C++実装)/シングルループ - Single Process Pd->np->C++(Swig) Bisection(C++) No Single 0.0010

ソース

# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np

from keras.models import load_model


from datetime import datetime
import dateutil

import scipy.optimize
from scipy.stats import norm

# Cython版
from BS_Cy import *
# Swig版
from _BS import *

import multiprocessing as mp
import time
import matplotlib.pyplot as plt

maturity = 19.75/245.

def strip(text):
    try:
        return text.strip()
    except AttributeError:
        return text


def BS_norm(F225, Strike, maturity, vol, call):
    volSQM = vol * (maturity ** .5)
    d1 = np.log(F225/Strike) / volSQM + volSQM/2
    d2 = d1 - volSQM

    return (F225   * norm.cdf(d1) - Strike*norm.cdf(d2)) if call else \
            (Strike * norm.cdf(-d2) - F225*norm.cdf(-d1))

def BS_norm_vect(F225, Strike, maturity, vol, call):
    volSQM = vol * (maturity ** .5)
    d1 = np.log(F225/Strike) / volSQM + volSQM/2
    d2 = d1 - volSQM

    Call = F225   * norm.cdf(d1) - Strike*norm.cdf(d2)
    Put  = Strike * norm.cdf(-d2) - F225*norm.cdf(-d1)
    premium = Call * call + Put * np.logical_not(call)
    return premium
    
def myapply(x):
    res = scipy.optimize.minimize_scalar(
            lambda vol:
            (
                BS_norm(x['F225_PRICE'], x['STRIKE'], maturity, vol, x['CALL'])
                                - x['OP_PRICE']
            )**2, 
            method='Bounded', bounds =(0.01, 0.5),
            options={'maxiter': 50, 'xatol': 1e-04})
    x['vol1'] = res.x
    return x
   
def myapply_vect(F225, Strike, price, call):
    x = []
    for i in range(len(F225)):
        res = scipy.optimize.minimize_scalar(
            lambda vol:
            ( BS_norm_vect(F225[i], Strike[i], maturity, vol,call[i]) - price[i] )**2, 
            method='Bounded', bounds =(0.01, 0.5),
            options={'maxiter': 50, 'xatol': 1e-04})
        x.append(res.x)
    return x

def myapply_vect2(F225, Strike, price, call):
    res = scipy.optimize.root(
        lambda vol:( BS_norm_vect(F225, Strike, maturity, vol,call) - price), 
        np.ones_like(F225)*.3)
    return res.x

def pworker(df):
    df['vol6'] = cy_apply2(df['F225_PRICE'].values, df['STRIKE'].values, 
                              df['OP_PRICE'].values,   df['CALL'].values)
    return df

if __name__ == '__main__':
    # # データ準備
    # [オプション理論価格等情報 \| 日本取引所グループ](http://www.jpx.co.jp/markets/derivatives/option-price/01.html)から入手した2017年2月10日の終値データ。
    # 色々入っているけど2017年3月限の原資産:日経225のオプション取引データだけ使う。
    # 
    # ## Pandasへデータ読み込み
    # 1. データ本体と別に配布されているヘッダー情報をもとに自分でヘッダーを起こして名前をつける
    # 2. 読み込み中にテキストデータの空白をトリムする

    # ヘッダー名 (変数名順)
    #     商品コード, 商品タイプ, 限月, 権利行使価格, 予備
    #     プットオプション: 銘柄コード, 終値, 予備, 理論価格, ボラティリティ
    #     コールオプション: 銘柄コード, 終値, 予備, 理論価格, ボラティリティ
    #     原資産終値, 基準ボラティリティ
    colName = ("CODE","TYPE","MATURITY","STRIKE", "RSV", 
               "PUT_CODE", "PUT_PRICE", "PUT_RSV", "PUT_TPRICE", "PUT_VOLATILITY",
               "CALL_CODE","CALL_PRICE","CALL_RSV","CALL_TPRICE","CALL_VOLATILITY",
               "F225_PRICE", "Base_VOL")

    df = pd.read_csv('./ose20170210tp.csv',names=colName,
                     converters = {'CODE' : strip,
                                   'TYPE' : strip})
    # 2017年3月限の日経225オプションだけ抜き出し。ついでに要らない列を削除してスッキリ。
    df = df.query("MATURITY == 201703 & CODE==\"NK225E\"")    .drop(['RSV','PUT_RSV','CALL_RSV','PUT_CODE','CALL_CODE','CODE','TYPE','MATURITY'], 1)

    # * PUTとCALLが分かれてしまっているので、データを正規化。
    # * CALL列がTRUEのときCALLのデータ、FALSEのとき、PUTのデータとする。
    # * 行使価格も14000円以上、22000円未満に絞る
    df_p = df[["STRIKE","PUT_PRICE","PUT_TPRICE", "PUT_VOLATILITY","F225_PRICE", "Base_VOL"]]    .rename(columns={'PUT_PRICE': 'OP_PRICE', 'PUT_TPRICE':'OP_TPRICE', 'PUT_VOLATILITY':'OP_VOL'})
    df_p['CALL'] = False
    df_c = df[["STRIKE","CALL_PRICE","CALL_TPRICE", "CALL_VOLATILITY","F225_PRICE", "Base_VOL"]]    .rename(columns={'CALL_PRICE': 'OP_PRICE', 'CALL_TPRICE':'OP_TPRICE', 'CALL_VOLATILITY':'OP_VOL'})
    df_c['CALL'] = True
    df = df_p.append(df_c).query("OP_PRICE > 1.0 & STRIKE < 22000 & STRIKE >= 14000")
    del (df_p,df_c)

    tmp_df = df
    loop_num = 10
    text = 'Time elapsed: %.2f seconds'

    result_time = []
    result_Col  = np.array([["Data Handling","Optimize","BS_Vectorized","Proc","Time[sec]"]])
    result_con  = np.array([["Pd.apply",
                   "Pd->np",
                   "Pd->np",
                   "Pd->np(CyPy)",
                   "Pd->np(CyPy)",
                   "Pd->Split(Pd x 8)->np(CyPy)",
                   "Pd->np->ANN",
                   "Pd->np->C++(Swig)"
                   ]])
    result_opt  = np.array([["minimize_scalar(Scipy)",
                   "minimize_scalar(Scipy)",
                   "root(Scipy)",
                   "root(Scipy)",
                   "Vect Bisection(Cython)",
                   "Vect Bisection(Cython)",
                   "N/A",
                   "Bisection(C++)"
                    ]])
    result_Vect = np.array([["No",
                   "Yes",
                   "Yes",
                   "Yes",
                   "Yes",
                   "Yes",
                   "Yes",
                   "No"
                    ]])
    result_Proc = np.array([["Single",
                   "Single",
                   "Single",
                   "Single",
                   "Single",
                   "Multi Proc",
                   "Single",
                   "Single"
                    ]])
    
    # 1. Scipy使った最適化計算 (数値解) / pandas.apply - Single Process
    time_start = time.time()
    for i in range(loop_num):
        tmp_df = df.apply(myapply, axis=1)
    result_time.append((time.time() - time_start))


    # 2. Scipy使った最適化計算 (数値解) / ndarrayループ : 関数のベクトル化 - Single Process
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol2'] = myapply_vect(df['F225_PRICE'].values, df['STRIKE'].values, 
                              df['OP_PRICE'].values,   df['CALL'].values)
    result_time.append((time.time() - time_start))

    # 3. Scipy使った最適化計算 (数値解) / ndarrayベクトル : 関数のベクトル化 - Single Process
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol3'] = myapply_vect2(df['F225_PRICE'].values, df['STRIKE'].values, 
                              df['OP_PRICE'].values,   df['CALL'].values)
    result_time.append((time.time() - time_start))
    
    # 4. Cython - Root
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol4'] = cy_apply1(df['F225_PRICE'].values, df['STRIKE'].values, 
                              df['OP_PRICE'].values,   df['CALL'].values)
    result_time.append((time.time() - time_start))

    # 5. Cython - My Bisection
    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol5'] = cy_apply2(df['F225_PRICE'].values, df['STRIKE'].values, 
                              df['OP_PRICE'].values,   df['CALL'].values)
    result_time.append((time.time() - time_start))

    # 6. Multi Process
    time_start = time.time()
    for i in range(loop_num):
        p = mp.Pool(processes=8)
        split_dfs = np.array_split(df,8)
        pool_results = p.map(pworker, split_dfs)
        p.close()
        p.join()

        tmp_df = pd.concat(pool_results, axis=0)
    result_time.append((time.time() - time_start))

    # 7. ANN
    model = load_model('./model.h5')
    
    a = np.array([df['STRIKE'].values/df['F225_PRICE'].values])
    b = np.array([np.ones_like(df['F225_PRICE'].values)*maturity/(40./245.)])
    c = np.array([(df['OP_PRICE'].values/df['F225_PRICE'].values/0.25)**0.25])
    
    X = np.vstack((a,b,c)).transpose()

    time_start = time.time()
    for i in range(loop_num):
        tmp_df['vol7'] = 0.4*model.predict(X)+0.1
    result_time.append((time.time() - time_start))    
        
    # 8. Swig C++
    tmpmpmp=np.ones_like(df['F225_PRICE'].values).astype(np.float32)
    time_start = time.time()
    for i in range(loop_num):
        tmpmpmp = Swig_Apply_PY(df['F225_PRICE'].values, df['STRIKE'].values, 
                              df['OP_PRICE'].values,  df['CALL'].values.astype(dtype=np.int32),tmpmpmp.shape[0])
        tmp_df['vol8'] =  tmpmpmp
    result_time.append((time.time() - time_start))

    result_time = np.array([result_time])
    print(np.vstack((result_con, result_opt, result_Vect, result_Proc,result_time)).transpose())
14
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?