import os
import scipy.integrate
import matplotlib.pyplot as plt
import numpy as np
from numpy import f2py
import copy
%matplotlib inline
# local modules
from periodic import PeriodicProcess, PeriodicScheduler, \
nested_dict_to_namedtuple, Logger
import uorb
from sympy_utils import rhs_to_scipy_ode, \
save_sympy_expr, load_sympy_expr
if f2py.compile(source=open('pendulum.f90','r').read(), modulename='pendulum',
source_fn='pendulum.f90'):
raise RuntimeError('compile failed, see console')
import pendulum
class PendulumDynamics(PeriodicProcess):
def __init__(self, period, x0, uorb_manager):
super(PendulumDynamics, self).__init__(period)
self.x0 = x0
self.ode = scipy.integrate.ode(
pendulum.compute_f, pendulum.compute_a)
#self.ode.set_integrator('dopri5')
self.uorb_manager = uorb_manager
self.process_noise_power = 1e-5
def initialize(self, t):
super(PendulumDynamics, self).initialize(t)
self.ode.set_initial_value(self.x0, t)
# publish sim state
self.sim = uorb.Publication(
self.uorb_manager, 'sim_state',
uorb.Topic_sim_state(*((0,)*16)))
self.sim.publish()
# subscribe to actuators
self.actuators = uorb.Subscription(
self.uorb_manager, 'actuator_outputs')
def run(self, t):
# update actuator info
self.actuators.update()
process_noise_stdev = np.sqrt(self.process_noise_power/self.period)
process_noise = process_noise_stdev*np.random.randn()
ode = self.ode
if t == ode.t:
return
u = [self.actuators.data.output[0] + process_noise]
m = 1
g = 9.8
l = 1
ode.set_f_params(u, m, g, l)
ode.set_jac_params(u, m, g, l)
ode.integrate(t)
if not ode.successful():
raise ValueError('ode integration failed')
# acceleration
# TODO, should make sympy output this
x = ode.y
y_accel = pendulum.compute_g(t, x, u, m, g, l)
# publish simulation data
self.sim.data.timestamp=int(1e6*t)
self.sim.data.pitch=ode.y[0]
self.sim.data.pitchspeed=ode.y[1]
self.sim.data.xacc=y_accel[0,0]
self.sim.data.yacc=y_accel[1,0]
self.sim.data.zacc=y_accel[2,0]
self.sim.publish()
@property
def x(self):
return self.ode.y
class Sensor(PeriodicProcess):
def __init__(self, period, uorb_manager):
super(Sensor, self).__init__(period)
self.uorb_manager = uorb_manager
self.gyro_noise_power = 1e-5**np.ones(3)
self.gyro_bias = np.zeros(3)
self.accel_noise_power = 1e-4*np.ones(3)
self.mag_noise_power = 1e-4*np.ones(3)
def initialize(self, t):
super(Sensor, self).initialize(t)
#initialize topic
timestamp = int(t*1e6)
self.sensor = uorb.Publication(
self.uorb_manager, 'sensor_combined',
uorb.Topic_sensor_combined(*((0,)*42)))
self.sensor.data.gyro1_rad_s = timestamp
self.sensor.data.gyro_rad_s = np.array([0,0,0])
self.sensor.data.accelerometer_timestamp = timestamp
self.sensor.data.accelerometer_m_s2 = np.array([0,0,0])
self.sensor.magnetometer_timestamp = timestamp
self.sensor.magnetometer_ga = np.array([0,0,0])
self.sensor.publish()
# get sim data
self.sim = uorb.Subscription(
self.uorb_manager, 'sim_state')
def run(self, t):
# get new simulation data
self.sim.update()
# noise generator
randn = np.random.randn
timestamp = int(t*1e6)
self.sensor.data.timestamp = timestamp
# gyro
gyro_ideal = np.array([self.sim.data.rollspeed,
self.sim.data.pitchspeed,
self.sim.data.yawspeed])
gyro_stddev = np.sqrt(self.gyro_noise_power/self.period)
gyro_noise = gyro_stddev*randn(3)
gyro = gyro_ideal + gyro_noise
# accelerometer
accel_ideal = np.array([self.sim.data.xacc,self.sim.data.yacc,self.sim.data.zacc])
accel_stddev = np.sqrt(self.accel_noise_power/self.period)
accel_noise = accel_stddev*randn(3)
accel = accel_ideal + accel_noise
# magnetometer
mag_ideal = np.array([0,0,0])
mag_stddev = np.sqrt(self.mag_noise_power/self.period)
mag_noise = mag_stddev*randn(3)
mag = mag_ideal + mag_noise
# publish
self.sensor.data.gyro1_timestamp = timestamp
self.sensor.data.gyro_rad_s = gyro
self.sensor.data.accelerometer_timestamp = timestamp
self.sensor.data.accelerometer_m_s2 = accel
self.sensor.magnetometer_timestamp = timestamp
self.sensor.magnetometer_ga = mag
self.sensor.publish()
class Estimator(PeriodicProcess):
def __init__(self, period, uorb_manager):
super(Estimator, self).__init__(period)
self.uorb_manager = uorb_manager
self.xh = np.array([0,0])
self.P0 = np.eye(2)
def initialize(self, t):
super(Estimator, self).initialize(t)
# initialize position publication
self.pos = uorb.Publication(
self.uorb_manager,
'vehicle_global_position',
uorb.Topic_vehicle_global_position(*((0,)*11)))
# initialize attitude publication
self.att = uorb.Publication(
self.uorb_manager,
'vehicle_attitude',
uorb.Topic_vehicle_attitude(*((0,)*16)))
# intialize sensor subscription
self.sensor = uorb.Subscription(
self.uorb_manager,
'sensor_combined')
# TODO, this is a hack, we make the estimator
# just give the sim state for now
self.sim = uorb.Subscription(
self.uorb_manager,
'sim_state')
# initialize timestamps for checking for sensor updates
self.accel_timestamp_last = -1
self.l = 1
self.g = 9.8
def run(self, t):
# get new sensor data
# TODO use this to estimate
self.sensor.update()
if self.sensor.data.accelerometer_timestamp != self.accel_timestamp_last:
self.accel_timestamp_last = self.sensor.data.accelerometer_timestamp
y = self.sensor.data.accelerometer_m_s2
theta = self.xh[0]
theta_dot = self.xh[1]
theta_ddot = 1 # TODO
yh = np.array([self.l*theta_ddot + self.g*np.sin(theta), 0, self.g*np.cos(theta)])
yh = []
# publish new estimated position
# TODO don't cheat using sim state
self.pos.data.timestamp = int(1e6*t)
self.pos.data.lat = self.sim.data.lat
self.pos.data.lon = self.sim.data.lon
self.pos.data.alt = self.sim.data.alt
self.pos.data.vel_n = self.sim.data.vx
self.pos.data.vel_e = self.sim.data.vy
self.pos.data.vel_d = self.sim.data.vz
self.pos.data.yaw = self.sim.data.yaw
self.pos.publish()
# publish new estimated attitude
self.att.data.timestamp = int(1e6*t)
self.att.data.roll = self.sim.data.roll
self.att.data.pitch = float(self.sim.data.pitch)
self.att.data.yaw = self.sim.data.yaw
self.att.data.rollspeed = self.sensor.data.gyro_rad_s[0]
self.att.data.pitchspeed = self.sensor.data.gyro_rad_s[1]
self.att.data.yawspeed = self.sensor.data.gyro_rad_s[2]
self.att.publish()
class Controller(PeriodicProcess):
def __init__(self, period, uorb_manager):
super(Controller, self).__init__(period)
self.uorb_manager = uorb_manager
self.actuators = uorb.Publication(
self.uorb_manager, 'actuator_outputs',
uorb.Topic_actuator_outputs(0,
output=np.array([0,0,0,0,0,0,0,0]).astype(float),
noutputs=1))
self.actuators.publish()
def initialize(self, t):
super(Controller, self).initialize(t)
# publish actuator topic
self.actuators.data = uorb.Topic_actuator_outputs(int(t*1e6),
output=np.array([0,0,0,0,0,0,0,0]).astype(float))
self.actuators.publish()
# subscribe to vehicle global position topic
self.pos = uorb.Subscription(
self.uorb_manager, 'vehicle_global_position')
self.att = uorb.Subscription(
self.uorb_manager, 'vehicle_attitude')
def run(self, t):
# get new position/attitude estimation
self.pos.update()
self.att.update()
u = -20*self.att.data.pitch - 6*self.att.data.pitchspeed
# publish new actuator controls
self.actuators.data.timestamp = int(t*1e6)
self.actuators.data.output = [u,0,0,0,0,0,0,0]
self.actuators.publish()
First we declare the uORB manager and all processes that will be running. Next, we create the scheduler and pass the list or processes to it. Finally, we run the scheudler.
def run_sim():
# constants
tf = 4
# declare uorb manager, by using pointer copying we can avoid
# copying actual data and make it run much faster, however
# we have to becareful doing this with multi-threading
uorb_manager = uorb.Manager(copy_type='pointer')
# declare periodic processes
controller = Controller(0.02, uorb_manager)
estimator = Estimator(0.002, uorb_manager)
dynamics = PendulumDynamics(0.001, [1,0], uorb_manager)
sensor = Sensor(0.001, uorb_manager)
logger = Logger(0.001, tf,
['sim_state',
'vehicle_global_position',
'vehicle_attitude',
'sensor_combined',
'actuator_outputs'],
uorb_manager)
# declare scheduler
scheduler = PeriodicScheduler(False, 0, tf, 0.001)
scheduler.process_list = [
dynamics, sensor, estimator, controller, logger]
# run the scheduler
scheduler.run()
return nested_dict_to_namedtuple(logger.log)
We can profile the simulation to see what is taking the most time.
stats = %prun -q -r log = run_sim()
stats.sort_stats('tottime').print_stats(20);
302502 function calls in 0.608 seconds Ordered by: internal time List reduced from 134 to 20 due to restriction <20> ncalls tottime percall cumtime percall filename:lineno(function) 3999 0.123 0.000 0.125 0.000 _ode.py:730(run) 3999 0.090 0.000 0.162 0.000 <ipython-input-4-5d4f043927be>:33(run) 3999 0.086 0.000 0.129 0.000 __init__.py:111(run) 3999 0.051 0.000 0.209 0.000 <ipython-input-3-ea772f26b4d7>:26(run) 13926 0.047 0.000 0.047 0.000 {numpy.core.multiarray.array} 15996 0.031 0.000 0.031 0.000 {method 'randn' of 'mtrand.RandomState' objects} 1833 0.027 0.000 0.044 0.000 <ipython-input-5-6fb25091c0a5>:39(run) 11865 0.021 0.000 0.027 0.000 _base.py:57(publish) 30205 0.018 0.000 0.056 0.000 _base.py:100(update) 20000 0.018 0.000 0.565 0.000 __init__.py:35(update) 30205 0.013 0.000 0.022 0.000 _base.py:97(updated) 18272 0.012 0.000 0.016 0.000 _base.py:83(copy) 1 0.010 0.010 0.603 0.603 __init__.py:59(run) 30205 0.010 0.000 0.010 0.000 _base.py:79(updated) 35872 0.009 0.000 0.009 0.000 {method 'keys' of 'dict' objects} 11865 0.007 0.000 0.034 0.000 _base.py:116(publish) 30137 0.007 0.000 0.007 0.000 _base.py:42(_copy_type) 3999 0.006 0.000 0.132 0.000 _ode.py:376(integrate) 1 0.004 0.004 0.025 0.025 __init__.py:123(finalize) 7 0.003 0.000 0.005 0.001 collections.py:288(namedtuple)
If we aren't profiling it is much faster.
%%time
log = run_sim();
CPU times: user 513 ms, sys: 3.6 ms, total: 517 ms Wall time: 515 ms
It is easy to plot any of the various uorb topics.
plt.plot(log.log.t, log.sim_state.pitch);
plt.plot(log.log.t, log.actuator_outputs.output[:,0], 'r');
plt.plot(log.log.t, log.vehicle_attitude.pitch);
plt.plot(log.log.t, log.sensor_combined.gyro_rad_s);
plt.plot(log.log.t, log.sensor_combined.accelerometer_m_s2);