!pip install --upgrade quantecon import matplotlib.pyplot as plt %matplotlib inline from scipy.optimize import fsolve, fmin import numpy as np class CRRAutility: def __init__(self, β=0.9, σ=2, γ=2, π=0.5*np.ones((2, 2)), G=np.array([0.1, 0.2]), Θ=np.ones(2), transfers=False): self.β, self.σ, self.γ = β, σ, γ self.π, self.G, self.Θ, self.transfers = π, G, Θ, transfers # Utility function def U(self, c, n): σ = self.σ if σ == 1.: U = np.log(c) else: U = (c**(1 - σ) - 1) / (1 - σ) return U - n**(1 + self.γ) / (1 + self.γ) # Derivatives of utility function def Uc(self, c, n): return c**(-self.σ) def Ucc(self, c, n): return -self.σ * c**(-self.σ - 1) def Un(self, c, n): return -n**self.γ def Unn(self, c, n): return -self.γ * n**(self.γ - 1) import numpy as np from scipy.optimize import root from quantecon import MarkovChain class SequentialAllocation: ''' Class that takes CESutility or BGPutility object as input returns planner's allocation as a function of the multiplier on the implementability constraint μ. ''' def __init__(self, model): # Initialize from model object attributes self.β, self.π, self.G = model.β, model.π, model.G self.mc, self.Θ = MarkovChain(self.π), model.Θ self.S = len(model.π) # Number of states self.model = model # Find the first best allocation self.find_first_best() def find_first_best(self): ''' Find the first best allocation ''' model = self.model S, Θ, G = self.S, self.Θ, self.G Uc, Un = model.Uc, model.Un def res(z): c = z[:S] n = z[S:] return np.hstack([Θ * Uc(c, n) + Un(c, n), Θ * n - c - G]) res = root(res, 0.5 * np.ones(2 * S)) if not res.success: raise Exception('Could not find first best') self.cFB = res.x[:S] self.nFB = res.x[S:] # Multiplier on the resource constraint self.ΞFB = Uc(self.cFB, self.nFB) self.zFB = np.hstack([self.cFB, self.nFB, self.ΞFB]) def time1_allocation(self, μ): ''' Computes optimal allocation for time t >= 1 for a given μ ''' model = self.model S, Θ, G = self.S, self.Θ, self.G Uc, Ucc, Un, Unn = model.Uc, model.Ucc, model.Un, model.Unn def FOC(z): c = z[:S] n = z[S:2 * S] Ξ = z[2 * S:] # FOC of c return np.hstack([Uc(c, n) - μ * (Ucc(c, n) * c + Uc(c, n)) - Ξ, Un(c, n) - μ * (Unn(c, n) * n + Un(c, n)) \ + Θ * Ξ, # FOC of n Θ * n - c - G]) # Find the root of the first-order condition res = root(FOC, self.zFB) if not res.success: raise Exception('Could not find LS allocation.') z = res.x c, n, Ξ = z[:S], z[S:2 * S], z[2 * S:] # Compute x I = Uc(c, n) * c + Un(c, n) * n x = np.linalg.solve(np.eye(S) - self.β * self.π, I) return c, n, x, Ξ def time0_allocation(self, B_, s_0): ''' Finds the optimal allocation given initial government debt B_ and state s_0 ''' model, π, Θ, G, β = self.model, self.π, self.Θ, self.G, self.β Uc, Ucc, Un, Unn = model.Uc, model.Ucc, model.Un, model.Unn # First order conditions of planner's problem def FOC(z): μ, c, n, Ξ = z xprime = self.time1_allocation(μ)[2] return np.hstack([Uc(c, n) * (c - B_) + Un(c, n) * n + β * π[s_0] @ xprime, Uc(c, n) - μ * (Ucc(c, n) * (c - B_) + Uc(c, n)) - Ξ, Un(c, n) - μ * (Unn(c, n) * n + Un(c, n)) + Θ[s_0] * Ξ, (Θ * n - c - G)[s_0]]) # Find root res = root(FOC, np.array( [0, self.cFB[s_0], self.nFB[s_0], self.ΞFB[s_0]])) if not res.success: raise Exception('Could not find time 0 LS allocation.') return res.x def time1_value(self, μ): ''' Find the value associated with multiplier μ ''' c, n, x, Ξ = self.time1_allocation(μ) U = self.model.U(c, n) V = np.linalg.solve(np.eye(self.S) - self.β * self.π, U) return c, n, x, V def Τ(self, c, n): ''' Computes Τ given c, n ''' model = self.model Uc, Un = model.Uc(c, n), model.Un(c, n) return 1 + Un / (self.Θ * Uc) def simulate(self, B_, s_0, T, sHist=None): ''' Simulates planners policies for T periods ''' model, π, β = self.model, self.π, self.β Uc = model.Uc if sHist is None: sHist = self.mc.simulate(T, s_0) cHist, nHist, Bhist, ΤHist, μHist = np.zeros((5, T)) RHist = np.zeros(T - 1) # Time 0 μ, cHist[0], nHist[0], _ = self.time0_allocation(B_, s_0) ΤHist[0] = self.Τ(cHist[0], nHist[0])[s_0] Bhist[0] = B_ μHist[0] = μ # Time 1 onward for t in range(1, T): c, n, x, Ξ = self.time1_allocation(μ) Τ = self.Τ(c, n) u_c = Uc(c, n) s = sHist[t] Eu_c = π[sHist[t - 1]] @ u_c cHist[t], nHist[t], Bhist[t], ΤHist[t] = c[s], n[s], x[s] / u_c[s], \ Τ[s] RHist[t - 1] = Uc(cHist[t - 1], nHist[t - 1]) / (β * Eu_c) μHist[t] = μ return np.array([cHist, nHist, Bhist, ΤHist, sHist, μHist, RHist]) import numpy as np from scipy.optimize import fmin_slsqp from scipy.optimize import root from quantecon import MarkovChain class RecursiveAllocationAMSS: def __init__(self, model, μgrid, tol_diff=1e-4, tol=1e-4): self.β, self.π, self.G = model.β, model.π, model.G self.mc, self.S = MarkovChain(self.π), len(model.π) # Number of states self.Θ, self.model, self.μgrid = model.Θ, model, μgrid self.tol_diff, self.tol = tol_diff, tol # Find the first best allocation self.solve_time1_bellman() self.T.time_0 = True # Bellman equation now solves time 0 problem def solve_time1_bellman(self): ''' Solve the time 1 Bellman equation for calibration model and initial grid μgrid0 ''' model, μgrid0 = self.model, self.μgrid π = model.π S = len(model.π) # First get initial fit from Lucas Stokey solution. # Need to change things to be ex ante pp = SequentialAllocation(model) interp = interpolator_factory(2, None) def incomplete_allocation(μ_, s_): c, n, x, V = pp.time1_value(μ_) return c, n, π[s_] @ x, π[s_] @ V cf, nf, xgrid, Vf, xprimef = [], [], [], [], [] for s_ in range(S): c, n, x, V = zip(*map(lambda μ: incomplete_allocation(μ, s_), μgrid0)) c, n = np.vstack(c).T, np.vstack(n).T x, V = np.hstack(x), np.hstack(V) xprimes = np.vstack([x] * S) cf.append(interp(x, c)) nf.append(interp(x, n)) Vf.append(interp(x, V)) xgrid.append(x) xprimef.append(interp(x, xprimes)) cf, nf, xprimef = fun_vstack(cf), fun_vstack(nf), fun_vstack(xprimef) Vf = fun_hstack(Vf) policies = [cf, nf, xprimef] # Create xgrid x = np.vstack(xgrid).T xbar = [x.min(0).max(), x.max(0).min()] xgrid = np.linspace(xbar[0], xbar[1], len(μgrid0)) self.xgrid = xgrid # Now iterate on Bellman equation T = BellmanEquation(model, xgrid, policies, tol=self.tol) diff = 1 while diff > self.tol_diff: PF = T(Vf) Vfnew, policies = self.fit_policy_function(PF) diff = np.abs((Vf(xgrid) - Vfnew(xgrid)) / Vf(xgrid)).max() print(diff) Vf = Vfnew # Store value function policies and Bellman Equations self.Vf = Vf self.policies = policies self.T = T def fit_policy_function(self, PF): ''' Fits the policy functions ''' S, xgrid = len(self.π), self.xgrid interp = interpolator_factory(3, 0) cf, nf, xprimef, Tf, Vf = [], [], [], [], [] for s_ in range(S): PFvec = np.vstack([PF(x, s_) for x in self.xgrid]).T Vf.append(interp(xgrid, PFvec[0, :])) cf.append(interp(xgrid, PFvec[1:1 + S])) nf.append(interp(xgrid, PFvec[1 + S:1 + 2 * S])) xprimef.append(interp(xgrid, PFvec[1 + 2 * S:1 + 3 * S])) Tf.append(interp(xgrid, PFvec[1 + 3 * S:])) policies = fun_vstack(cf), fun_vstack( nf), fun_vstack(xprimef), fun_vstack(Tf) Vf = fun_hstack(Vf) return Vf, policies def Τ(self, c, n): ''' Computes Τ given c and n ''' model = self.model Uc, Un = model.Uc(c, n), model.Un(c, n) return 1 + Un / (self.Θ * Uc) def time0_allocation(self, B_, s0): ''' Finds the optimal allocation given initial government debt B_ and state s_0 ''' PF = self.T(self.Vf) z0 = PF(B_, s0) c0, n0, xprime0, T0 = z0[1:] return c0, n0, xprime0, T0 def simulate(self, B_, s_0, T, sHist=None): ''' Simulates planners policies for T periods ''' model, π = self.model, self.π Uc = model.Uc cf, nf, xprimef, Tf = self.policies if sHist is None: sHist = simulate_markov(π, s_0, T) cHist, nHist, Bhist, xHist, ΤHist, THist, μHist = np.zeros((7, T)) # Time 0 cHist[0], nHist[0], xHist[0], THist[0] = self.time0_allocation(B_, s_0) ΤHist[0] = self.Τ(cHist[0], nHist[0])[s_0] Bhist[0] = B_ μHist[0] = self.Vf[s_0](xHist[0]) # Time 1 onward for t in range(1, T): s_, x, s = sHist[t - 1], xHist[t - 1], sHist[t] c, n, xprime, T = cf[s_, :](x), nf[s_, :]( x), xprimef[s_, :](x), Tf[s_, :](x) Τ = self.Τ(c, n)[s] u_c = Uc(c, n) Eu_c = π[s_, :] @ u_c μHist[t] = self.Vf[s](xprime[s]) cHist[t], nHist[t], Bhist[t], ΤHist[t] = c[s], n[s], x / Eu_c, Τ xHist[t], THist[t] = xprime[s], T[s] return np.array([cHist, nHist, Bhist, ΤHist, THist, μHist, sHist, xHist]) class BellmanEquation: ''' Bellman equation for the continuation of the Lucas-Stokey Problem ''' def __init__(self, model, xgrid, policies0, tol, maxiter=1000): self.β, self.π, self.G = model.β, model.π, model.G self.S = len(model.π) # Number of states self.Θ, self.model, self.tol = model.Θ, model, tol self.maxiter = maxiter self.xbar = [min(xgrid), max(xgrid)] self.time_0 = False self.z0 = {} cf, nf, xprimef = policies0 for s_ in range(self.S): for x in xgrid: self.z0[x, s_] = np.hstack([cf[s_, :](x), nf[s_, :](x), xprimef[s_, :](x), np.zeros(self.S)]) self.find_first_best() def find_first_best(self): ''' Find the first best allocation ''' model = self.model S, Θ, Uc, Un, G = self.S, self.Θ, model.Uc, model.Un, self.G def res(z): c = z[:S] n = z[S:] return np.hstack([Θ * Uc(c, n) + Un(c, n), Θ * n - c - G]) res = root(res, 0.5 * np.ones(2 * S)) if not res.success: raise Exception('Could not find first best') self.cFB = res.x[:S] self.nFB = res.x[S:] IFB = Uc(self.cFB, self.nFB) * self.cFB + \ Un(self.cFB, self.nFB) * self.nFB self.xFB = np.linalg.solve(np.eye(S) - self.β * self.π, IFB) self.zFB = {} for s in range(S): self.zFB[s] = np.hstack( [self.cFB[s], self.nFB[s], self.π[s] @ self.xFB, 0.]) def __call__(self, Vf): ''' Given continuation value function next period return value function this period return T(V) and optimal policies ''' if not self.time_0: def PF(x, s): return self.get_policies_time1(x, s, Vf) else: def PF(B_, s0): return self.get_policies_time0(B_, s0, Vf) return PF def get_policies_time1(self, x, s_, Vf): ''' Finds the optimal policies ''' model, β, Θ, G, S, π = self.model, self.β, self.Θ, self.G, self.S, self.π U, Uc, Un = model.U, model.Uc, model.Un def objf(z): c, n, xprime = z[:S], z[S:2 * S], z[2 * S:3 * S] Vprime = np.empty(S) for s in range(S): Vprime[s] = Vf[s](xprime[s]) return -π[s_] @ (U(c, n) + β * Vprime) def cons(z): c, n, xprime, T = z[:S], z[S:2 * S], z[2 * S:3 * S], z[3 * S:] u_c = Uc(c, n) Eu_c = π[s_] @ u_c return np.hstack([ x * u_c / Eu_c - u_c * (c - T) - Un(c, n) * n - β * xprime, Θ * n - c - G]) if model.transfers: bounds = [(0., 100)] * S + [(0., 100)] * S + \ [self.xbar] * S + [(0., 100.)] * S else: bounds = [(0., 100)] * S + [(0., 100)] * S + \ [self.xbar] * S + [(0., 0.)] * S out, fx, _, imode, smode = fmin_slsqp(objf, self.z0[x, s_], f_eqcons=cons, bounds=bounds, full_output=True, iprint=0, acc=self.tol, iter=self.maxiter) if imode > 0: raise Exception(smode) self.z0[x, s_] = out return np.hstack([-fx, out]) def get_policies_time0(self, B_, s0, Vf): ''' Finds the optimal policies ''' model, β, Θ, G = self.model, self.β, self.Θ, self.G U, Uc, Un = model.U, model.Uc, model.Un def objf(z): c, n, xprime = z[:-1] return -(U(c, n) + β * Vf[s0](xprime)) def cons(z): c, n, xprime, T = z return np.hstack([ -Uc(c, n) * (c - B_ - T) - Un(c, n) * n - β * xprime, (Θ * n - c - G)[s0]]) if model.transfers: bounds = [(0., 100), (0., 100), self.xbar, (0., 100.)] else: bounds = [(0., 100), (0., 100), self.xbar, (0., 0.)] out, fx, _, imode, smode = fmin_slsqp(objf, self.zFB[s0], f_eqcons=cons, bounds=bounds, full_output=True, iprint=0) if imode > 0: raise Exception(smode) return np.hstack([-fx, out]) import numpy as np from scipy.interpolate import UnivariateSpline class interpolate_wrapper: def __init__(self, F): self.F = F def __getitem__(self, index): return interpolate_wrapper(np.asarray(self.F[index])) def reshape(self, *args): self.F = self.F.reshape(*args) return self def transpose(self): self.F = self.F.transpose() def __len__(self): return len(self.F) def __call__(self, xvec): x = np.atleast_1d(xvec) shape = self.F.shape if len(x) == 1: fhat = np.hstack([f(x) for f in self.F.flatten()]) return fhat.reshape(shape) else: fhat = np.vstack([f(x) for f in self.F.flatten()]) return fhat.reshape(np.hstack((shape, len(x)))) class interpolator_factory: def __init__(self, k, s): self.k, self.s = k, s def __call__(self, xgrid, Fs): shape, m = Fs.shape[:-1], Fs.shape[-1] Fs = Fs.reshape((-1, m)) F = [] xgrid = np.sort(xgrid) # Sort xgrid for Fhat in Fs: F.append(UnivariateSpline(xgrid, Fhat, k=self.k, s=self.s)) return interpolate_wrapper(np.array(F).reshape(shape)) def fun_vstack(fun_list): Fs = [IW.F for IW in fun_list] return interpolate_wrapper(np.vstack(Fs)) def fun_hstack(fun_list): Fs = [IW.F for IW in fun_list] return interpolate_wrapper(np.hstack(Fs)) def simulate_markov(π, s_0, T): sHist = np.empty(T, dtype=int) sHist[0] = s_0 S = len(π) for t in range(1, T): sHist[t] = np.random.choice(np.arange(S), p=π[sHist[t - 1]]) return sHist u = CRRAutility() def min_Φ(Φ): g1, g2 = u.G # Government spending in s=0 and s=1 # Solve Φ(c) def equations(unknowns, Φ): c1, c2 = unknowns # First argument of .Uc and second argument of .Un are redundant # Set up simultaneous equations eq = lambda c, g: (1 + Φ) * (u.Uc(c, 1) - -u.Un(1, c + g)) + \ Φ * ((c + g) * u.Unn(1, c + g) + c * u.Ucc(c, 1)) # Return equation evaluated at s=1 and s=2 return np.array([eq(c1, g1), eq(c2, g2)]).flatten() global c1 # Update c1 globally global c2 # Update c2 globally c1, c2 = fsolve(equations, np.ones(2), args=(Φ)) uc = u.Uc(np.array([c1, c2]), 1) # uc(n - g) # ul(n) = -un(c + g) ul = -u.Un(1, np.array([c1 + g1, c2 + g2])) * [c1 + g1, c2 + g2] # Solve for x x = np.linalg.solve(np.eye((2)) - u.β * u.π, uc * [c1, c2] - ul) global b # Update b globally b = x / uc loss = (b[0] - b[1])**2 return loss Φ_star = fmin(min_Φ, .1, ftol=1e-14) b_bar = b[0] b_bar def solve_cb(unknowns, Φ, b_bar, s=1): c0, b0 = unknowns g0 = u.G[s-1] R_0 = u.β * u.π[s] @ [u.Uc(c1, 1) / u.Uc(c0, 1), u.Uc(c2, 1) / u.Uc(c0, 1)] R_0 = 1 / R_0 τ_0 = 1 + u.Un(1, c0 + g0) / u.Uc(c0, 1) eq1 = τ_0 * (c0 + g0) + b_bar / R_0 - b0 - g0 eq2 = (1 + Φ) * (u.Uc(c0, 1) + u.Un(1, c0 + g0)) \ + Φ * (c0 * u.Ucc(c0, 1) + (c0 + g0) * u.Unn(1, c0 + g0)) \ - Φ * u.Ucc(c0, 1) * b0 return np.array([eq1, eq2], dtype='float64') c0, b0 = fsolve(solve_cb, np.array([1., -1.], dtype='float64'), args=(Φ_star, b[0], 1), xtol=1.0e-12) c0, b0 μ_grid = np.linspace(-0.09, 0.1, 100) log_example = CRRAutility() log_example.transfers = True # Government can use transfers log_sequential = SequentialAllocation(log_example) # Solve sequential problem log_bellman = RecursiveAllocationAMSS(log_example, μ_grid, tol_diff=1e-10, tol=1e-12) T = 20 sHist = np.array([0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0]) sim_seq = log_sequential.simulate(-1.03869841, 0, T, sHist) sim_bel = log_bellman.simulate(-1.03869841, 0, T, sHist) titles = ['Consumption', 'Labor Supply', 'Government Debt', 'Tax Rate', 'Government Spending', 'Output'] # Government spending paths sim_seq[4] = log_example.G[sHist] sim_bel[4] = log_example.G[sHist] # Output paths sim_seq[5] = log_example.Θ[sHist] * sim_seq[1] sim_bel[5] = log_example.Θ[sHist] * sim_bel[1] fig, axes = plt.subplots(3, 2, figsize=(14, 10)) for ax, title, seq, bel in zip(axes.flatten(), titles, sim_seq, sim_bel): ax.plot(seq, '-ok', bel, '-^b') ax.set(title=title) ax.grid() axes[0, 0].legend(('Complete Markets', 'Incomplete Markets')) plt.tight_layout() plt.show() T = 2000 # Set T to 200 periods sim_seq_long = log_sequential.simulate(0.5, 0, T) sHist_long = sim_seq_long[-3] sim_bel_long = log_bellman.simulate(0.5, 0, T, sHist_long) titles = ['Government Debt', 'Tax Rate'] fig, axes = plt.subplots(2, 1, figsize=(14, 10)) for ax, title, id in zip(axes.flatten(), titles, [2, 3]): ax.plot(sim_seq_long[id], '-k', sim_bel_long[id], '-.b', alpha=0.5) ax.set(title=title) ax.grid() axes[0].legend(('Complete Markets', 'Incomplete Markets')) plt.tight_layout() plt.show() def mean(x): '''Returns mean for x given initial state''' x = np.array(x) return x @ u.π[s] def variance(x): x = np.array(x) return x**2 @ u.π[s] - mean(x)**2 def covariance(x, y): x, y = np.array(x), np.array(y) return x * y @ u.π[s] - mean(x) * mean(y) u = CRRAutility() s = 0 c = [0.940580824225584, 0.8943592757759343] # Vector for c g = u.G # Vector for g n = c + g # Total population τ = lambda s: 1 + u.Un(1, n[s]) / u.Uc(c[s], 1) R_s = lambda s: u.Uc(c[s], n[s]) / (u.β * (u.Uc(c[0], n[0]) * u.π[0, 0] \ + u.Uc(c[1], n[1]) * u.π[1, 0])) X_s = lambda s: u.Uc(c[s], n[s]) * (g[s] - τ(s) * n[s]) R = [R_s(0), R_s(1)] X = [X_s(0), X_s(1)] print(f"R, X = {R}, {X}") bstar = -covariance(R, X) / variance(R) div = u.β * (u.Uc(c[0], n[0]) * u.π[s, 0] + u.Uc(c[1], n[1]) * u.π[s, 1]) bhat = bstar / div bhat bhat, b_bar bhat - b_bar Jmin = variance(R) * bstar**2 + 2 * bstar * covariance(R, X) + variance(X) Jmin den2 = 1 + (u.β**2) * variance(R) speedrever = 1/den2 print(f'Mean reversion speed = {speedrever}') ttime = np.log(.01) / np.log(speedrever) print(f"Time to get within .01 of limit = {ttime}")