import nengo import numpy as np import matplotlib.pyplot as plt from matplotlib.ticker import MaxNLocator from scipy.stats import mannwhitneyu %matplotlib inline class HilbertCurve(object): # Implementation based on http://en.wikipedia.org/w/index.php?title=Hilbert_curve&oldid=633637210 def __init__(self, n): self.n = n self.n_corners = (2 ** n) ** 2 self.corners = np.zeros((self.n_corners, 2)) self.steps = np.arange(self.n_corners) steps = np.arange(self.n_corners) for s in 2 ** np.arange(n): r = np.empty_like(self.corners, dtype='int') r[:, 0] = 1 & (steps // 2) r[:, 1] = 1 & (steps ^ r[:, 0]) self._rot(s, r) self.corners += s * r steps //= 4 self.corners /= (2 ** n) - 1 def _rot(self, s, r): swap = r[:, 1] == 0 flip = np.all(r == np.array([1, 0]), axis=1) self.corners[flip] = (s - 1 - self.corners[flip]) self.corners[swap] = self.corners[swap, ::-1] def __call__(self, u): step = np.asarray(u * len(self.steps)) return np.vstack(( np.interp(step, self.steps, self.corners[:, 0]), np.interp(step, self.steps, self.corners[:, 1]))).T master_seed = 1298 # Seed for generation of individual seeds for trials n_trials = 50 # Number of trials hc = HilbertCurve(n=4) # Increase n to cover the input space more densly dt = 0.001 # Simulation time step (seconds) duration = 5. # Simulation duration (seconds) wait_duration = 0.5 # Duration (seconds) to wait in the beginning to have stable representation of the initial values. def gen_seeds(size): return np.random.RandomState(master_seed).randint(nengo.utils.numpy.maxint, size=size) seeds = gen_seeds(n_trials) # Random number generator seeds for reproducibility def stimulus_fn(t): return np.squeeze(hc(t / duration).T * 2 - 1) ts = np.linspace(0, duration, duration / dt) inp = stimulus_fn(ts) plt.figure(figsize=(8, 3)) plt.subplot(1, 2, 1, aspect='equal') plt.plot(*inp) plt.title("Bechmark input") plt.xlabel("$x_1$") plt.ylabel("$x_2$") plt.xlim(-1.1, 1.1) plt.ylim(-1.1, 1.1) plt.subplot(1, 2, 2) plt.plot(ts, np.prod(inp, axis=0)) plt.title("Desired output") plt.xlabel("t [s]") plt.ylabel("y"); from nengo.utils.numpy import rmse class Benchmark(object): def __init__(self, seed, create_mult_network, test_config, synapse=None, n_neurons=150, n_eval_points=1000): self.n_neurons = n_neurons self.n_eval_points = n_eval_points self.seed = seed model = nengo.Network(seed=seed) with model: model.config[nengo.Probe].synapse = synapse model.config[nengo.Connection].synapse = synapse stimulus = nengo.Node(output=lambda t: stimulus_fn(max(0., t - wait_duration)), size_out=2) with test_config: self.probe_test = create_mult_network(self.n_neurons, self.n_eval_points, stimulus) ens_direct = nengo.Ensemble(1, dimensions=2, neuron_type=nengo.Direct()) result_direct = nengo.Node(size_in=1) nengo.Connection(stimulus, ens_direct) nengo.Connection(ens_direct, result_direct, function=lambda x: x[0] * x[1], synapse=None) self.probe_direct = nengo.Probe(result_direct) self.sim = nengo.Simulator(model) self.sim.run(duration + wait_duration, progress_bar=False) self.trange = self.sim.trange()[self.sim.trange() > wait_duration] - wait_duration self.output_test = self.sim.data[self.probe_test][self.sim.trange() > wait_duration] self.output_direct = self.sim.data[self.probe_direct][self.sim.trange() > wait_duration] def rmse(self): return rmse(self.output_test, self.output_direct) def plot(self): ax = plt.gca() ax.plot(self.trange, self.output_test, label="Test") ax.plot(self.trange, self.output_direct, label="Direct") ax.legend(loc='best') ax.set_xlabel("t [s]") ax.set_ylabel("y") ax.set_xlim(0., 1.) ax.set_ylim(-1.1, 1.1) def repeated_benchmark(*args, **kwargs): rmses = np.empty(len(seeds)) with nengo.utils.progress.ProgressTracker(len(seeds), True) as progress: for i, s in enumerate(seeds): b = Benchmark(s, *args, **kwargs) rmses[i] = b.rmse() if i == 0: b.plot() progress.step() print "mean RMSE:", np.mean(rmses) print "median RMSE:", np.median(rmses) print "standard deviation of RMSE:", np.std(rmses) return rmses def naive_multiplication(n_neurons, n_eval_points, stimulus): # Note that we set the radius to sqrt(2.) to make sure all possible input values # are within the representational range of the ensemble. (Both multiplicands # could be 1, giving a distance of sqrt(1²+1²) to the origin.) ens = nengo.Ensemble(n_neurons, dimensions=2, radius=np.sqrt(2.), n_eval_points=n_eval_points) result = nengo.Node(size_in=1) nengo.Connection(stimulus, ens) nengo.Connection(ens, result, function=lambda x: x[0] * x[1], synapse=None) return nengo.Probe(result) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) rmse_naive = repeated_benchmark(naive_multiplication, config) def compare(before, after): print "Improvement by {0:.0f}% (p < {1:.3f}).".format( (1. - np.mean(after) / np.mean(before)) * 100., np.ceil(1000. * 2. * mannwhitneyu(before, after)[1]) / 1000.) def radius(phi): return 1. / np.cos((phi + np.pi / 4.) % (np.pi / 2.) - np.pi / 4.) phis = np.linspace(0., 2. * np.pi, 100) ax = plt.subplot(1, 1, 1, polar=True) ax.plot(phis, radius(phis)) def mult_err(phi, encoders=[]): if len(encoders) > 0: cos_sin_psi = np.cos(encoders) * np.sin(encoders) diff = np.maximum(0., np.cos(np.subtract.outer(encoders, phi))) neuron_outputs = cos_sin_psi[:, np.newaxis] * diff ** 2. else: neuron_outputs = np.zeros((0, np.asarray(phi).size)) return radius(phi) ** 6. / 6. * (np.cos(phi) * np.sin(phi) - np.sum(neuron_outputs, axis=0)) ** 2. phis = np.linspace(0., 2. * np.pi, 100) ax = plt.subplot(1, 1, 1, polar=True) ax.plot(phis, mult_err(phis)) ax.set_rmax(0.35) ax.yaxis.set_major_locator(MaxNLocator(3)) encoders = [np.pi / 4., 3. / 4. * np.pi, 5. / 4. * np.pi, 7. / 4. * np.pi] plt.figure(figsize=(20, 8)) for i in range(1, len(encoders) + 1): ax = plt.subplot(1, len(encoders) + 1, i + 1, polar=True) ax.plot(phis, mult_err(phis, encoders=encoders[:i])) ax.set_rmax(0.35) ax.yaxis.set_major_locator(MaxNLocator(3)) ax.set_title("{0} encoders".format(i)) diag_encoders = nengo.dists.Choice(np.array([[1, 1], [1, -1], [-1, -1], [-1, 1]]) / np.sqrt(2.)) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) config[nengo.Ensemble].encoders = diag_encoders rmse_diag_encoders = repeated_benchmark(naive_multiplication, config) compare(rmse_naive, rmse_diag_encoders) plt.subplot(1, 1, 1, aspect='equal') plt.gca().add_artist(plt.Circle([0, 0], radius=np.sqrt(2), fill=False, color='b')) plt.gca().add_artist(plt.Rectangle([-1, -1], 2, 2, fill=False, color='r')) plt.xlabel("$x_1$") plt.ylabel("$x_2$") plt.xlim(-1.5, 1.5) plt.ylim(-1.5, 1.5); plt.title("Input domain and sampling shape") config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) config[nengo.Ensemble].eval_points = nengo.dists.Uniform(-1 / np.sqrt(2.), 1 / np.sqrt(2.)) rmse_square = repeated_benchmark(naive_multiplication, config) compare(rmse_naive, rmse_square) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) config[nengo.Ensemble].encoders = diag_encoders config[nengo.Ensemble].eval_points = nengo.dists.Uniform(-1 / np.sqrt(2.), 1 / np.sqrt(2.)) rmse_square_diag_encoders = repeated_benchmark(naive_multiplication, config) compare(rmse_diag_encoders, rmse_square_diag_encoders) n_samples = 10000 seed = 23987 ep1 = nengo.dists.UniformHypersphere().sample(n_samples, 2, rng=np.random.RandomState(seed)) * np.sqrt(2.) ep2 = nengo.dists.Uniform(-1, 1).sample(n_samples, 2, rng=np.random.RandomState(seed)) encoder = np.array([1, 1]) / np.sqrt(2.) n_bins = 50 plt.hist(np.dot(ep1, encoder), alpha=0.5, bins=n_bins, label='circle') plt.hist(np.dot(ep2, encoder), alpha=0.5, bins=n_bins, label='square') plt.legend(loc='best'); plt.title("Projected evaluation points"); class DiamondDist(nengo.dists.Distribution): def sample(self, n, d, rng=np.random): uniform = nengo.dists.Uniform(-2., 2.) samples = np.empty((n, d)) for i in range(n): samples[i] = uniform.sample(1, d) while np.sum(np.abs(samples[i])) > 2.: samples[i] = uniform.sample(1, d) return samples samples = DiamondDist().sample(500, 2, rng=np.random.RandomState(seed)) plt.subplot(1, 1, 1, aspect='equal') plt.gca().add_artist(plt.Rectangle([-1, -1], 2, 2, fill=False, color='r')) plt.xlabel("$x_1$") plt.ylabel("$x_2$") plt.title("Points sampled from a diamond") plt.xlim(-2., 2.) plt.ylim(-2., 2.); plt.scatter(samples[:, 0], samples[:, 1], marker='+') n_samples = 10000 ep = DiamondDist().sample(n_samples, 2, rng=np.random.RandomState(seed)) encoder = np.array([1, 1]) / np.sqrt(2.) n_bins = 50 plt.hist(np.dot(ep, encoder), alpha=0.5, bins=n_bins) plt.title("Projected evaluation points"); config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) config[nengo.Ensemble].encoders = diag_encoders config[nengo.Ensemble].eval_points = DiamondDist() rmse_diamond_diag_encoders = repeated_benchmark(naive_multiplication, config) compare(rmse_diag_encoders, rmse_diamond_diag_encoders) def two_ens_multiplication(n_neurons, n_eval_points, stimulus): ens1 = nengo.Ensemble(n_neurons // 2, dimensions=1, radius=np.sqrt(2.), n_eval_points=n_eval_points) ens2 = nengo.Ensemble(n_neurons // 2, dimensions=1, radius=np.sqrt(2.), n_eval_points=n_eval_points) nengo.Connection(stimulus, ens1, transform=np.array([[1, 1]]) / np.sqrt(2.)) nengo.Connection(stimulus, ens2, transform=np.array([[1, -1]]) / np.sqrt(2.)) result = nengo.Node(size_in=1) nengo.Connection(ens1, result, transform=0.5, function=np.square, synapse=None) nengo.Connection(ens2, result, transform=-0.5, function=np.square, synapse=None) return nengo.Probe(result) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) rmse_two_ens = repeated_benchmark(two_ens_multiplication, config) compare(rmse_diag_encoders, rmse_two_ens) seeds = gen_seeds(300) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) config[nengo.Ensemble].encoders = diag_encoders rmse_diag_encoders2 = repeated_benchmark(naive_multiplication, config) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].neuron_type = nengo.LIFRate() config[nengo.Connection].solver = nengo.solvers.LstsqL2(reg=0.01) rmse_two_ens2 = repeated_benchmark(two_ens_multiplication, config) seeds = gen_seeds(n_trials) # Reset number of trials compare(rmse_diag_encoders2, rmse_two_ens2) synapse = nengo.Lowpass(0.005) config = nengo.Config(nengo.Connection, nengo.Ensemble) rmse_naive_spiking = repeated_benchmark(naive_multiplication, config, synapse=synapse) config = nengo.Config(nengo.Connection, nengo.Ensemble) config[nengo.Ensemble].encoders = diag_encoders rmse_diag_encoders_spiking = repeated_benchmark(naive_multiplication, config, synapse=synapse) config = nengo.Config(nengo.Connection, nengo.Ensemble) rmse_two_ens_spiking = repeated_benchmark(two_ens_multiplication, config, synapse=synapse) print "naive vs. diagonal encoders:" compare(rmse_naive_spiking, rmse_diag_encoders_spiking) print "diagonal encoders vs. alternative network:" compare(rmse_diag_encoders_spiking, rmse_two_ens_spiking) print "naive vs. alternative network:" compare(rmse_naive_spiking, rmse_two_ens_spiking) import matplotlib import scipy import sys print "Python", sys.version print "Nengo", nengo.version.version print "NumPy", np.version.version print "SciPy", scipy.version.version print "Matplotlib", matplotlib.__version__