import pandas import dateutil import re import datetime import os import numpy as np from pandas import DataFrame from scipy.interpolate import interp1d from scipy.stats import norm import quantlib.reference.names as nm import quantlib.pricingengines.blackformula from quantlib.pricingengines.blackformula import blackFormulaImpliedStdDev def Compute_IV(optionDataFrame, tMin=0, nMin=0, QDMin=0, QDMax=1, keepOTMData=True): """ Pre-processing of a standard European option quote file. - Calculation of implied risk-free rate and dividend yield - Calculation of implied volatility - Estimate ATM volatility for each expiry - Compute implied volatility and Quick Delta for each quote Options for filtering the input data set: - maturities with less than nMin strikes are ignored - maturities shorter than tMin (ACT/365 daycount) are ignored - strikes with Quick Delta < qdMin or > qdMax are ignored """ grouped = optionDataFrame.groupby(nm.EXPIRY_DATE) isFirst = True for spec, group in grouped: print('processing group %s' % spec) # implied vol for this type/expiry group indx = group.index dtTrade = group[nm.TRADE_DATE][indx[0]] dtExpiry = group[nm.EXPIRY_DATE][indx[0]] spot = group[nm.SPOT][indx[0]] daysToExpiry = (dtExpiry-dtTrade).days timeToMaturity = daysToExpiry/365.0 # exclude groups with too short time to maturity if timeToMaturity < tMin: continue # exclude groups with too few data points df_call = group[group[nm.OPTION_TYPE] == nm.CALL_OPTION] df_put = group[group[nm.OPTION_TYPE] == nm.PUT_OPTION] if (len(df_call) < nMin) | (len(df_put) < nMin): continue # calculate forward, implied interest rate and implied div. yield df_C = DataFrame((df_call['PBid']+df_call['PAsk'])/2, columns=['PremiumC']) df_C.index = df_call['Strike'].values df_P = DataFrame((df_put['PBid']+df_put['PAsk'])/2, columns=['PremiumP']) df_P.index = df_put['Strike'].values # use 'inner' join because some strikes are not quoted for C and P df_all = df_C.join(df_P, how='inner') df_all['Strike'] = df_all.index df_all['C-P'] = df_all['PremiumC'] - df_all['PremiumP'] y = np.array(df_all['C-P']) x = np.array(df_all['Strike']) A = np.vstack((x, np.ones(x.shape))).T b = np.linalg.lstsq(A, y)[0] # intercept is last coef iRate = -np.log(-b[0])/timeToMaturity dRate = np.log(spot/b[1])/timeToMaturity discountFactor = np.exp(-iRate*timeToMaturity) Fwd = spot * np.exp((iRate-dRate)*timeToMaturity) print('Fwd: %f int rate: %f div yield: %f' % (Fwd, iRate, dRate)) # mid-market ATM volatility def impvol(cp, strike, premium): try: vol = blackFormulaImpliedStdDev(cp, strike, forward=Fwd, blackPrice=premium, discount=discountFactor, TTM=timeToMaturity) except: vol = np.nan return vol/np.sqrt(timeToMaturity) # implied bid/ask vol for all options df_call['IVBid'] = [impvol('C', strike, price) for strike, price in zip(df_call['Strike'], df_call['PBid'])] df_call['IVAsk'] = [impvol('C', strike, price) for strike, price in zip(df_call['Strike'], df_call['PAsk'])] df_call['IVMid'] = (df_call['IVBid'] + df_call['IVAsk'])/2 df_put['IVBid'] = [impvol('P', strike, price) for strike, price in zip(df_put['Strike'], df_put['PBid'])] df_put['IVAsk'] = [impvol('P', strike, price) for strike, price in zip(df_put['Strike'], df_put['PAsk'])] df_put['IVMid'] = (df_put['IVBid'] + df_put['IVAsk'])/2 f_call = interp1d(df_call['Strike'].values, df_call['IVMid'].values) f_put = interp1d(df_put['Strike'].values, df_put['IVMid'].values) atmVol = (f_call(Fwd)+f_put(Fwd))/2 print('ATM vol: %f' % atmVol) # Quick Delta, computed with ATM vol rv = norm() df_call['QuickDelta'] = [rv.cdf(np.log(Fwd/strike)/(atmVol*np.sqrt(timeToMaturity))) \ for strike in df_call['Strike']] df_put['QuickDelta'] = [rv.cdf(np.log(Fwd/strike)/(atmVol*np.sqrt(timeToMaturity))) \ for strike in df_put['Strike']] # keep data within QD range df_call = df_call[(df_call['QuickDelta'] >= QDMin) & \ (df_call['QuickDelta'] <= QDMax) ] df_put = df_put[ (df_put['QuickDelta'] >= QDMin) & \ (df_put['QuickDelta'] <= QDMax) ] # final assembly... df_cp = df_call.append(df_put, ignore_index=True) df_cp[nm.INTEREST_RATE] = iRate df_cp[nm.DIVIDEND_YIELD] = dRate df_cp[nm.ATMVOL] = atmVol df_cp[nm.FORWARD] = Fwd # keep only OTM data ? if keepOTMData: df_cp = df_cp[((df_cp[nm.STRIKE]>=Fwd) & (df_cp[nm.OPTION_TYPE] == nm.CALL_OPTION)) | ((df_cp[nm.STRIKE]