#!/usr/bin/env python # coding: utf-8 # #Swaption Pricing : Monte-Carlo Methods # *Copyright (c) 2015 Matthias Groncki* # # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # - Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # This disclaimer is taken from the QuantLib license # In[103]: # import the used libraries import numpy as np import matplotlib.pyplot as plt import QuantLib as ql get_ipython().run_line_magic('matplotlib', 'inline') # In[104]: # Setting evaluation date today = ql.Date(7,4,2015) ql.Settings.instance().setEvaluationDate(today) # In[105]: # Setup Marketdata rate = ql.SimpleQuote(0.03) rate_handle = ql.QuoteHandle(rate) dc = ql.Actual365Fixed() yts = ql.FlatForward(today, rate_handle, dc) yts.enableExtrapolation() hyts = ql.RelinkableYieldTermStructureHandle(yts) t0_curve = ql.YieldTermStructureHandle(yts) euribor6m = ql.Euribor6M(hyts) cal = ql.TARGET() # In[106]: # Setup a dummy portfolio with two Swaps def makeSwap(start, maturity, nominal, fixedRate, index, typ=ql.VanillaSwap.Payer): """ creates a plain vanilla swap with fixedLegTenor 1Y parameter: start (ql.Date) : Start Date maturity (ql.Period) : SwapTenor nominal (float) : Nominal fixedRate (float) : rate paid on fixed leg index (ql.IborIndex) : Index return: tuple(ql.Swap, list) Swap and all fixing dates """ end = ql.TARGET().advance(start, maturity) fixedLegTenor = ql.Period("1y") fixedLegBDC = ql.ModifiedFollowing fixedLegDC = ql.Thirty360(ql.Thirty360.BondBasis) spread = 0.0 fixedSchedule = ql.Schedule(start, end, fixedLegTenor, index.fixingCalendar(), fixedLegBDC, fixedLegBDC, ql.DateGeneration.Backward, False) floatSchedule = ql.Schedule(start, end, index.tenor(), index.fixingCalendar(), index.businessDayConvention(), index.businessDayConvention(), ql.DateGeneration.Backward, False) swap = ql.VanillaSwap(typ, nominal, fixedSchedule, fixedRate, fixedLegDC, floatSchedule, index, spread, index.dayCounter()) return swap, [index.fixingDate(x) for x in floatSchedule][:-1] def makeSwaption(swap, callDates, settlement): if len(callDates) == 1: exercise = ql.EuropeanExercise(callDates[0]) else: exercise = ql.BermudanExercise(callDates) return ql.Swaption(swap, exercise, settlement) # ## Lets start with an european swaption # Create a forward starting plain vanilla swap using the helper function above and create a european swaption which allows us to to enter the swap in one year from today. # In[107]: settlementDate = today + ql.Period("1Y") swaps = [makeSwap(settlementDate, ql.Period("5Y"), 1e6, 0.03047096, euribor6m) ] calldates = [euribor6m.fixingDate(settlementDate)] swaptions = [makeSwaption(swap, calldates, ql.Settlement.Physical) for swap, fd in swaps] # Pricing of the underlying at time 0 using the QuantLib pricing engine # In[108]: #%%timeit # Setup pricing engine and calculate the npv of the underlying swap engine = ql.DiscountingSwapEngine(hyts) for swap, fixingDates in swaps: swap.setPricingEngine(engine) print("Swap NPV at time 0: %.4f" % swap.NPV()) # ###Setup the Gaussian Shortrate model (a.k.a Hull White model) # # Don't worry about calibration, assume we know the calbriated model parameter # In[109]: # Stochastic Process # In[1]: # Assume the model is already calibrated either historical or market implied volas = [ql.QuoteHandle(ql.SimpleQuote(0.0075)), ql.QuoteHandle(ql.SimpleQuote(0.0075))] meanRev = [ql.QuoteHandle(ql.SimpleQuote(0.002))] model = ql.Gsr(t0_curve, [today+100], volas, meanRev, 16.) process = model.stateProcess() # ###Calculate the swaption price using an integral pricing engine # In[111]: swaptionEngine = ql.Gaussian1dSwaptionEngine(model) # In[112]: for swaption in swaptions: swaption.setPricingEngine(swaptionEngine) print("Swaption NPV : %.2f" % swaption.NPV()) # ###Pricing with an Monte Carlo method # ####Create a swap path pricer in Python # # Convert all Dates into times in years (using the same DayCounter as in the yieldTermStructure and store all fix cashflows in a numpy array. # In[113]: mcDC = yts.dayCounter() def timeFromReferenceFactory(daycounter, ref): """ returns a function, that calculate the time in years from a the reference date *ref* to date *dat* with respect to the given DayCountConvention *daycounter* Parameter: dayCounter (ql.DayCounter) ref (ql.Date) Return: f(np.array(ql.Date)) -> np.array(float) """ def impl(dat): return daycounter.yearFraction(ref, dat) return np.vectorize(impl) timeFromReference = timeFromReferenceFactory(mcDC, today) # In[114]: fixed_leg = swap.leg(0) n = len(fixed_leg) fixed_times = np.zeros(n) fixed_amounts = np.zeros(n) for i in range(n): cf = fixed_leg[i] fixed_times[i] = timeFromReference(cf.date()) fixed_amounts[i] = cf.amount() # In[115]: float_leg = swap.leg(1) n = len(float_leg) float_times = np.zeros(n) float_dcf = np.zeros(n) accrual_start_time = np.zeros(n) accrual_end_time = np.zeros(n) nominals = np.zeros(n) for i in range(n): # convert base classiborstart_idx Cashflow to # FloatingRateCoupon cf = ql.as_floating_rate_coupon(float_leg[i]) iborIndex = cf.index() value_date = cf.referencePeriodStart() index_mat = cf.referencePeriodEnd() # year fraction float_dcf[i] = cf.accrualPeriod() # calculate the start and end time accrual_start_time[i] = timeFromReference(value_date) accrual_end_time[i] = timeFromReference(index_mat) # payment time float_times[i] = timeFromReference(cf.date()) # nominals nominals[i] = cf.nominal() # In[116]: # Store all times for which we need a discount factor in one array df_times = np.concatenate([fixed_times, accrual_start_time, accrual_end_time, float_times]) df_times = np.unique(df_times) # In[117]: # Store indices of fix leg payment times in # the df_times array fix_idx = np.in1d(df_times, fixed_times, True) fix_idx = fix_idx.nonzero() # Indices of the floating leg payment times # in the df_times array float_idx = np.in1d(df_times, float_times, True) float_idx = float_idx.nonzero() # Indices of the accrual start and end time # in the df_times array accrual_start_idx = np.in1d(df_times, accrual_start_time, True) accrual_start_idx = accrual_start_idx.nonzero() accrual_end_idx = np.in1d(df_times, accrual_end_time, True) accrual_end_idx = accrual_end_idx.nonzero() # In[118]: # Convert call date to time callTimes = timeFromReference(calldates) # ####Test the t0 Swap Pricing # In[119]: t = 0 x_t = 0 # Calculate all discount factors discount = np.vectorize(lambda T: model.zerobond(T, t, x_t)) dfs = discount(df_times) # Calculate fixed leg npv fix_leg_npv = np.sum(fixed_amounts * dfs[fix_idx]) # Estimate the index fixings index_fixings = (dfs[accrual_start_idx] / dfs[accrual_end_idx] - 1) index_fixings /= float_dcf # Calculate the floating leg npv float_leg_npv = np.sum(nominals * index_fixings * float_dcf * dfs[float_idx]) npv = float_leg_npv - fix_leg_npv # In[120]: npv print("Swap NPV at time 0: %.4f" % npv) print("Error : %.8f" % (npv - swap.NPV())) # ###Monte Carlo Simulation # # #### Generate time grid and paths # In[121]: # Define evaluation grid date_grid = [today] + calldates date_grid = np.unique(np.sort(date_grid)) time_grid = np.vectorize(lambda x: ql.Actual365Fixed().yearFraction(today, x))(date_grid) dt = time_grid[1:] - time_grid[:-1] # In[122]: # Random number generator seed = 1 urng = ql.MersenneTwisterUniformRng(seed) usrg = ql.MersenneTwisterUniformRsg(len(time_grid)-1,urng) generator = ql.InvCumulativeMersenneTwisterGaussianRsg(usrg) # In[ ]: #%%timeit # Generate N paths M = 100000 m = len(time_grid) x = np.zeros((M, m)) y = np.zeros((M, m)) numeraires = np.zeros((M,m)) dfs = np.zeros((M, m, len(df_times))) for n in range(0,M): numeraires[n, 0] = model.numeraire(0, 0) for n in range(0,M): dWs = generator.nextSequence().value() for i in range(1, len(time_grid)): t0 = time_grid[i-1] t1 = time_grid[i] e = process.expectation(t0, x[n,i-1], dt[i-1]) std = process.stdDeviation(t0, x[n,i-1], dt[i-1]) x[n,i] = e + dWs[i-1] * std e_0_0 = process.expectation(0,0,t1) std_0_0 = process.stdDeviation(0,0,t1) y[n,i] = (x[n,i] - e_0_0) / std_0_0 numeraires[n ,i] = model.numeraire(t1, y[n, i]) dfs[n,i] = np.vectorize(lambda T : model.zerobond(T, t1, y[n,i]))(df_times) # In[136]: # plot the paths #for i in range(0, 1000): # plt.plot(time_grid, x[i,:]) # In[ ]: #%%timeit # Calculate NPV of the underlying swap at expiry index_fixings = dfs[:,-1, accrual_start_idx][:,0,:] / dfs[:, -1, accrual_end_idx][:,0,:] - 1 index_fixings /= float_dcf floatLeg_npv = np.sum(index_fixings * float_dcf * dfs[:,-1, float_idx][:,0,:] * nominals, axis = 1) fixedLeg_npv = np.sum(fixed_amounts * dfs[:, -1, fix_idx][:,0,:], axis=1) npv = (floatLeg_npv - fixedLeg_npv) # Apply payoff function npv[npv < 0] = 0 # Deflate NPV npv = npv / numeraires[:,-1] npv = np.mean(npv) * numeraires[0,0] # In[ ]: print("MC NPV: %.4f" % npv) print("Error : %.4f " % (npv-swaption.NPV())) # In[ ]: