#!/usr/bin/env python # coding: utf-8 # # Solving BPDN$_\infty$ for quantized Compressed sensing via the cvxopt package # # $\newcommand{\eps}{\varepsilon}$ # $\newcommand{\bR}{\mathbb{R}}$ # $\newcommand{\bZ}{\mathbb{Z}}$ # $\newcommand{\1}{{\rm 1}\kern-0.24em{\rm I}}$ # $\newcommand{\inr}[1]{\bigl< #1 \bigr>}$ # This notebooks is a simulations support for Section~7 of paper # # > Sjoerd, Dirksen and Guillaume, LecuĂ© and Holger, Rauhut, "ON THE GAP BETWEEN RIP-PROPERTIES AND SPARSE RECOVERY CONDITIONS" # # We refer the reader to paper for more details # In[1]: from __future__ import division import time get_ipython().run_line_magic('pylab', 'inline') # # Import cvxopt library for solving our optimization problem # In[2]: from cvxopt import solvers, matrix, spdiag, log, spmatrix, sparse # writting '%pylab inline' after those lines will cause trouble in the matrix method solvers.options['show_progress'] = False # # Solving BPDN$_\infty$ via linear programming # # Given a measurements matrix $A\in\bR^{m\times n}$ and $y=A\hat x + e$ where $||e||_\infty\leq \eps$, the Basis Pursuit Denoising program BPDN$_\infty$ is the procedure # # $$# min\big( ||t||_1: ||At-y||_\infty\leq \eps\big) \hspace{2cm} (P). #$$ # # This procedure can be recast as a linear program by introducing slack variables $z^+,z^-\in\bR^n$: problem~(P) is equivalent to # # > $$\min_{z^+,z^-\in\bR^n} \sum_{j=1}^n z_i^+ + z_i^-$$ # # > subject to $$-\eps\leq [A|-A]\left[\begin{array}{c} z^+\\ z^-\end{array}\right]-y\leq \eps$$ # # > and $$\left[\begin{array}{c} z^+\\ z^-\end{array}\right]\geq 0$$ # # This is a linear program: # # > $$\min_{\left[\begin{array}{c} z^+\\ z^-\end{array}\right]\in\bR^{2n}} \inr{a,\left[\begin{array}{c} z^+\\ z^-\end{array}\right]}$$ # # > subject to $$M \left[\begin{array}{c} z^+\\ z^-\end{array}\right] \leq b$$ # # where # # $$a = \left[\begin{array}{c} \1 \\ \1 \end{array}\right],\hspace{1cm} M = \left[\begin{array}{c}[A|-A]\\ [-A|A]\\ [-I_n|0]\\ [0|-I_n] \end{array}\right],\hspace{1cm} b = \left[\begin{array}{c}y + \eps \1\\ -y+\eps\1 \\ 0 \\ 0 \end{array}\right]$$ # # Solution to (P) is recovered via $t= z^+-z^-$. # ### Construction of cvxopt matrices $a,M,b$ # Given the measurement matrix $A\in\bR^{m\times n}$, the vector of measures $y\in\bR^m$ and a $\ell_\infty$ bound $\eps$ on the noise, we construct cvxopt matrices $a,M,b$ as in the Linear Programming formulation of BPDN$_\infty$. # In[3]: def cvx_mat(A, y, eps): '''A, y: numpy array or cvx matrices eps : positive real number''' A = matrix(A) y = matrix(y) m, n = matrix(A).size # matrix a a = matrix(ones(2*n)) # matrix M I_n = spdiag([1]*n) z_n = spdiag([0]*n) M = matrix([[A,-A, -I_n, z_n],[-A, A, z_n, -I_n]]) # matrix b un_m = matrix(ones(m)) zero_n = matrix(zeros(n)) b = matrix([y + eps*un_m, -y + eps*un_m, zero_n, zero_n]) return a, M, b # ### Construction of sparse signals in $\bR^n$ and noisy measurements in $\bR^m$ # We construct signals with sparsity given by *sparsity*, with a randomly chosen support in $\{1,\ldots,n\}$ and with none zero coefficients equal to $1$. # # We construct two types of measures: # # 1. measures are obtained by $y_i= \inr{A_{i,\cdot},signal} + e$ where $e$ is a variable chosen randomly in $[-\eps,\eps]$ # 2. measures are obtained by quantization; that is $y_i$ is equal to the closest point in $\eps\bZ$ of $\inr{A_{i,\cdot},signal}$ # In[4]: def signal(n, sparsity): sel = random.permutation(n) sel = sel[0:sparsity] # indices of the nonzero elements of xsharp xsharp = zeros(n) xsharp[sel] = 1 return xsharp def measures(A, signal, eps): m, n = A.shape e = random.uniform(-eps, eps, (m,1)) return [sum(a) for a in zip(dot(A, signal), e)] def measures_quantized(A, signal, eps): y = dot(A, signal) if eps>0: y_q = [int(ele/eps)*eps for ele in y] else: y_q = y return y_q # ### cvxopt linear solver cvx lp # ####First try exact reconstruction by setting $\eps=0$ # In[9]: m , n, sparsity, eps = 15, 50, 3, 0 A, x_hat = randn(m, n), signal(n, sparsity) y = measures(A, x_hat, eps) # In[10]: a, M, b = cvx_mat(A, y, eps) #print(a, M, b) # In[11]: sol = solvers.lp(a, M, b) sol = sol['x'] x_recover = sol[0:n] - sol[n:2*n] # In[12]: norm(matrix(x_hat) - x_recover,2) # ####Then try reconstruction from noisy measurements (i.e. noise level $\eps>0$) # In[13]: m , n, sparsity, eps = 10, 40, 3, 0.1 A, x_hat = randn(m, n), signal(n, sparsity) # random noise # In[14]: y = measures(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) # In[15]: sol = solvers.lp(a, M, b) sol = sol['x'] x_recover = sol[0:n] - sol[n:2*n] norm(matrix(x_hat) - x_recover,2) # quantized measurements # In[16]: y_quant = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y_quant, eps) # In[17]: sol = solvers.lp(a, M, b) sol = sol['x'] x_recover = sol[0:n] - sol[n:2*n] norm(matrix(x_hat) - x_recover,2) # # Phase transition diagram for BPDN$_\infty$ # # We construct two types of phase diagram: # # 1. One where the number of success is counted. We say that the reconstruction is a success when $||x_{hat}-x_{recover}||_2\leq 20\eps + 0.001$ (where *signal = $x_{hat}$*) # 2. One where the errors $||x_{hat}-x_{recover}||_2$ are added for every pixel of the phase transition matrix (we don't have to decide wheither it is a succes or a failure) # ####We start with a phase transition diagram where successes are counted; and for random uniform noise # In[18]: def dist(x_hat, sol): n = len(x_hat) x_recover = sol[0:n] - sol[n:2*n] return norm(matrix(x_hat) - x_recover,2) # In[19]: def phase_transition_mat(n, eps, nbtest): """return a n.n/2 matrix with the number of reconstruction success for every 1\leq m \leq n measurements and sparsity 1\leq sparsity \leq n/2 n : ambiant dimension of the signals eps : infinite norm of the additive noise (= twice the size of cells in CS quantization) nbtest : number of tests for each pixel""" PTM = zeros((n,int(n/2))) for m in range(1,n+1):#construct one line of the Phase transition matrix for a given number of measurements m if (m % 20) == 0: print("line number {} done".format(m)) A = randn(m,n) / sqrt(m) for sparsity in range(1,min(m+1, int(n/2))+1): nb_success = 0 for i in range(nbtest): x_hat = signal(n, sparsity) y = measures(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] if dist(x_hat, sol) <= 20*eps+0.001: nb_success = nb_success + 1 PTM[m-1, sparsity-1] = nb_success return PTM # In[20]: def frontier(mat, nbtest): """construction of the phase transition frontier, i.e. first time the number of success goes below nbtest/2""" L = [] n = len(mat) for s in range(int(n/2)): m = 0 while mat[m,s] nbtest*(20*eps+1) and m nb_curves: number of phase transition curves constructed. Those curves are then averaged to "smooth" the effect of randomness in phase transition and get a "stable" phase transition # In[32]: n, eps, nbtest, nb_curves = 40, 0.1, 10, 5 L = zeros(int(n/2)) start = time.time() for i in range(nb_curves): if (i % 10) == 0: print('step {} done'.format(i)) mat = phase_transition_mat_sum_quant(n, eps, nbtest) F = frontier_sum(mat, eps, nbtest) L = [sum(a) for a in zip(L,F)] L_gauss = [i/nb_curves for i in L] print('{} seconds'.format(time.time()-start)) # In[33]: #For next use, save the Gaussian phase transition frontier with pickle #import pickle #filename = 'L_gauss_n_{}_eps_{}.p'.format(n, eps) #Save L_gauss #with open(filename, 'wb') as fp: # pickle.dump(L_gauss, fp) #Load L_gauss #filename = 'L_gauss_100_eps_0.1.p' #with open(filename, 'rb') as fp: # L_gauss = pickle.load(fp) # ### Draw the phase transition frontier for quantized CS for Gaussian measurements # (it took 38664.542408 seconds to get it for n, eps, nbtest, nb_curves = 100, 0.1, 15, 40) # In[34]: X = range(len(L_gauss)) plot(X,L_gauss, linewidth=4, color = 'blue', label="Gaussian phase transition") titre = "Quantized CS - Gaussian measurements" plt.title(titre) plt.xlabel('sparsity') plt.ylabel('number of measurements') plt.legend(loc=4) #filename = 'gaussian_phase_transition_quantized_cs_{}_eps_{}.png'.format(n, eps) #plt.savefig(filename,bbox_inches='tight') # ###Phase transition curves for $\Psi_\alpha$ variables # In[35]: def mat_exp_power(m, n, alpha): A = randn(m, n)/ sqrt(m) return np.multiply(np.power(np.absolute(A),int(2/alpha)), sign(A)) # In[36]: def phase_transition_mat_sum_quant_exp_power(n, eps, nbtest, alpha): PTM = zeros((n,int(n/2))) for m in range(1,n+1):#construct one line of the Phase transition matrix for a given number of measurements m if (m % 20) == 0: print("line number {} done".format(m)) A = mat_exp_power(m, n, alpha) for sparsity in range(1, int(n/2)+1): sum_errors = 0 for i in range(nbtest): x_hat = signal(n, sparsity) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] sum_errors = sum_errors + dist(x_hat, sol) PTM[m-1, sparsity-1] = sum_errors return PTM # In[37]: n, eps, nbtest, nb_curves = 50, 0.1, 10, 5 L = zeros(int(n/2)) dict_power_quant_cs = {2: L, 1.5: L, 1: L, 0.5: L}# note that for alpha<0.5 cvxopt solver fails #dict_power_quant_cs = {2: L, 1.8: L, 1.5: L, 1.3: L, 1: L, 0.8: L, 0.5: L} for alpha in dict_power_quant_cs.keys(): print('exp power {} running'.format(alpha)) for i in range(nb_curves): mat = phase_transition_mat_sum_quant_exp_power(n, eps, nbtest, alpha) F = frontier_sum(mat, eps, nbtest) dict_power_quant_cs[alpha] = [sum(a) for a in zip(dict_power_quant_cs[alpha], F)] dict_power_quant_cs[alpha] = [ele/nb_curves for ele in dict_power_quant_cs[alpha]] # In[38]: #save the dictionary of phase transitions curves to speed up later investigation #import pickle #filename = 'dictionnaries_quantized_cs_power_n_{}_eps_{}.p'.format(n, eps) #with open(filename, 'wb') as fp: # pickle.dump(dict_power_quant_cs, fp) #To load the dictionary #with open(filename, 'rb') as fp: # dict_power_quant_cs = pickle.load(fp) # In[40]: X = range(int(n/2)) plt.figure(figsize=(8,6)) for key in dict_power_quant_cs.keys(): if key !=2: L = dict_power_quant_cs[key] text = 'exp power {}'.format(key) plot(X, L, label = text) plt.xlabel('sparsity') plt.ylabel('number of measurements') plt.title('phase transition curves - quantized CS - exponential moments') #Gaussian phase transition #n_gauss = len(L_gauss) #X_gauss = range(n_gauss) #plot(X_gauss, L_gauss, 'r--', linewidth=3, label="Gaussian phase transition") #plot(X, L_gauss[0:int(n/2)], 'r--', linewidth=3, label="Gaussian phase transition") #plt.legend(loc=4) #filename = "phase_transition_curves_quant_cs_n_{}_eps_{}.png".format(n, eps) #plt.savefig(filename, bbox_inches='tight') # ###Phase transition curves for Student variables # In[41]: def mat_power(m, n, p): return random.standard_t(p, size=(m, n)) # In[42]: def phase_transition_mat_sum_quant_student(n, eps, nbtest, p): PTM = zeros((n,int(n/2))) for m in range(1,n+1):#construct one line of the Phase transition matrix for a given number of measurements m if (m % 20) == 0: print("line number {} done".format(m)) A = mat_power(m, n, p) for sparsity in range(1, int(n/2)+1): sum_errors = 0 for i in range(nbtest): x_hat = signal(n, sparsity) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] sum_errors = sum_errors + dist(x_hat, sol) PTM[m-1, sparsity-1] = sum_errors return PTM # In[43]: n, eps, nbtest, nb_curves = 40, 0.1, 10, 5 L = zeros(int(n/2)) #dict_quant_cs_student = {30: L, 20: L, 15: L, 10: L, 5: L, 4: L, 3: L, 2: L} dict_quant_cs_student = {30: L, 20: L} for p in dict_quant_cs_student.keys(): print('power {} running'.format(p)) for i in range(nb_curves): mat = phase_transition_mat_sum_quant_student(n, eps, nbtest, p) F = frontier_sum(mat, eps, nbtest) dict_quant_cs_student[p] = [sum(a) for a in zip(dict_quant_cs_student[p], F)] dict_quant_cs_student[p] = [ele/nb_curves for ele in dict_quant_cs_student[p]] # In[333]: #save the dictionary of phase transitions curves to speed up later investigation #import pickle #filename = 'dictionnaries_quantized_cs_student_n_{}_eps_{}.p'.format(n, eps) #with open(filename, 'wb') as fp: # pickle.dump(dict_quant_cs_student, fp) #To load the dictionary #with open(filename, 'rb') as fp: # dict_quant_cs_student = pickle.load(fp) # In[372]: X = range(int(n/2)) plt.figure(figsize=(8,6)) for key in dict_quant_cs_student.keys(): if key != 15: L = dict_quant_cs_student[key] text = 'degree {}'.format(key) plot(X, L, label = text) plt.xlabel('sparsity') plt.ylabel('number of measurements') plt.title('phase transition curves - quantized CS - Student variables') #Gaussian phase transition #n_gauss = len(L_gauss) #X_gauss = range(n_gauss) plot(X, L_gauss[0:int(n/2)], 'r--', linewidth=3, label="Gaussian phase transition") plt.legend(loc=4) #filename = "phase_transition_curves_quant_cs_student_n_{}_eps_{}.png".format(n, eps) #plt.savefig(filename, bbox_inches='tight') # #Reconstruction quality of BPDN$_\infty$ for large $n=1024$ # # Construction of phase transition diagrams may take a very long time in high dimensions (large values of $n$). That is the reason why we did not explore values of $n$ larger than $80$ (which is not high-dimensional). # # In this last section, we explore a larger dimension $n=1024$. We consider the same simulation setup as the one in paper # Dequantizing Compressed sensing from L. Jacques, D.K. Hammond and J.M. Fadili: # # 1. $$SNR(signal, signal_{reconstruct}) = 20*\log_{10}\Big(\frac{||signal||_2}{||signal-signal_{reconstruct}||}_2\Big)$$ # 2. eps : size of bins in CS quantization equal to ||A signal||_\infty/40 # 3. $n=1024$, sparsity = 16, nbtest = 500 # 4. A coefficient is satisfying QC when # $$|(A signal_{reconstruct})_i-y_i|\leq eps/2$$ # In[22]: def signal_gauss(n, sparsity): sel = random.permutation(n) sel = sel[0:sparsity] x = zeros(n) x[sel] = randn(sparsity) return x def SNR(x_hat, sol): n = len(x_hat) x_recover = sol[0:n] - sol[n:2*n] return 20*log10(norm(x_hat,2)/norm(matrix(x_hat) - x_recover,2)) def QC(A, y, sol, eps): minus_x_recover = sol[n:2*n] - sol[0:n] pred = dot(A, minus_x_recover) list_QC = [abs(sum(ele)) <= eps/2 for ele in zip(pred, y)] return sum(list_QC)/len(y) # In[36]: def SNR_gauss_quant_cs(n, sparsity, nbtest, list_ratio): """Return a list of the average SNR(x_hat, sol) for different ratio m/sparsity n : ambiant dimension of the signals sparsity : sparsity of signal nbtest : number of tests for point""" list_SNR = [] list_SNR_std = [] list_QC = [] list_measures = [ele*sparsity for ele in list_ratio] for m in list_measures: print("measurement {} running".format(m)) A = randn(m,n) sum_snr = 0 sum_snr_square = 0 sum_QC = 0 for i in range(nbtest): x_hat = signal_gauss(n, sparsity) eps = float(max(abs(dot(A,x_hat)))/40) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] sum_snr = sum_snr + SNR(x_hat, sol) sum_snr_square = sum_snr_square + SNR(x_hat, sol)**2 sum_QC = sum_QC + QC(A, y, sol, eps) list_SNR.append(sum_snr/nbtest) list_SNR_std.append(sqrt(sum_snr_square/nbtest-(sum_snr/nbtest)**2)) list_QC.append(sum_QC/nbtest) return list_SNR, list_SNR_std, list_QC # In[37]: n, sparsity, nbtest, list_ratio = 1024, 16, 100, [10, 15 , 20, 25, 30, 35 , 40] start = time.time() list_SNR_gauss, list_SNR_std_gauss, list_QC_gauss = SNR_gauss_quant_cs(n, sparsity, nbtest, list_ratio) print('{} seconds'.format(time.time()-start)) # In[80]: import pickle #list_SNR_gauss_quant_cs = [list_SNR_gauss, list_SNR_std_gauss, list_QC_gauss] #filename = 'snr_quant_gauss_n_{}_sparsity_{}_nbtest_{}.p'.format(n, sparsity, nbtest) #with open(filename, 'wb') as fp: # pickle.dump(list_SNR_gauss_quant_cs, fp) #To load the list_SNR #filename = 'snr_quant_gauss_n_1024_sparsity_16_nbtest.p' with open(filename, 'rb') as fp: list_SNR_gauss_quant_cs = pickle.load(fp) list_SNR_gauss = list_SNR_gauss_quant_cs[0] list_SNR_std_gauss = list_SNR_gauss_quant_cs[1] list_QC_gauss = list_SNR_gauss_quant_cs[2] # In[157]: #plot(list_ratio, list_SNR_gauss, color='blue', marker='*', markersize=15, label='Gaussian measurements')#marker='^', 'o', '8', 'H', '+', 'x' #plt.legend(loc = 4) #plt.xlabel('m/s') #plt.ylabel('SNR (dB)') #plt.title('Quality of BPDN$\infty$ - quantized CS - Gaussian measurements') #filename = 'snr_quant_gauss_n_{}_sparsity_{}.png'.format(n, sparsity) #plt.savefig(filename, bbox_inches='tight') # In[160]: #plot(list_ratio, list_SNR_std_gauss, color='blue', marker='*', markersize=15, label='Gaussian measurements') #plt.legend(loc = 4) #plt.xlabel('m/s') #plt.ylabel('SNR (dB)') #plt.title('Standard deviation SNR of BPDN$\infty$') #plt.legend(loc = 1) #filename = 'snr_std_quant_gauss_n_{}_sparsity_{}.png'.format(n, sparsity) #plt.savefig(filename, bbox_inches='tight') # In[164]: #plot(list_ratio, list_QC_gauss, color='blue', marker='*', markersize=15, label='Gaussian measurements') #plt.xlabel('m/s') #plt.ylabel('Average QC fraction') #plt.title('QC fraction of BPDN$\infty$ -- Gaussian measurements') #plt.legend(loc = 4) #filename = 'snr_QC_quant_gauss_n_{}_sparsity_{}.png'.format(n, sparsity) #plt.savefig(filename, bbox_inches='tight') # In[81]: plt.figure(figsize=(15, 5)) subplot(131) plot(list_ratio, list_SNR_gauss, color='blue', marker='*', markersize=15, label='Gaussian measurements')#marker='^', 'o', '8', 'H', '+', 'x' plt.xlabel('m/s') plt.ylabel('SNR (dB)') plt.title('mean SNR') plt.legend(loc = 4) subplot(132) plot(list_ratio, list_SNR_std_gauss, color='blue', marker='*', markersize=15, label='Gaussian measurements')#marker='^', 'o', '8', 'H', '+', 'x' plt.xlabel('m/s') plt.ylabel('SNR (dB)') plt.title('standard deviation SNR') subplot(133) plot(list_ratio, list_QC_gauss, color='blue', marker='*', markersize=15, label='Gaussian measurements') plt.xlabel('m/s') plt.ylabel('Average QC fraction') plt.title('Fraction of coefficient satisfying QC') #filename = 'snr_subplots_quant_gauss_n_{}_sparsity_{}.png'.format(n, sparsity) #plt.savefig(filename, bbox_inches='tight') # ####Histogram of residuals for Gaussian measurements ($m=40*sparsity$) # In[201]: def SNR_gauss_quant_cs_hist(n, sparsity, nbtest): """Return a list of the (normalized) prediction error (residuals) (A signal_reconstruct-y)_i n : ambiant dimension of the signals sparsity : sparsity of signal nbtest : number of tests for point""" m = 40*sparsity list_residue_avg = zeros(m) A = randn(m,n)/sqrt(m) for i in range(nbtest): x_hat = signal_gauss(n, sparsity) eps = float(max(abs(dot(A,x_hat)))/40) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] minus_x_recover = sol[n:2*n] - sol[0:n] pred = dot(A, minus_x_recover) list_residue = [sum(ele)/eps for ele in zip(pred, y)] list_residue_avg = list_residue_avg + list_residue return list_residue_avg # In[202]: n, sparsity, nbtest = 100, 4, 10#1024, 16, 100 start = time.time() list_hist = SNR_gauss_quant_cs_hist(n, sparsity, nbtest) print('{} seconds'.format(time.time()-start)) # In[198]: #filename = "list_hist_gauss_n_{}_sparsity_{}_nbtest_{}.p".format(n, sparsity, nbtest) #with open(filename, 'wb') as fp: # pickle.dump(list_hist, fp) #with open(filename, 'rb') as fp: # pickleist_hist = pickle.load(fp) # In[203]: plt.hist(list_hist)#, bins=100, normed=True) plt.axvline(x = 1/2)#, 'r--')#, linewidth=4) plt.axvline(x = -1/2)#, 'r--')#, linewidth=4) #filename = "list_hist_gauss_n_{}_sparsity_{}_nbtest_{}.png".format(n, sparsity, nbtest) #plt.savefig(filename, bbox_inches='tight') # ###SNR for exponential power measurements matrices # In[41]: def mat_exp_power(m, n, alpha): A = randn(m, n)/ sqrt(m) return np.multiply(np.power(np.absolute(A),int(2/alpha)), sign(A)) # In[42]: def SNR_exp_power_quant_cs(n, sparsity, nbtest, list_ratio, list_exp_power): """Return a list of the average SNR(x_hat, sol) for different ratio m/sparsity n : ambiant dimension of the signals sparsity : sparsity of signal nbtest : number of tests for point""" dict_SNR_exp_power = {} list_measures = [ele*sparsity for ele in list_ratio] for alpha in list_exp_power: print("----- power {} running".format(alpha)) list_SNR = [] list_SNR_std = [] list_QC = [] for m in list_measures: print("measurement {} running".format(m)) A = mat_exp_power(m, n, alpha) sum_snr = 0 sum_snr_square = 0 sum_QC = 0 for i in range(nbtest): x_hat = signal_gauss(n, sparsity) eps = float(max(abs(dot(A,x_hat)))/40) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] sum_snr = sum_snr + SNR(x_hat, sol) sum_snr_square = sum_snr_square + SNR(x_hat, sol)**2 sum_QC = sum_QC + QC(A, y, sol, eps) list_SNR.append(sum_snr/nbtest) list_SNR_std.append(sqrt(sum_snr_square/nbtest-(sum_snr/nbtest)**2)) list_QC.append(sum_QC/nbtest) dict_SNR_exp_power[alpha] = [list_SNR, list_SNR_std, list_QC] return dict_SNR_exp_power # In[43]: n, sparsity, nbtest, list_ratio, list_exp_power = 1024, 16, 100, [10, 15 , 20, 25, 30, 35 , 40], [2, 1.5, 1, 0.5] dict_SNR_exp_power = SNR_exp_power_quant_cs(n, sparsity, nbtest, list_ratio, list_exp_power) # In[44]: #import pickle #filename = "dict_SNR_exp_power_n_{}_sparsity_{}_nbtest_{}.p".format(n, sparsity, nbtest) #with open(filename, 'wb') as fp: # pickle.dump(dict_SNR_exp_power, fp) #with open(filename, 'rb') as fp: # dict_SNR_exp_power = pickle.load(fp) # In[61]: marker = ['v', '+', '.', 'o', '*'] plt.figure(figsize=(15, 5)) subplot(131) ind = 0 for key in dict_SNR_exp_power.keys(): L = dict_SNR_exp_power[key][0] text = 'power {}'.format(key) plot(list_ratio, L, marker = marker[ind], markersize=5, label=text) ind = ind + 1 plt.xlabel('m/s') plt.ylabel('SNR (dB)') plt.title('mean SNR') plt.legend(loc = 4) subplot(132) ind = 0 for key in dict_SNR_exp_power.keys(): L = dict_SNR_exp_power[key][1] text = 'power {}'.format(key) plot(list_ratio, L, marker = marker[ind], markersize=5, label=text) ind+=1 plt.xlabel('m/s') plt.ylabel('SNR (dB)') plt.title('standard deviation SNR') subplot(133) ind = 0 for key in dict_SNR_exp_power.keys(): L = dict_SNR_exp_power[key][2] text = 'power {}'.format(key) plot(list_ratio, L, marker = marker[ind], markersize=5, label=text) ind+=1 plt.xlabel('m/s') plt.ylabel('Average QC fraction') plt.title('Fraction of coefficient satisfying QC') #filename = 'snr_subplots_quant_exp_power_n_{}_sparsity_{}_nbtest_{}.png'.format(n, sparsity, nbtest) #plt.savefig(filename, bbox_inches='tight') # ###SNR for Student measurements matrices # In[64]: def mat_power(m, n, p): return random.standard_t(p, size=(m, n)) # In[67]: def SNR_student_quant_cs(n, sparsity, nbtest, list_ratio, list_degrees): """Return a list of the average SNR(x_hat, sol) for different ratio m/sparsity n : ambiant dimension of the signals sparsity : sparsity of signal nbtest : number of tests for each point""" dict_SNR_student = {} list_measures = [ele*sparsity for ele in list_ratio] start = time.time() for p in list_degrees: print("---- degree {} running -- {} seconds".format(p, time.time()-start)) list_SNR = [] list_SNR_std = [] list_QC = [] for m in list_measures: print("measurement {} running".format(m)) A = mat_power(m, n, p) sum_snr = 0 sum_snr_square = 0 sum_QC = 0 for i in range(nbtest): x_hat = signal_gauss(n, sparsity) eps = float(max(abs(dot(A,x_hat)))/40) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] sum_snr = sum_snr + SNR(x_hat, sol) sum_snr_square = sum_snr_square + SNR(x_hat, sol)**2 sum_QC = sum_QC + QC(A, y, sol, eps) list_SNR.append(sum_snr/nbtest) list_SNR_std.append(sqrt(sum_snr_square/nbtest-(sum_snr/nbtest)**2)) list_QC.append(sum_QC/nbtest) dict_SNR_student[p] = [list_SNR, list_SNR_std, list_QC] return dict_SNR_student # In[68]: n, sparsity, nbtest, list_ratio, list_degrees = 1024, 16, 200, [10, 15 , 20, 25, 30, 35 , 40], [2, 3, 5, 10, 20] dict_SNR_student = SNR_student_quant_cs(n, sparsity, nbtest, list_ratio, list_degrees) # In[69]: #import pickle #filename = "dict_SNR_student_n_{}_sparsity_{}_nbtest_{}.p".format(n, sparsity, nbtest) #with open(filename, 'wb') as fp: # pickle.dump(dict_SNR_student, fp) #with open(filename, 'rb') as fp: # dict_SNR_student = pickle.load(fp) # In[83]: plt.figure(figsize=(15, 5)) marker = ['v', '+', '.', 'o', '*'] subplot(131) ind = 0 for key in dict_SNR_student.keys(): L = dict_SNR_student[key][0] text = 'degree {}'.format(key) plot(list_ratio, L, marker = marker[ind], label=text) ind+=1 plot(list_ratio, list_SNR_gauss , label='Gaussian measures') plt.xlabel('m/s') plt.ylabel('SNR (dB)') plt.title('mean SNR') subplot(132) ind = 0 for key in dict_SNR_student.keys(): L = dict_SNR_student[key][1] text = 'degree {}'.format(key) plot(list_ratio, L, marker = marker[ind], label=text) ind+=1 plot(list_ratio, list_SNR_std_gauss , label='Gaussian measures') plt.xlabel('m/s') plt.ylabel('SNR (dB)') plt.title('standard deviation SNR') subplot(133) ind = 0 for key in dict_SNR_student.keys(): L = dict_SNR_student[key][2] text = 'degree {}'.format(key) plot(list_ratio, L, marker = marker[ind], label=text) ind+=1 plot(list_ratio, list_QC_gauss , label='Gaussian measures') plt.xlabel('m/s') plt.ylabel('Average QC fraction') plt.title('Fraction of coefficient satisfying QC') plt.legend(loc = 4) filename = 'snr_subplots_quant_student__n_{}_sparsity_{}.png'.format(n, sparsity) plt.savefig(filename, bbox_inches='tight') # ###histogram of residuals for Student variables # In[ ]: def mat_power(m, n, p): return random.standard_t(p, size=(m, n)) # In[ ]: def SNR_student_quant_cs_hist(n, sparsity, nbtest, student_degree): """Return a list of the (normalized) prediction error (residuals) (A signal_reconstruct-y)_i n : ambiant dimension of the signals sparsity : sparsity of signal nbtest : number of tests for point""" m = 40*sparsity list_residue_avg = zeros(m) A = mat_power(m, n, student_degree) for i in range(nbtest): x_hat = signal_gauss(n, sparsity) eps = float(max(abs(dot(A,x_hat)))/40) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] minus_x_recover = sol[n:2*n] - sol[0:n] pred = dot(A, minus_x_recover) list_residue = [sum(ele)/eps for ele in zip(pred, y)] list_residue_avg = [sum(ele) for ele in zip(list_residue_avg, list_residue)] return [ele/nbtest for ele in list_residue_avg] # In[ ]: n, sparsity, nbtest, student_degree = 100, 5, 10, 4 start = time.time() list_student_hist = SNR_student_quant_cs_hist(n, sparsity, nbtest, student_degree) print('{} seconds'.format(time.time()-start)) # In[ ]: #filename = "list_hist_student_n_{}_sparsity_{}_nbtest_{}_degree_{}.p".format(n, sparsity, nbtest, student_degree) #with open(filename, 'wb') as fp: # pickle.dump(list_student_hist, fp) #with open(filename, 'rb') as fp: # list_student_hist = pickle.load(fp) # In[ ]: plt.hist(list_student_hist, bins=40, normed=True) axvline(x = 1/2, linewidth=4, 'r--') axvline(x = -1/2, linewidth=4,'r--') #filename = "list_hist_student_n_{}_sparsity_{}_nbtest_{}_degree_{}.png".format(n, sparsity, nbtest, student_degree) #plt.savefig(filename, bbox_inches='tight') # #Phase transition diagrams for correlated measurements # In this last section, we consider gaussian vectors with correlated entries as measurements vectors. # In[51]: def covariance_mat(n, p): """covariance matrix with off-diagonal coefficients equal p and diagonal coefficients equal 0""" return (1-p)*identity(n) + p*ones((n,n)) def mat_gauss_corr(n, m, p): mean = zeros(n) cov = covariance_mat(n,p) return random.multivariate_normal(mean, cov, m) # In[52]: def phase_transition_mat_sum_quant_gauss_corr(n, eps, nbtest, p): PTM = zeros((n,int(n/2))) for m in range(1,n+1):#construct one line of the Phase transition matrix for a given number of measurements m if (m % 20) == 0: print("line number {} done".format(m)) A = mat_gauss_corr(n, m, p) for sparsity in range(1, int(n/2)+1): sum_errors = 0 for i in range(nbtest): x_hat = signal(n, sparsity) y = measures_quantized(A, x_hat, eps) a, M, b = cvx_mat(A, y, eps) sol = solvers.lp(a, M, b) sol = sol['x'] sum_errors = sum_errors + dist(x_hat, sol) PTM[m-1, sparsity-1] = sum_errors return PTM # In[53]: n, eps, nbtest, nb_curves = 100, 0.1, 10, 10 L = zeros(int(n/2)) dict_gauss_corr_quant_cs = {0: L, 0.05: L, 0.1: L, 0.2: L}# note that for alpha<0.5 cvxopt solver fails #dict_power_quant_cs = {2: L, 1.8: L, 1.5: L, 1.3: L, 1: L, 0.8: L, 0.5: L} for p in dict_gauss_corr_quant_cs.keys(): print('correlation parameter {} running'.format(p)) for i in range(nb_curves): mat = phase_transition_mat_sum_quant_gauss_corr(n, eps, nbtest, p) F = frontier_sum(mat, eps, nbtest) dict_gauss_corr_quant_cs[p] = [sum(a) for a in zip(dict_gauss_corr_quant_cs[p], F)] dict_gauss_corr_quant_cs[p] = [ele/nb_curves for ele in dict_gauss_corr_quant_cs[p]] # In[54]: #save the dictionary of phase transitions curves to speed up later investigation #import pickle #filename = 'dictionnaries_quantized_cs_gauss_corr_n_{}_eps_{}.p'.format(n, eps) #with open(filename, 'wb') as fp: # pickle.dump(dict_gauss_corr_quant_cs, fp) #To load the dictionary #with open(filename, 'rb') as fp: # dict_gauss_corr_quant_cs = pickle.load(fp) # In[57]: X = range(int(n/2)) plt.figure(figsize=(8,6)) for key in sort(list(dict_gauss_corr_quant_cs.keys())): L = dict_gauss_corr_quant_cs[key] text = 'p = {}'.format(key) plot(X, L, label = text) plt.xlabel('sparsity') plt.ylabel('number of measurements') plt.title('phase transition curves - quantized CS - correlated measurements') #Gaussian phase transition #n_gauss = len(L_gauss) #X_gauss = range(n_gauss) #plot(X_gauss, L_gauss, 'r--', linewidth=3, label="Gaussian phase transition") #plot(X, L_gauss[0:int(n/2)], 'r--', linewidth=3, label="Gaussian phase transition") plt.legend(loc=4) filename = "phase_transition_curves_quant_cs_gauss_corr_n_{}_eps_{}.png".format(n, eps) plt.savefig(filename, bbox_inches='tight') # In[ ]: