Welcome! If you are new to Google Colab/Jupyter notebooks, you might take a look at this notebook first.
I recommend you run the first code cell of this notebook immediately, to start provisioning drake on the cloud machine, then you can leave this window open as you read the textbook.
The following cell will:
/opt/drake
, install Drake's prerequisites via apt
, and add pydrake to sys.path
. This will take approximately two minutes on the first time it runs (to provision the machine), but should only need to reinstall once every 12 hours. If you navigate between notebooks using Colab's "File->Open" menu, then you can avoid provisioning a separate machine for each notebook.You will need to rerun this cell if you restart the kernel, but it should be fast (even on Colab) because the machine will already have drake installed.
import importlib
import sys
from urllib.request import urlretrieve
# Install drake (and underactuated).
if 'google.colab' in sys.modules and importlib.util.find_spec('underactuated') is None:
urlretrieve(f"http://underactuated.csail.mit.edu/scripts/setup/setup_underactuated_colab.py",
"setup_underactuated_colab.py")
from setup_underactuated_colab import setup_underactuated
setup_underactuated(underactuated_sha='ffe2b28ed89637889c04405e5d7d2d98be3df5b6', drake_version='0.27.0', drake_build='release')
server_args = []
if 'google.colab' in sys.modules:
server_args = ['--ngrok_http_tunnel']
from meshcat.servers.zmqserver import start_zmq_server_as_subprocess
proc, zmq_url, web_url = start_zmq_server_as_subprocess(server_args=server_args)
import numpy as np
from ipywidgets import FloatSlider, ToggleButton
from IPython.display import display, SVG
import pydot
from underactuated.jupyter import running_as_notebook
np.set_printoptions(suppress=True)
Adding only a very small offset (my default is 0.01 radians, or approximately half a degree for each joint) can lead to quite large steady-state errors.
TODO: Make a plot of offset magnitude to steady-state error.
import pydrake.all
from pydrake.all import (
DiagramBuilder, LinearQuadraticRegulator, Saturation, SceneGraph, Simulator,
WrapToSystem, AddMultibodyPlantSceneGraph, Parser, Adder, ConstantVectorSource,
)
from pydrake.examples.acrobot import (AcrobotInput, AcrobotGeometry,
AcrobotPlant, AcrobotState)
def acrobot_balancing_example(offsets=[0.01,0.01]):
builder = DiagramBuilder()
def UprightState():
state = AcrobotState()
state.set_theta1(np.pi)
state.set_theta2(0.)
state.set_theta1dot(0.)
state.set_theta2dot(0.)
return state
def BalancingLQR():
# Design an LQR controller for stabilizing the Acrobot around the upright.
# Returns a (static) AffineSystem that implements the controller (in
# the original AcrobotState coordinates).
acrobot = AcrobotPlant()
context = acrobot.CreateDefaultContext()
input = AcrobotInput()
input.set_tau(0.)
acrobot.get_input_port(0).FixValue(context, input)
context.get_mutable_continuous_state_vector()\
.SetFromVector(UprightState().CopyToVector())
Q = np.diag((10., 10., 1., 1.))
R = [1]
return LinearQuadraticRegulator(acrobot, context, Q, R)
acrobot = builder.AddSystem(AcrobotPlant())
acrobot.set_name('acrobot')
saturation = builder.AddSystem(Saturation(min_value=[-10], max_value=[10]))
saturation.set_name('saturation')
builder.Connect(saturation.get_output_port(0), acrobot.get_input_port(0))
wrapto = builder.AddSystem(WrapToSystem(4))
wrapto.set_name('wrap_angles')
wrapto.set_interval(0, 0, 2. * np.pi)
wrapto.set_interval(1, -np.pi, np.pi)
builder.Connect(acrobot.get_output_port(0), wrapto.get_input_port(0))
controller = builder.AddSystem(BalancingLQR())
controller.set_name("lqr controller")
# I'll add the offsets in here:
adder = builder.AddSystem(Adder(2, 4))
adder.set_name("adder")
builder.Connect(wrapto.get_output_port(), adder.get_input_port(0))
builder.Connect(adder.get_output_port(), controller.get_input_port())
offset = builder.AddSystem(ConstantVectorSource(np.array(offsets + [0, 0])))
offset.set_name("state calibration offsets")
builder.Connect(offset.get_output_port(), adder.get_input_port(1))
builder.Connect(controller.get_output_port(), saturation.get_input_port())
# Setup visualization
scene_graph = builder.AddSystem(SceneGraph())
AcrobotGeometry.AddToBuilder(builder, acrobot.get_output_port(0), scene_graph)
visualizer = pydrake.systems.meshcat_visualizer.ConnectMeshcatVisualizer(
builder,
scene_graph=scene_graph,
zmq_url=zmq_url)
visualizer.vis.delete()
visualizer.set_planar_viewpoint(xmin=-3.0, xmax=3.0, ymin=-3.0, ymax=4.0)
diagram = builder.Build()
display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))
# Set up a simulator to run this diagram
simulator = Simulator(diagram)
simulator.set_target_realtime_rate(1.0)
context = simulator.get_mutable_context()
# Simulate
duration = 4.0 if running_as_notebook else 0.1 # sets a shorter duration during testing
for i in range(5):
context.SetTime(0.)
context.SetContinuousState(UprightState().CopyToVector() +
0.05 * np.random.randn(4,))
simulator.Initialize()
simulator.AdvanceTo(duration)
acrobot_balancing_example()
Recall that the dynamics of our cart-pole system were
(mc+mp)¨x+mpl¨θcosθ−mpl˙θ2sinθ=fxmpl¨xcosθ+mpl2¨θ+mpglsinθ=0Note: This example requires https://github.com/RobotLocomotion/drake/pull/14863 , which is not available in the drake binaries quite yet.
import matplotlib.pyplot as plt
from pandas import DataFrame
from pydrake.all import (
MultibodyPlant, Parser, MakeVectorVariable, MultibodyForces_, SpatialInertia_, UnitInertia_,
Expression, Variable, Polynomial, Variables, DecomposeLumpedParameters, Evaluate,
PiecewisePolynomial, TrajectorySource,
DiagramBuilder, Simulator, LogOutput
)
from underactuated import FindResource
def cartpole_generate_data():
fig, ax = plt.subplots(2, 2)
ax[0,0].set_xlabel('t')
ax[0,0].set_ylabel('x')
ax[1,0].set_xlabel('t')
ax[1,0].set_ylabel('theta')
ax[0,1].set_xlabel('t')
ax[0,1].set_ylabel('xdot')
ax[1,1].set_xlabel('t')
ax[1,1].set_ylabel('thetadot')
data = []
for i in range(4):
ts = np.arange(0, 4, 0.1)
utraj = PiecewisePolynomial.CubicShapePreserving(ts, 10*np.sin((i+1)*ts).reshape((1,-1)))
builder = DiagramBuilder()
plant = builder.AddSystem(MultibodyPlant(0.0))
file_name = FindResource("models/cartpole.urdf")
Parser(plant).AddModelFromFile(file_name)
plant.Finalize()
usys = builder.AddSystem(TrajectorySource(utraj))
builder.Connect(usys.get_output_port(), plant.get_actuation_input_port())
logger = LogOutput(plant.get_state_output_port(), builder)
diagram = builder.Build()
simulator = Simulator(diagram)
simulator.AdvanceTo(ts[-1])
data.append(DataFrame({
't': logger.sample_times(),
'u': utraj.vector_values(logger.sample_times())[0,:],
'x': logger.data()[0,:],
'theta': logger.data()[1,:],
'xdot': logger.data()[2,:],
'thetadot': logger.data()[3,:],
}))
ax[0,0].plot(data[i].t, data[i].x.T)
ax[1,0].plot(data[i].t, data[i].theta.T)
ax[0,1].plot(data[i].t, data[i].xdot.T)
ax[1,1].plot(data[i].t, data[i].thetadot.T)
return data
data = cartpole_generate_data()
def cartpole_estimate_lumped_parameters(data):
plant = MultibodyPlant(0.0)
file_name = FindResource("models/cartpole.urdf")
Parser(plant).AddModelFromFile(file_name)
plant.Finalize()
sym_plant = plant.ToSymbolic()
sym_context = sym_plant.CreateDefaultContext()
# State variables
q = MakeVectorVariable(2, "q")
v = MakeVectorVariable(2, "v")
vd = MakeVectorVariable(2, "vd")
tau = MakeVectorVariable(1, "u")
# Parameters
mc = Variable("mc")
mp = Variable("mp")
l = Variable("l")
sym_plant.get_actuation_input_port().FixValue(sym_context, tau)
sym_plant.SetPositions(sym_context, q)
sym_plant.SetVelocities(sym_context, v)
cart = sym_plant.GetBodyByName("cart")
cart.SetMass(sym_context, mc)
pole = sym_plant.GetBodyByName("pole")
inertia = SpatialInertia_[Expression](mp, [0, 0, -l], UnitInertia_[Expression](l*l,l*l,0))
pole.SetSpatialInertiaInBodyFrame(sym_context, inertia)
# TODO: Set the joint damping pending resolution of drake #14405
# x = sym_plant.GetJointByName("x")
# theta = sym_plant.GetJointByName("theta")
forces = MultibodyForces_[Expression](sym_plant)
sym_plant.CalcForceElementsContribution(sym_context, forces)
error = sym_plant.CalcInverseDynamics(sym_context, vd, forces) - sym_plant.MakeActuationMatrix() @ tau
print('Symbolic dynamics: ')
print(error[0].Expand())
print(error[1].Expand())
print('')
W, alpha, w0 = DecomposeLumpedParameters(error, [mc, mp, l])
# Form the data matrix
# TODO: Update this to the vectorized version pending drake #8495
# env = {
# q[0]: np.concatenate([d.x[:-1] for d in data]),
# q[1]: np.concatenate([d.theta[:-1] for d in data]),
# v[0]: np.concatenate([d.xdot[:-1] for d in data]),
# v[1]: np.concatenate([d.thetadot[:-1] for d in data]),
# vd[0]: np.concatenate([(d.xdot[1:]-d.xdot[:-1])/h for d in data]),
# vd[1]: np.concatenate([(d.thetadot[1:]-d.thetadot[:-1])/h for d in data]),
# tau[0]:np.concatenate([d.u[:-1] for d in data])
# }
# Wdata = Evaluate(W, env)
M = data[0].t.shape[0] - 1
MM = 2*M*len(data)
N = alpha.size
Wdata = np.zeros((MM, N))
w0data = np.zeros((MM, 1))
offset = 0
for d in data:
for i in range(M):
h = d.t[i+1]-d.t[i]
env = {
q[0]: d.x[i],
q[1]: d.theta[i],
v[0]: d.xdot[i],
v[1]: d.thetadot[i],
vd[0]: (d.xdot[i+1]-d.xdot[i])/h,
vd[1]: (d.thetadot[i+1]-d.thetadot[i])/h,
tau[0]: d.u[i],
}
Wdata[offset:offset+2,:] = Evaluate(W, env)
w0data[offset:offset+2] = Evaluate(w0, env)
offset += 2
alpha_fit = np.linalg.lstsq(Wdata, -w0data, rcond=None)[0]
alpha_true = Evaluate(alpha, {mc: 10, mp: 1, l: .5})
print('Estimated Lumped Parameters:')
for i in range(len(alpha)):
print(f"{alpha[i]}. URDF: {alpha_true[i,0]}, Estimated: {alpha_fit[i,0]}")
cartpole_estimate_lumped_parameters(data)
Let's do this in a few steps. First the setup...
import matplotlib.pyplot as plt
from pydrake.all import (
DiagramBuilder, Simulator, LogOutput,
Adder, Gain, PiecewisePolynomial, TrajectorySource,
AddMultibodyPlantSceneGraph, Parser, SpatialInertia, UnitInertia,
LinearQuadraticRegulator, ConnectMeshcatVisualizer,
)
from underactuated import FindResource
from pandas import DataFrame
def UprightState():
state = [0, np.pi, 0, 0]
return state
def BalancingLQR(plant, context):
# Set u0 = 0
plant.get_actuation_input_port().FixValue(context, [0])
# Set x0 = UprightState()
plant.SetPositionsAndVelocities(context, UprightState())
Q = np.diag([10., 10., 1., 1.])
R = [1]
return LinearQuadraticRegulator(plant, context,
Q, R, input_port_index=int(plant.get_actuation_input_port().get_index()))
def cartpole_balancing_example(change_params=False, input_excitation=False, controller=None):
builder = DiagramBuilder()
plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.1)
file_name = FindResource("models/cartpole.urdf")
Parser(plant).AddModelFromFile(file_name)
plant.Finalize()
context = plant.CreateDefaultContext()
if (change_params):
# urdf has mc=10, mp=1, l=.5
mc = 6.0
mp = 1.5
l = 0.5
cart = plant.GetBodyByName("cart")
cart.SetMass(context, mc)
pole = plant.GetBodyByName("pole")
inertia = SpatialInertia(mp, [0, 0, -l], UnitInertia(l*l,l*l,0))
pole.SetSpatialInertiaInBodyFrame(context, inertia)
# TODO: Set the joint damping pending resolution of drake #14405
x_log = LogOutput(plant.get_state_output_port(), builder)
if controller:
controller = builder.AddSystem(controller)
else:
controller = builder.AddSystem(BalancingLQR(plant, context))
builder.Connect(plant.get_state_output_port(), controller.get_input_port(0))
if input_excitation:
# Add an uncorrelated (sine wave) input to make the dataset more rich
ts = np.arange(0, 8.0, 0.1)
pp = PiecewisePolynomial.CubicShapePreserving(ts, .1*np.sin(ts).reshape((1,-1)))
noise = builder.AddSystem(TrajectorySource(pp))
adder = builder.AddSystem(Adder(2, 1))
builder.Connect(noise.get_output_port(), adder.get_input_port(0))
builder.Connect(controller.get_output_port(), adder.get_input_port(1))
builder.Connect(adder.get_output_port(), plant.get_actuation_input_port())
u_log = LogOutput(adder.get_output_port(), builder)
else:
builder.Connect(controller.get_output_port(0),
plant.get_actuation_input_port())
u_log = LogOutput(controller.get_output_port(), builder)
diagram = builder.Build()
# Set up a simulator to run this diagram
simulator = Simulator(diagram)
context = simulator.get_mutable_context()
simulator.get_mutable_integrator().set_fixed_step_mode(True)
simulator.get_mutable_integrator().set_maximum_step_size(0.1)
fig, ax = plt.subplots(1, 2)
ax[0].set_xlabel('time (sec)')
ax[0].set_ylabel('x (m)')
ax[1].set_xlabel('time (sec)')
ax[1].set_ylabel('theta (rad)')
data = []
# Simulate
duration = 8.0 if running_as_notebook else 0.1 # sets a shorter duration during testing
for i in range(15):
u_log.reset()
x_log.reset()
context.SetTime(0.)
plant.SetPositionsAndVelocities(
plant.GetMyContextFromRoot(context),
UprightState() + 0.01 * np.random.randn(4,)
)
simulator.Initialize()
simulator.AdvanceTo(duration)
data.append(DataFrame({
't': x_log.sample_times(),
'u': u_log.data()[0,:],
'x': x_log.data()[0,:],
'theta': x_log.data()[1,:],
'xdot': x_log.data()[2,:],
'thetadot': x_log.data()[3,:],
}))
ax[0].plot(data[-1].t, data[-1].x)
ax[1].plot(data[-1].t, data[-1].theta)
plt.show()
return data
Now, let's run the original cart-pole LQR controller (with a known model) again, to remember what the solutions look like.
data = cartpole_balancing_example(change_params=False)
Let's run it again, but this time with the LQR controller designed using the wrong parameters for the cart-pole. You can see that it still keeps the cart-pole near the fixed point, but it's not converging very nicely!
data = cartpole_balancing_example(change_params=True, input_excitation=True)
Now let's run (linear) system identification on the data that used the wrong controller.
def linear_system_id(data):
N = np.array([d.t.size for d in data])
X = np.zeros((4,sum(N-1)))
Xn = np.zeros((4,sum(N-1)))
U = np.zeros((1,sum(N-1)))
offset = 0
for d in data:
N = d.t.size
assert(np.allclose(np.diff(d.t), 0.1)) # make sure no extra time-samples snuck in
X[0, offset:offset+N-1] = d.x[:-1]
X[1, offset:offset+N-1] = d.theta[:-1]
X[2, offset:offset+N-1] = d.xdot[:-1]
X[3, offset:offset+N-1] = d.thetadot[:-1]
U[0, offset:offset+N-1] = d.u[:-1]
Xn[0, offset:offset+N-1] = d.x[1:]
Xn[1, offset:offset+N-1] = d.theta[1:]
Xn[2, offset:offset+N-1] = d.xdot[1:]
Xn[3, offset:offset+N-1] = d.thetadot[1:]
offset += N-1
u0 = 0
x0 = UprightState()
U = U - u0
for i in range(4):
X[i,:] = X[i,:] - x0[i]
Xn[i,:] = Xn[i,:] - x0[i]
# Sanity check... (can make this go to zero if I narrow the initial conditions)
#A, B = linearize_cartpole() # currently defined in the cell below...
#print(np.linalg.norm(A @ X + B @ U - Xn, axis=1))
# Xn = [A,B][X;U] or [X',U']*[A';B'] = Xn'
ATBT, residuals, rank, s = np.linalg.lstsq(np.hstack((X.T,U.T)), Xn.T, rcond=None)
A = ATBT[:4,:].T
B = ATBT[-1,:].reshape((4,1))
print(f'Least squares rank (out of possible 5): {rank}')
print(f'residuals: {np.linalg.norm(A @ X + B @ U - Xn, axis=1)}')
return A, B
Ahat, Bhat = linear_system_id(data)
Try this: Try generating data without adding noise to the controller. You'll see that the least-squares solution drops rank (and the controller will be garbage).
Let's compare this with the true linearized dynamics of the cart-pole.
from pydrake.all import MultibodyPlant, Linearize
def linearize_cartpole():
# Note the discrete time here matches the sample time above.
plant = MultibodyPlant(time_step=0.1)
file_name = FindResource("models/cartpole.urdf")
Parser(plant).AddModelFromFile(file_name)
plant.Finalize()
context = plant.CreateDefaultContext()
# Set u0 = 0
plant.get_actuation_input_port().FixValue(context, [0])
# Set x0 = UprightState()
plant.SetPositionsAndVelocities(context, UprightState())
linear_sys = Linearize(plant, context,
input_port_index=plant.get_actuation_input_port().get_index(),
output_port_index=plant.get_state_output_port().get_index())
return linear_sys.A(), linear_sys.B()
A, B = linearize_cartpole()
np.set_printoptions(formatter={'float': lambda x: "{0:0.4f}".format(x)})
print("A = ")
print(A)
print("Â = ")
print(Ahat)
print("")
print("B = ")
print(B)
print("B̂ = ")
print(Bhat)
Finally, let's run the cart-pole in closed-loop with an LQR controller designed from the estimated A and B matrices.
from pydrake.all import AffineSystem, DiscreteTimeLinearQuadraticRegulator
def cartpole_lqr_with_identified_model(A, B):
Q = np.diag([10., 10., 1., 1.])
R = [1]
K, S = DiscreteTimeLinearQuadraticRegulator(A, B, Q, R)
# u = u0 - K*(x-x0) = (u0 + K*x0) - K*x
u0 = [0]
x0 = UprightState()
controller = AffineSystem(
A=np.zeros((0,0)), B=np.zeros((0,4)), f0=np.zeros((0,1)),
C=np.zeros((1,0)), D=-K, y0=u0+K @ x0)
cartpole_balancing_example(controller=controller)
cartpole_lqr_with_identified_model(Ahat, Bhat)
Let's generate input-output data with the cart-pole with some additional random (Gaussian iid) input excitation. This time, instead of logging the true state, I'll include an additional block which outputs a number of keypoints on the cart and the pole.
import numpy as np
from pydrake.all import (
DiagramBuilder, Simulator, LogOutput,
Adder, ConstantVectorSource, RandomSource, RandomDistribution, Gain,
AddMultibodyPlantSceneGraph, Parser, RigidTransform_,
LinearQuadraticRegulator,
TemplateSystem, LeafSystem_, AbstractValue, BasicVector_,
)
from underactuated import FindResource
from underactuated.jupyter import running_as_notebook, ToLatex
from IPython.display import display, Latex, SVG
import pydot
def UprightState():
state = [0, np.pi, 0, 0]
return state
def BalancingLQR(plant, context):
# Set u0 = 0
plant.get_actuation_input_port().FixValue(context, [0])
# Set x0 = UprightState()
plant.SetPositionsAndVelocities(context, UprightState())
Q = np.diag([10., 10., 1., 1.])
R = [1]
return LinearQuadraticRegulator(plant, context,
Q, R, input_port_index=int(plant.get_actuation_input_port().get_index()))
@TemplateSystem.define("KeypointSystem_")
def KeypointSystem_(T):
class Impl(LeafSystem_[T]):
def _construct(self, plant, converter=None):
LeafSystem_[T].__init__(self, converter)
# Kc are the cart keypoints
self.p_CKc = np.array([[.3, 0, .4],
[.3, 0, .1],
[-.3, 0, .1],
[-.3, 0, .4],
[-.15, 0, .025],
[.15, 0, .025]]).T
# Kp are the pole keypoints
l = np.linspace(start=-.5, stop=0, num=3)
self.p_PKp = np.hstack((
np.vstack((0*l+0.01, 0*l, l)),
np.vstack((0*l-0.01, 0*l, l))
))
self.num_keypoints = self.p_CKc.shape[1] + self.p_PKp.shape[1]
self.cart_index = int(plant.GetBodyByName("cart").index())
self.pole_index = int(plant.GetBodyByName("pole").index())
self.DeclareAbstractInputPort(
"body_poses", AbstractValue.Make([RigidTransform_[T]()]*3))
self.DeclareVectorOutputPort(
"keypoints",
BasicVector_[T](2 * self.num_keypoints),
self.CalcKeypoints)
# Save only for scalar copy construction
self.plant_ = plant
def _construct_copy(self, other, converter=None):
Impl._construct(self, other.plant_, converter=converter)
def CalcKeypoints(self, context, output):
body_poses = self.get_input_port().Eval(context)
X_WC = body_poses[self.cart_index]
X_WP = body_poses[self.pole_index]
p_WK = np.hstack((X_WC.multiply(self.p_CKc), X_WP.multiply(self.p_PKp)))
output.SetFromVector((p_WK[[0,2],:]).reshape(-1,1))
return Impl
KeypointSystem = KeypointSystem_[None] # Default instantiation
def cartpole_balancing_keypoints():
builder = DiagramBuilder()
plant, scene_graph = AddMultibodyPlantSceneGraph(builder, time_step=0.1)
file_name = FindResource("models/cartpole.urdf")
Parser(plant).AddModelFromFile(file_name)
plant.Finalize()
# Add the LQR controller.
context = plant.CreateDefaultContext()
controller = builder.AddSystem(BalancingLQR(plant, context))
controller.set_name("LQR controller")
builder.Connect(plant.get_state_output_port(), controller.get_input_port(0))
# Add an iid Gaussian input for identification
excitation = builder.AddSystem(RandomSource(RandomDistribution.kGaussian, 1, 0.1))
excitation.set_name("excitation")
adder = builder.AddSystem(Adder(2, 1))
gain1 = builder.AddSystem(Gain(.025, 1))
gain2 = builder.AddSystem(Gain(100.0, 1))
builder.Connect(excitation.get_output_port(), gain1.get_input_port())
builder.Connect(gain1.get_output_port(), gain2.get_input_port())
builder.Connect(gain2.get_output_port(), adder.get_input_port(0))
# builder.Connect(excitation.get_output_port(), adder.get_input_port(0))
builder.Connect(controller.get_output_port(), adder.get_input_port(1))
builder.Connect(adder.get_output_port(), plant.get_actuation_input_port())
# We will identify the map from this input to the sensors (lumping the LQR controller into the identified dynamics)
u_log = LogOutput(gain1.get_output_port(), builder)
u_log.set_name("input log")
# Add and log keypoint outputs
keypoints = builder.AddSystem(KeypointSystem(plant))
keypoints.set_name("keypoints")
builder.Connect(plant.get_body_poses_output_port(), keypoints.get_input_port())
y_log = LogOutput(keypoints.get_output_port(), builder)
y_log.set_name("output log")
diagram = builder.Build()
display(SVG(pydot.graph_from_dot_data(diagram.GetGraphvizString())[0].create_svg()))
# Set up a simulator to run this diagram
simulator = Simulator(diagram)
context = simulator.get_mutable_context()
simulator.get_mutable_integrator().set_fixed_step_mode(True)
simulator.get_mutable_integrator().set_maximum_step_size(0.1)
data = []
context.SetTime(0.)
plant.SetPositionsAndVelocities(plant.GetMyContextFromRoot(context), UprightState())
simulator.Initialize()
y0 = keypoints.get_output_port().Eval(keypoints.GetMyContextFromRoot(context))
u0 = 0
# Simulate
duration = 15.0 if running_as_notebook else .5 # sets a shorter duration during testing
for i in range(25):
u_log.reset()
y_log.reset()
context.SetTime(0.)
plant.SetPositionsAndVelocities(
plant.GetMyContextFromRoot(context),
UprightState() + 0.1 * np.random.randn(4,)
)
simulator.Initialize()
simulator.AdvanceTo(duration)
y_out = np.copy(y_log.data())
for i in range(y_out.shape[0]):
y_out[i,:] -= y0[i]
data.append({'t': np.copy(y_log.sample_times()),
'u': np.copy(u_log.data()),
'y': y_out})
return data, u0, y0
data, u0, y0 = cartpole_balancing_keypoints()
Here's a simple animation of the keypoints, so we understand what we're asking the algorithm to do. (Rendering the actual animation to jshtml is surprisingly slow...)
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import display, HTML
plt.rcParams.update({"savefig.transparent": True})
def draw_keypoints(data, y0, num_trials=-1):
fig, ax = plt.subplots(figsize=(8,3))
ax.axis('equal')
ax.set(xlim=(-2, 2), ylim=(-.5, 1.0))
frames = []
for trial in data[:num_trials]:
for n in range(trial['t'].size):
y = (trial['y'][:, n] + y0).reshape(2,-1)
frames.append(ax.plot(y[0,:], y[1,:], 'b.'))
ani = animation.ArtistAnimation(fig, frames, interval=100, blit=True, repeat=True)
plt.close('all')
display(HTML(ani.to_jshtml()))
draw_keypoints(data, y0, num_trials=3)
Let's plot the y data, to make sure it seems reasonable...
import matplotlib.pyplot as plt
fig,ax = plt.subplots()
for trial in data:
ax.plot(trial['y'].T)
Now let's implement the actual Ho-Kalman algorithm
import matplotlib.pyplot as plt
from pydrake.all import MultibodyPlant, Linearize
def EstimateG(data, N):
# N=0 fits y[k] = Du[k]
# N=1 fits y[k] = CBu[k-1] + Du[k]
# ...
nu = data[0]['u'].shape[0]
ny = data[0]['y'].shape[0]
num_data = np.sum([trial['t'].size - N for trial in data])
U = np.ones((nu*(N+1), num_data)) # U[:,i] = [u[k-N]; ...; u[k]]
Y = np.zeros((ny, num_data))
offset = 0
for trial in data:
nt = trial['t'].size
this_u = trial['u']
this_y = trial['y']
for n in range(nt - N):
# y[k] depends on u[k-N] to u[k]
k = n+N
U[:, n+offset] = this_u[:, n:k+1].reshape((-1,),order='F')
Y[:, n+offset] = this_y[:, k]
offset += nt - N
# Least squares: Y = G*U => Y.T = U.T*G.T => a=U.T, b=Y.T, x=G.T
GhatT, residuals, rank, s = np.linalg.lstsq(a=U.T, b=Y.T, rcond=None)
print(f'Least squares rank (out of possible {N+1}): {rank}')
return GhatT.T
N = 100 if running_as_notebook else 2
Ghat = EstimateG(data, N)
Let's compare our estimatation, ˆG to the G that we get from linearizing the plant (with autodiff), and computing the true impulse response. I've plotted the results for two of the keypoints.
def linearize_cartpole():
builder = DiagramBuilder()
# Note the discrete time here matches the sample time above.
plant = builder.AddSystem(MultibodyPlant(time_step=0.1))
file_name = FindResource("models/cartpole.urdf")
Parser(plant).AddModelFromFile(file_name)
plant.Finalize()
controller = builder.AddSystem(BalancingLQR(plant, plant.CreateDefaultContext()))
builder.Connect(plant.get_state_output_port(), controller.get_input_port(0))
adder = builder.AddSystem(Adder(2, 1))
gain = builder.AddSystem(Gain(100.0, 1))
builder.Connect(gain.get_output_port(), adder.get_input_port(0))
builder.Connect(controller.get_output_port(), adder.get_input_port(1))
builder.Connect(adder.get_output_port(), plant.get_actuation_input_port())
keypoints = builder.AddSystem(KeypointSystem(plant))
builder.Connect(plant.get_body_poses_output_port(), keypoints.get_input_port())
builder.ExportInput(gain.get_input_port())
builder.ExportOutput(keypoints.get_output_port())
diagram = builder.Build()
context = diagram.CreateDefaultContext()
plant_context = plant.GetMyContextFromRoot(context)
# Set u0 = 0
diagram.get_input_port().FixValue(context, [0])
# Set x0 = UprightState()
plant.SetPositionsAndVelocities(plant_context, UprightState())
linear_sys = Linearize(diagram, context)
return linear_sys.A(), linear_sys.B(), linear_sys.C(), linear_sys.D()
def BuildG(A, B, C, D, nt):
ny, nu = D.shape
nx = A.shape[0]
G = np.zeros((ny, (nt+1)*nu))
G[:, -nu] = D.reshape(-1,)
AkB = B
for k in range(nt):
G[:, -(k+2)*nu:-(k+1)*nu] = C @ AkB
AkB = A @ AkB
return G
A, B, C, D = linearize_cartpole()
G = BuildG(A,B,C,D,N)
keypoint_inds = [1, 9]
fig, ax = plt.subplots(1,len(keypoint_inds))
for i in range(len(keypoint_inds)):
ax[i].plot(np.flip(Ghat[keypoint_inds[i],:]),label='estimated')
ax[i].plot(np.flip(G[keypoint_inds[i],:]),label='actual')
ax[i].set_title(f'keypoint {keypoint_inds[i]} impulse response')
ax[i].legend()
#display(Latex(f'\\begin{{align*}} G =& {ToLatex(G[keypoint_inds,:])} \\\\ \\hat{{G}} =& {ToLatex(Ghat[keypoint_inds,:])} \\end{{align*}}'))
Now let's extract the estimated ˆA,ˆB,ˆC,ˆD matrices
def HoKalman(G, nu, nx, m, n):
# Ho-Kalman algorithm, aka Eigensystem Realization Algorithm (ERA)
# G are the Markov parameters [CA^{N-1}B, ..., CB, D]
# nu is the number of inputs
# nx is the desired number of states in the realization
# m,n are the number of (block) rows/columns in the Hankel matrix
# with m*nu >= n*ny and m+n <= N
ny = G.shape[0]
# Construct the block Hankel data matrix
H = np.zeros((m*ny, n*nu))
H2 = np.zeros((m*ny, n*nu))
for i in range(m):
Hrows = i*ny + np.arange(ny)
for j in range(n):
Hcols = j*nu + np.arange(nu)
Gcols = -((j+i+2)*nu) + np.arange(nu)
H[np.ix_(Hrows, Hcols)] = G[:, Gcols]
Gcols2 = -((j+i+3)*nu) + np.arange(nu)
H2[np.ix_(Hrows, Hcols)] = G[:, Gcols2]
U, s, VT = np.linalg.svd(H, full_matrices=False)
sxhalf = np.sqrt(s[:nx])
Ux = U[:,:nx]
VxT = VT[:nx, :]
B = np.diag(sxhalf) @ VxT[:,:nu]
C = Ux[:ny,:] @ np.diag(sxhalf)
Sminushalf = np.diag(1/sxhalf)
A = Sminushalf @ Ux.T @ H2 @ VxT.T @ Sminushalf
D = G[:,-nu + np.arange(nu)]
return A, B, C, D, s[:nx]
m = round(N/2)
Ahat, Bhat, Chat, Dhat, sigma = HoKalman(Ghat, nu=1, nx=10, m=m, n=N-m)
fig, ax = plt.subplots(figsize=(6,2))
ax.plot(sigma,'.')
plt.yscale("log")
ax.set_title("Hankel singular values (log scale)")
keypoint_inds = range(5,11)
display(Latex(f'\\begin{{align*}} \hat{{A}} =& {ToLatex(Ahat)} & \hat{{B}} =& {ToLatex(Bhat)} \\\\ \hat{{C}} =& {ToLatex(Chat[keypoint_inds,:])} & \hat{{D}} =& {ToLatex(Dhat[keypoint_inds,:])} \\end{{align*}}'))
It's interesting that the magnitude of the Hankel singular values drop sharply after the first 4 when we use the true G obtained from linearization. ˆG doesn't have noise, but it was generated by simulating the nonlinear dynamics, so it has a somewhat similar effect.
_, _, _, _, sigma = HoKalman(G, nu=1, nx=10, m=m, n=N-m) # I've used G here instead of Ghat
fig, ax = plt.subplots(figsize=(6,2))
ax.plot(sigma,'.')
plt.yscale("log")
ax.set_title("Hankel singular values (log scale)");
Remember that ˆA,ˆB,ˆC,ˆD will only approximate the original A,B,C,D up to a similarity transform. But they should describe the same input-output behavior.
Ghat_reconstructed = BuildG(Ahat,Bhat,Chat,Dhat,N)
keypoint_inds = [1, 9]
fig, ax = plt.subplots(1,len(keypoint_inds))
for i in range(len(keypoint_inds)):
ax[i].plot(np.flip(Ghat[keypoint_inds[i],:]),label='estimated')
ax[i].plot(np.flip(G[keypoint_inds[i],:]),label='actual')
ax[i].plot(np.flip(Ghat_reconstructed[keypoint_inds[i],:]),label='reconstructed')
ax[i].set_title(f'keypoint {keypoint_inds[i]} impulse response')
ax[i].legend()