import numpy as np import pandas from pandas import DataFrame import datetime from quantlib.models.equity.heston_model import ( HestonModelHelper, HestonModel) from quantlib.models.calibration_helper import ImpliedVolError from quantlib.processes.heston_process import HestonProcess from quantlib.pricingengines.api import AnalyticHestonEngine from quantlib.math.optimization import LevenbergMarquardt, EndCriteria from quantlib.settings import Settings from quantlib.time.api import Period, Date, Actual365Fixed, TARGET, Days from quantlib.quotes import SimpleQuote from quantlib.termstructures.yields.zero_curve import ZeroCurve from quantlib.util.converter import pydate_to_qldate, df_to_zero_curve import quantlib.reference.names as nm def heston_helpers(df_option, df_rates): """ Create array of heston options helpers """ trade_date = df_option[nm.TRADE_DATE].ix[0] settlement_date = pydate_to_qldate(trade_date) settings = Settings() settings.evaluation_date = settlement_date calendar = TARGET() # convert data frame (date/value) into zero curve # expect the index to be a date, and 1 column of values risk_free_ts = df_to_zero_curve(df_rates['iRate'], trade_date) dividend_ts = df_to_zero_curve(df_rates[nm.DIVIDEND_YIELD], trade_date) # loop through rows in option data frame, construct # helpers for bid/ask options = [] for index, row in df_option.T.iteritems(): strike = row[nm.STRIKE] spot = row[nm.SPOT] if (strike/spot > 1.3) | (strike/spot < .7): continue expiry_date = row[nm.EXPIRY_DATE] days = (expiry_date - trade_date).days maturity = Period(days, Days) options.append( HestonModelHelper( maturity, calendar, spot, strike, SimpleQuote(row['IVBid']), risk_free_ts, dividend_ts, ImpliedVolError)) options.append( HestonModelHelper( maturity, calendar, spot, strike, SimpleQuote(row['IVAsk']), risk_free_ts, dividend_ts, ImpliedVolError)) return {'options':options, 'spot': spot} def merge_df(df_option, options, model_name): df_output = DataFrame.filter(df_option, items=['dtTrade', 'dtExpiry', 'Type', 'Strike', 'Mid', 'QuickDelta', 'IVBid', 'IVAsk', 'iRate', 'dRate', 'ATMVol', 'Fwd']) model_value = np.zeros(len(df_option)) model_iv = np.zeros(len(df_option)) for i, j in zip(range(len(df_option)), range(0, len(options),2)): model_value[i] = options[j].model_value() model_iv[i] = options[j].impliedVolatility(model_value[i], accuracy=1.e-5, maxEvaluations=5000, minVol=.01, maxVol=10.0) df_output[model_name + '-Value'] = model_value df_output[model_name + '-IV'] = model_iv return df_output def heston_calibration(df_option, ival=None): """ calibrate heston model """ # extract rates and div yields from the data set df_tmp = DataFrame.filter(df_option, items=['dtExpiry', 'iRate', nm.DIVIDEND_YIELD]) grouped = df_tmp.groupby('dtExpiry') def aggregate(serie): return serie[serie.index[0]] df_rates = grouped.agg(aggregate) trade_date = df_option[nm.TRADE_DATE][0] expiry_date = df_option[nm.EXPIRY_DATE][0] spot = df_option[nm.SPOT][0] # build array of option helpers hh = heston_helpers(df_option, df_rates) options = hh['options'] spot = hh['spot'] risk_free_ts = df_to_zero_curve(df_rates[nm.INTEREST_RATE], trade_date) dividend_ts = df_to_zero_curve(df_rates[nm.DIVIDEND_YIELD], trade_date) # initial values for parameters if ival is None: ival = {'v0': 0.1, 'kappa': 1.0, 'theta': 0.1, 'sigma': 0.5, 'rho': -.5} process = HestonProcess( risk_free_ts, dividend_ts, SimpleQuote(spot), ival['v0'], ival['kappa'], ival['theta'], ival['sigma'], ival['rho']) model = HestonModel(process) engine = AnalyticHestonEngine(model, 64) for option in options: option.set_pricing_engine(engine) om = LevenbergMarquardt(1e-8, 1e-8, 1e-8) model.calibrate( options, om, EndCriteria(400, 40, 1.0e-8, 1.0e-8, 1.0e-8) ) print('model calibration results:') print('v0: %f kappa: %f theta: %f sigma: %f rho: %f' % (model.v0, model.kappa, model.theta, model.sigma, model.rho)) calib_error = (1.0/len(options)) * sum( [pow(o.calibration_error()*100.0,2) for o in options]) print('SSE: %f' % calib_error) # merge the fitted volatility and the input data set return merge_df(df_option, options, 'Heston') df_options = pandas.read_pickle('df_options_SPX_24jan2011.pkl') df_heston_cal = heston_calibration(df_options) def calibration_subplot(ax, group, i, model_name): group = group.sort_index(by='Strike') dtExpiry = group.get_value(group.index[0], 'dtExpiry') K = group['Strike'] VB = group['IVBid'] VA = group['IVAsk'] VM = group[model_name + '-IV'] ax.plot(K, VA, 'b.', K,VB,'b.', K,VM,'r-') if i==3: ax.set_xlabel('Strike') if i==0: ax.set_ylabel('Implied Vol') ax.text(.6,.8,'%s' % dtExpiry, transform=ax.transAxes) def calibration_plot(title, df_calibration, model_name): df_calibration = DataFrame.filter(df_calibration, items=['dtExpiry', 'Strike', 'IVBid', 'IVAsk', 'TTM', model_name+'-IV']) # group by maturity grouped = df_calibration.groupby('dtExpiry') all_groups = [(dt, g) for dt, g in grouped] xy = [(0,0), (0,1), (1,0), (1,1)] for k in range(0, len(all_groups),4): if (k+4) >= len(all_groups): break plt.figure() fig, axs = plt.subplots(2, 2, sharex=True, sharey=True) for i in range(4): x,y = xy[i] calibration_subplot(axs[x,y], all_groups[i+k][1],i, model_name) dtTrade = df_options['dtTrade'][0] title = 'Heston Model (%s)' % dtTrade calibration_plot(title, df_heston_cal, 'Heston')