Copyright (c) 2015 Matthias Groncki
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
This disclaimer is taken from the QuantLib license
# import the used libraries
import numpy as np
import matplotlib.pyplot as plt
import QuantLib as ql
%matplotlib inline
# Setting evaluation date
today = ql.Date(7,4,2015)
ql.Settings.instance().setEvaluationDate(today)
# Setup Marketdata
rate = ql.SimpleQuote(0.03)
rate_handle = ql.QuoteHandle(rate)
dc = ql.Actual365Fixed()
yts = ql.FlatForward(today, rate_handle, dc)
yts.enableExtrapolation()
hyts = ql.RelinkableYieldTermStructureHandle(yts)
t0_curve = ql.YieldTermStructureHandle(yts)
euribor6m = ql.Euribor6M(hyts)
cal = ql.TARGET()
# Setup a dummy portfolio with two Swaps
def makeSwap(start, maturity, nominal, fixedRate, index, typ=ql.VanillaSwap.Payer):
"""
creates a plain vanilla swap with fixedLegTenor 1Y
parameter:
start (ql.Date) : Start Date
maturity (ql.Period) : SwapTenor
nominal (float) : Nominal
fixedRate (float) : rate paid on fixed leg
index (ql.IborIndex) : Index
return: tuple(ql.Swap, list<Dates>) Swap and all fixing dates
"""
end = ql.TARGET().advance(start, maturity)
fixedLegTenor = ql.Period("1y")
fixedLegBDC = ql.ModifiedFollowing
fixedLegDC = ql.Thirty360(ql.Thirty360.BondBasis)
spread = 0.0
fixedSchedule = ql.Schedule(start,
end,
fixedLegTenor,
index.fixingCalendar(),
fixedLegBDC,
fixedLegBDC,
ql.DateGeneration.Backward,
False)
floatSchedule = ql.Schedule(start,
end,
index.tenor(),
index.fixingCalendar(),
index.businessDayConvention(),
index.businessDayConvention(),
ql.DateGeneration.Backward,
False)
swap = ql.VanillaSwap(typ,
nominal,
fixedSchedule,
fixedRate,
fixedLegDC,
floatSchedule,
index,
spread,
index.dayCounter())
return swap, [index.fixingDate(x) for x in floatSchedule][:-1]
def makeSwaption(swap, callDates, settlement):
if len(callDates) == 1:
exercise = ql.EuropeanExercise(callDates[0])
else:
exercise = ql.BermudanExercise(callDates)
return ql.Swaption(swap, exercise, settlement)
Create a forward starting plain vanilla swap using the helper function above and create a european swaption which allows us to to enter the swap in one year from today.
settlementDate = today + ql.Period("1Y")
swaps = [makeSwap(settlementDate,
ql.Period("5Y"),
1e6,
0.03047096,
euribor6m)
]
calldates = [euribor6m.fixingDate(settlementDate)]
swaptions = [makeSwaption(swap,
calldates,
ql.Settlement.Physical)
for swap, fd in swaps]
Pricing of the underlying at time 0 using the QuantLib pricing engine
#%%timeit
# Setup pricing engine and calculate the npv of the underlying swap
engine = ql.DiscountingSwapEngine(hyts)
for swap, fixingDates in swaps:
swap.setPricingEngine(engine)
print("Swap NPV at time 0: %.4f" % swap.NPV())
Swap NPV at time 0: 0.0091
Don't worry about calibration, assume we know the calbriated model parameter
# Stochastic Process
# Assume the model is already calibrated either historical or market implied
volas = [ql.QuoteHandle(ql.SimpleQuote(0.0075)),
ql.QuoteHandle(ql.SimpleQuote(0.0075))]
meanRev = [ql.QuoteHandle(ql.SimpleQuote(0.002))]
model = ql.Gsr(t0_curve, [today+100], volas, meanRev, 16.)
process = model.stateProcess()
--------------------------------------------------------------------------- NameError Traceback (most recent call last) <ipython-input-1-199b3d41ba3e> in <module>() 1 # Assume the model is already calibrated either historical or market implied ----> 2 volas = [ql.QuoteHandle(ql.SimpleQuote(0.0075)), 3 ql.QuoteHandle(ql.SimpleQuote(0.0075))] 4 meanRev = [ql.QuoteHandle(ql.SimpleQuote(0.002))] 5 model = ql.Gsr(t0_curve, [today+100], volas, meanRev, 16.) NameError: name 'ql' is not defined
swaptionEngine = ql.Gaussian1dSwaptionEngine(model)
for swaption in swaptions:
swaption.setPricingEngine(swaptionEngine)
print("Swaption NPV : %.2f" % swaption.NPV())
Swaption NPV : 12898.40
Convert all Dates into times in years (using the same DayCounter as in the yieldTermStructure and store all fix cashflows in a numpy array.
mcDC = yts.dayCounter()
def timeFromReferenceFactory(daycounter, ref):
"""
returns a function, that calculate the time in years
from a the reference date *ref* to date *dat*
with respect to the given DayCountConvention *daycounter*
Parameter:
dayCounter (ql.DayCounter)
ref (ql.Date)
Return:
f(np.array(ql.Date)) -> np.array(float)
"""
def impl(dat):
return daycounter.yearFraction(ref, dat)
return np.vectorize(impl)
timeFromReference = timeFromReferenceFactory(mcDC, today)
fixed_leg = swap.leg(0)
n = len(fixed_leg)
fixed_times = np.zeros(n)
fixed_amounts = np.zeros(n)
for i in range(n):
cf = fixed_leg[i]
fixed_times[i] = timeFromReference(cf.date())
fixed_amounts[i] = cf.amount()
float_leg = swap.leg(1)
n = len(float_leg)
float_times = np.zeros(n)
float_dcf = np.zeros(n)
accrual_start_time = np.zeros(n)
accrual_end_time = np.zeros(n)
nominals = np.zeros(n)
for i in range(n):
# convert base classiborstart_idx Cashflow to
# FloatingRateCoupon
cf = ql.as_floating_rate_coupon(float_leg[i])
iborIndex = cf.index()
value_date = cf.referencePeriodStart()
index_mat = cf.referencePeriodEnd()
# year fraction
float_dcf[i] = cf.accrualPeriod()
# calculate the start and end time
accrual_start_time[i] = timeFromReference(value_date)
accrual_end_time[i] = timeFromReference(index_mat)
# payment time
float_times[i] = timeFromReference(cf.date())
# nominals
nominals[i] = cf.nominal()
# Store all times for which we need a discount factor in one array
df_times = np.concatenate([fixed_times,
accrual_start_time,
accrual_end_time,
float_times])
df_times = np.unique(df_times)
# Store indices of fix leg payment times in
# the df_times array
fix_idx = np.in1d(df_times, fixed_times, True)
fix_idx = fix_idx.nonzero()
# Indices of the floating leg payment times
# in the df_times array
float_idx = np.in1d(df_times, float_times, True)
float_idx = float_idx.nonzero()
# Indices of the accrual start and end time
# in the df_times array
accrual_start_idx = np.in1d(df_times, accrual_start_time, True)
accrual_start_idx = accrual_start_idx.nonzero()
accrual_end_idx = np.in1d(df_times, accrual_end_time, True)
accrual_end_idx = accrual_end_idx.nonzero()
# Convert call date to time
callTimes = timeFromReference(calldates)
t = 0
x_t = 0
# Calculate all discount factors
discount = np.vectorize(lambda T: model.zerobond(T, t, x_t))
dfs = discount(df_times)
# Calculate fixed leg npv
fix_leg_npv = np.sum(fixed_amounts * dfs[fix_idx])
# Estimate the index fixings
index_fixings = (dfs[accrual_start_idx] / dfs[accrual_end_idx] - 1)
index_fixings /= float_dcf
# Calculate the floating leg npv
float_leg_npv = np.sum(nominals * index_fixings * float_dcf * dfs[float_idx])
npv = float_leg_npv - fix_leg_npv
npv
print("Swap NPV at time 0: %.4f" % npv)
print("Error : %.8f" % (npv - swap.NPV()))
Swap NPV at time 0: 0.0091 Error : 0.00000000
# Define evaluation grid
date_grid = [today] + calldates
date_grid = np.unique(np.sort(date_grid))
time_grid = np.vectorize(lambda x: ql.Actual365Fixed().yearFraction(today, x))(date_grid)
dt = time_grid[1:] - time_grid[:-1]
# Random number generator
seed = 1
urng = ql.MersenneTwisterUniformRng(seed)
usrg = ql.MersenneTwisterUniformRsg(len(time_grid)-1,urng)
generator = ql.InvCumulativeMersenneTwisterGaussianRsg(usrg)
#%%timeit
# Generate N paths
M = 100000
m = len(time_grid)
x = np.zeros((M, m))
y = np.zeros((M, m))
numeraires = np.zeros((M,m))
dfs = np.zeros((M, m, len(df_times)))
for n in range(0,M):
numeraires[n, 0] = model.numeraire(0, 0)
for n in range(0,M):
dWs = generator.nextSequence().value()
for i in range(1, len(time_grid)):
t0 = time_grid[i-1]
t1 = time_grid[i]
e = process.expectation(t0,
x[n,i-1],
dt[i-1])
std = process.stdDeviation(t0,
x[n,i-1],
dt[i-1])
x[n,i] = e + dWs[i-1] * std
e_0_0 = process.expectation(0,0,t1)
std_0_0 = process.stdDeviation(0,0,t1)
y[n,i] = (x[n,i] - e_0_0) / std_0_0
numeraires[n ,i] = model.numeraire(t1, y[n, i])
dfs[n,i] = np.vectorize(lambda T : model.zerobond(T, t1, y[n,i]))(df_times)
# plot the paths
#for i in range(0, 1000):
# plt.plot(time_grid, x[i,:])
#%%timeit
# Calculate NPV of the underlying swap at expiry
index_fixings = dfs[:,-1, accrual_start_idx][:,0,:] / dfs[:, -1, accrual_end_idx][:,0,:] - 1
index_fixings /= float_dcf
floatLeg_npv = np.sum(index_fixings * float_dcf * dfs[:,-1, float_idx][:,0,:] * nominals,
axis = 1)
fixedLeg_npv = np.sum(fixed_amounts * dfs[:, -1, fix_idx][:,0,:], axis=1)
npv = (floatLeg_npv - fixedLeg_npv)
# Apply payoff function
npv[npv < 0] = 0
# Deflate NPV
npv = npv / numeraires[:,-1]
npv = np.mean(npv) * numeraires[0,0]
print("MC NPV: %.4f" % npv)
print("Error : %.4f " % (npv-swaption.NPV()))