Source code for camiba.cs.soe

# This file is part of Camiba.
#
# Camiba is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Camiba is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Camiba. If not, see <http://www.gnu.org/licenses/>.
"""
Here we implement a Class, which provides different methods for sparsity order
estimation from compressed measurements.

for EET see:
http://www.eurasip.org/Proceedings/Eusipco/Eusipco2014/HTML/papers/1569925343.pdf

for EFT see:

"""

import numpy as np
import numpy.random as npr
import numpy.linalg as npl
import os

from ..linalg.basic import proj_sphere
from ..linalg.basic import khatri_rao
from ..linalg import vand
from ..algs import omp
from .scenario import Scenario


class Soe(Scenario):
    """
    This class provides different methods for sparsity order estimation
    from compressed measurements.
    """

    def __init__(
        self,
        num_n,
        num_m,
        mos_method='eft',
        *args,
        algorithm=omp.recover,
        **kwargs
    ):
        """
        Construct an SOE Scenario

        Parameters
        ----------
        num_n : int
            number of atoms in the dictionary
        num_m : int
            number of measurements to compress down to
        mos_method : string, optional
            method to use for model order selection
        algorithm : function
            recovery method
        **kwargs : dict
            additional arguments for each MOS method
        """

        # save the MOS method
        self._mos_method = mos_method

        if self._mos_method == 'lopes':
            # evenly divide the measurements
            self.num_m1 = int(num_m / 2)
            self.num_m2 = num_m - self.num_m1

            self._num_gamma = kwargs['num_gamma']

            # init measurement matrix
            matMeasurement = np.zeros((num_m, num_n))

            # generate Cauchy measurements
            matMeasurement[:self.num_m1, :] = npr.standard_cauchy(
                (self.num_m1, num_n)
            ) * self._num_gamma

            # generate Gaussian measurements
            matMeasurement[self.num_m1:, :] = npr.randn(
                self.num_m2, num_n
            ) * self._num_gamma

            matMeasurement[:] = proj_sphere(matMeasurement)

            # set the appropriate estimation function
            self._estimate_function = self._est_lopes
        elif mos_method == 'ravazzi':
            # density parameter and variance of the normal distribution
            self._num_gamma = kwargs['num_gamma']

            # generate the uniform distribution to decide where there are
            # zeros in the measurement matrix
            matUniform = npr.uniform(0, 1, (num_m, num_n))

            # generate the normally distributed samples
            matNormal = (1 / np.sqrt(self._num_gamma)) * \
                npr.randn(num_m, num_n)

            # decide where there are zeros
            matSubSel = 1 * (matUniform < self._num_gamma)

            # put the normally distributed elements where we rolled the
            # dice correctly
            matMeasurement = matSubSel * matNormal

            # set the correct estimation function
            self._estimate_function = self._est_ravazzi
        else:
            # path to store training data to
            self._buffer_path = kwargs['str_path']

            # overlap parameter
            self._num_p = kwargs['num_p']

            # error probability during training
            self._num_err_prob = kwargs['num_err_prob']

            # make the scenario complex if we do overlap
            self._do_complex = ((self._num_p != 0) or (kwargs['do_complex']))

            # if we have no overlap, both matrices should be Gaussian;
            # if we have overlap, one has to be Vandermonde and the
            # scenario itself has to be complex
            if self._num_p == 0:
                # find the most suitable dimensions
                self._num_l, self._num_k = self._largest_div(num_m)

                # create the matrices
                if self._do_complex:
                    mat_psi = npr.randn(self._num_l, num_n) + \
                        1j * npr.randn(self._num_l, num_n)
                    mat_phi = npr.randn(self._num_k, num_n) + \
                        1j * npr.randn(self._num_k, num_n)
                else:
                    mat_psi = npr.randn(self._num_l, num_n)
                    mat_phi = npr.randn(self._num_k, num_n)

                self._estimate_function = self._nope_overlap
            else:
                mat_psi = vand.draw(
                    int(np.ceil(float(num_m) / self._num_p)),
                    num_n,
                    self._buffer_path + "vander_c"
                )
                mat_phi = npr.randn(self._num_p, num_n) + \
                    1j * npr.randn(self._num_p, num_n)

                self._num_l = self._find_block_length(num_m, self._num_p)
                self._num_k = int((num_m - self._num_l) / self._num_p + 1)

                # set the appropriate estimation function
                self._estimate_function = self._true_overlap

            # the measurement matrix is the Khatri-Rao product of the two
            # factors projected onto the sphere, where we only take the
            # first num_m rows
            matMeasurement = proj_sphere(
                khatri_rao(mat_psi, mat_phi)[:num_m, :]
            )

            if mos_method == 'eft':
                # generate the array with the helper values
                self._num_q = min(self._num_l, self._num_k)
                self._arr_r = self._eft_fetch_arr_r(
                    self._num_q,
                    self._num_l,
                    self._buffer_path + "eft_arr_r_" +
                    str(self._num_l) + "_" + str(self._num_k)
                )

                # fetch the thresholding coefficients
                self._arr_eta = self._eft_fetch_arr_eta(
                    self._num_l,
                    self._num_k,
                    self._num_err_prob,
                    self._do_complex,
                    self._buffer_path + "eft_arr_eta_" +
                    str(self._num_err_prob) + "_" +
                    int(self._do_complex) * "c_" +
                    str(self._num_l) + "_" + str(self._num_k)
                ) + 0.4
            elif mos_method == 'eet':
                self._arr_eta = self._eet_fetch_arr_eta(
                    self._num_l,
                    self._num_k,
                    self._num_err_prob,
                    self._do_complex,
                    self._buffer_path + "eet_arr_eta_" +
                    str(self._num_err_prob) + "_" +
                    int(self._do_complex) * "c_" +
                    str(self._num_l) + "_" + str(self._num_k)
                )
            elif mos_method == 'new':
                self._arr_eta = self._new_fetch_arr_eta(
                    self._num_l,
                    self._num_k,
                    self._num_err_prob,
                    self._do_complex,
                    self._buffer_path + "new_arr_eta_" +
                    str(self._num_err_prob) + "_" +
                    int(self._do_complex) * "c_" +
                    str(self._num_l) + "_" + str(self._num_k)
                )

        # now create the CS scenario
        Scenario.__init__(
            self,
            np.eye(num_n),          # during SOE the dictionary is the
                                    # identity matrix, i.e. the vector
                                    # itself is sparse
            matMeasurement,
            algorithm               # recovery algorithm (OMP by default)
        )

    def estimate(
        self,
        arr_b
    ):
        """
        Do Sparsity Order Estimation

        This function does the actual SOE process for one or several given
        measurements and returns the estimated sparsity orders.

        Parameters
        ----------
        arr_b : ndarray
            one or several compressed measurements

        Returns
        -------
        ndarray
            the estimated sparsity orders
        """
        if len(arr_b.shape) > 1:
            return np.apply_along_axis(
                self._estimate_function,
                0,
                arr_b
            )
        else:
            return self._estimate_function(arr_b)

    def _eft_fetch_arr_r(self, num_m, num_n, str_p):
        """Helper function to calculate and store the r helper array"""
        if os.path.isfile(str_p + '.npy'):
            return np.load(str_p + '.npy')
        else:
            arr_r = np.zeros(num_m)

            # calculate the array for every entry, starting at value 1
            for ii in range(1, num_m):
                num_M = float(ii + 1)
                num_r_old = 1
                num_r_new = 0
                num_dim_frac = (num_M + float(num_n)) / float(num_n * num_M)
                while abs(num_r_old - num_r_new) > 1e-15:
                    num_r_old = num_r_new
                    num_r_m = num_r_old ** num_M
                    num_r_prod = (1.0 - num_r_m) * (1.0 + num_r_old)
                    num_r_new = -(num_dim_frac * num_r_prod
                                  - 1.0 - num_r_m * (1.0 - num_r_old))
                arr_r[ii] = num_r_new

            np.save(str_p + '.npy', arr_r)
            return arr_r

    def _eft_fetch_arr_eta(
        self,
        num_N,
        num_M,
        num_err_prob,
        doComplex,
        str_p
    ):
        """Given the dimensions, the desired probability of error and
        whether to work with complex data, we generate training data on
        noise and save the trained coefficients to str_p"""
        if os.path.isfile(str_p + '.npy'):
            return np.load(str_p + '.npy')
        else:
            print('$SOE$ Starting with EFT training...')

            # dynamically generate enough trials
            num_trials = int(round(10.0 * 1.0 / (num_err_prob)))

            # adapt to the complex case
            if doComplex:
                ten_trials = (npr.randn(num_N, num_M, num_trials)
                              + 1j * npr.randn(num_N, num_M, num_trials)
                              ) / np.sqrt(2.0)
            else:
                ten_trials = npr.randn(num_N, num_M, num_trials)

            # generate values for eta as thresholding parameter
            res_eta = 10000
            grd_eta = np.linspace(-2, 2, res_eta)

            # matrix for storing the number of false alarms
            mat_FA_prb = np.zeros((res_eta, self._num_q))

            # go through all trials
            for ii in range(0, num_trials):
                if ii % int(float(num_trials) * 0.1) == 0:
                    print("$SOE$ %d%% " % int(float(ii) / num_trials * 100))

                # reversed array of squared singular values
                arr_SV = np.flipud(
                    npl.svd(ten_trials[:, :, ii], compute_uv=0) ** 2
                )

                # go through every dimension
                for jj in range(1, self._num_q + 1):
                    jj = int(jj - 1)
                    num_sig = np.mean(arr_SV[0:(jj + 1)])
                    num_SV_est = (jj + 1) * \
                        ((1 - self._arr_r[jj]) /
                         (1 - self._arr_r[jj] ** (jj + 1))
                         ) * num_sig
                    num_ratio = (arr_SV[jj] - num_SV_est) / num_SV_est
                    mat_FA_prb[num_ratio > grd_eta, jj - 1] += 1.0

            # normalize to a probability
            mat_FA_prb /= float(num_trials)

            # invert the relation between eta and probability
            arr_eta = np.zeros(self._num_q)
            for ii in range(0, self._num_q):
                ind_eta = 0
                while (mat_FA_prb[ind_eta, ii] > num_err_prob
                       and ind_eta + 2 < res_eta):
                    ind_eta += 1
                arr_eta[ii] = grd_eta[ind_eta]

            np.save(str_p + '.npy', arr_eta)
            print('$SOE$ Done with EFT training...')
            return arr_eta

    def _eet_fetch_arr_eta(
        self,
        num_N,
        num_M,
        num_err_prob,
        doComplex,
        str_p
    ):
        """Given the dimensions, the desired probability of error and
        whether to work with complex data, we generate training data on
        noise and save the trained coefficients to str_p"""
        if os.path.isfile(str_p + '.npy'):
            return np.load(str_p + '.npy')
        else:
            print('$SOE$ Starting with EET training...')

            # dynamically generate enough trials
            num_trials = int(round(500.0 * 1.0 / (num_err_prob)))

            # adapt to the complex case
            if doComplex:
                mat_trials = (npr.randn(self._num_l * self._num_k,
                                        num_trials)
                              + 1j * npr.randn(self._num_l * self._num_k,
                                               num_trials)
                              ) / np.sqrt(2.0)
            else:
                mat_trials = npr.randn(self._num_l * self._num_k,
                                       num_trials)

            num_P = min(self._num_k, self._num_l)
            arr_sv = np.empty((num_P, num_trials))
            ten_trials = np.empty(
                (self._num_l, self._num_k, num_trials),
                dtype=mat_trials.dtype
            )

            if self._num_p == 0:
                for ii in range(num_trials):
                    ten_trials[:, :, ii] = mat_trials[:, ii].reshape(
                        self._num_l, self._num_k)
            else:
                for ii in range(num_trials):
                    ten_trials[:, :, ii] = self._reshape_measurement(
                        mat_trials[:, ii])

            for ii in range(num_trials):
                arr_sv[:, ii] = npl.svd(ten_trials[:, :, ii], compute_uv=0)

            arr_quot = arr_sv[:(num_P - 1), :] / arr_sv[1:, :]

            num_Q = 20000
            num_max_ratio = np.max(arr_quot, axis=1)
            num_tau = num_max_ratio / num_Q

            arr_P = np.empty(num_Q)
            arr_eta = np.zeros(num_P - 1)

            for ii in range(num_P - 1):
                for qq in range(num_Q - 1):
                    arr_P[qq] = np.sum(
                        (arr_quot[ii, :] >= num_tau[ii] * qq) *
                        (arr_quot[ii, :] < num_tau[ii] * (qq + 1))
                    ) / num_trials

                arr_abs = np.abs(
                    (1 - np.cumsum(arr_P) + self._num_err_prob))
                num_j_eta = np.argmin(arr_abs)
                arr_eta[ii] = num_j_eta * num_tau[ii]

            np.save(str_p + '.npy', arr_eta)
            print('$SOE$ Done with EET training...')
            return arr_eta

    def _new_fetch_arr_eta(
        self,
        num_N,
        num_M,
        num_err_prob,
        doComplex,
        str_p
    ):
        """Given the dimensions, the desired probability of error and
        whether to work with complex data, we generate training data on
        noise and save the trained coefficients to str_p"""
        if os.path.isfile(str_p + '.npy'):
            return np.load(str_p + '.npy')
        else:
            print('$SOE$ Starting with NEW training...')

            # dynamically generate enough trials
            num_trials = int(round(500.0 * 1.0 / (num_err_prob)))

            # adapt to the complex case
            if doComplex:
                mat_trials = 2 * (npr.randn(self._num_l * self._num_k,
                                            num_trials)
                                  + 1j * npr.randn(self._num_l * self._num_k,
                                                   num_trials)
                                  ) / np.sqrt(2.0)
            else:
                mat_trials = 2 * npr.randn(self._num_l * self._num_k,
                                           num_trials)

            num_P = min(self._num_k, self._num_l)
            arr_sv = np.empty((num_P, num_trials))
            ten_trials = np.empty(
                (self._num_l, self._num_k, num_trials),
                dtype=mat_trials.dtype
            )

            if self._num_p == 0:
                for ii in range(num_trials):
                    ten_trials[:, :, ii] = mat_trials[:, ii].reshape(
                        self._num_l, self._num_k)
            else:
                for ii in range(num_trials):
                    ten_trials[:, :, ii] = self._reshape_measurement(
                        mat_trials[:, ii])

            for ii in range(num_trials):
                arr_sv[:, ii] = npl.svd(
                    ten_trials[:, :, ii], compute_uv=0) ** 2

            arr_ratios = np.empty((num_P - 1, num_trials))
            for ii in range(num_P - 1):
                arr_ratios[ii, :] = arr_sv[ii, :] / \
                    np.mean(arr_sv[(ii + 1):, :], 0)

            arr_eta = np.zeros((num_P - 1, 2))
            for ii in range(num_P - 1):
                arr_eta[ii, 0] = np.percentile(
                    arr_ratios[ii, :], 50.0 * self._num_err_prob)
                arr_eta[ii, 1] = np.percentile(
                    arr_ratios[ii, :], 100. - 50.0 * self._num_err_prob)

            np.save(str_p + '.npy', arr_eta)
            print('$SOE$ Done with NEW training...')
            return arr_eta

    def _reshape_measurement(
        self,
        vec_b
    ):
        # create an empty matrix with the same datatype as the input,
        # into which we reshape the measurement
        mat_B = np.empty(
            (self._num_l, self._num_k),
            dtype=vec_b.dtype
        )

        # do the reshaping with overlap
        for ii in range(0, self._num_k):
            mat_B[:, ii] = vec_b[
                (ii * self._num_p):
                (ii * self._num_p + self._num_l)
            ]
        return mat_B

    def _do_eet(
        self,
        mat_B
    ):
        arr_sv = npl.svd(mat_B, compute_uv=0)
        N = len(arr_sv)
        for ii in range(N - 1):
            if ((arr_sv[N - ii - 2] / arr_sv[N - ii - 1])
                    > self._arr_eta[N - ii - 2]):
                return N - ii - 1
        return 1

    def _do_eft(
        self,
        mat_B
    ):
        # get the squared singular values
        arr_SV = npl.svd(mat_B, compute_uv=0) ** 2

        # do the threshold test
        for jj in range(1, self._num_q + 1):
            jj = int(jj - 1)
            num_sig = np.mean(arr_SV[0:(jj + 1)])
            num_SV_est = (jj + 1) * \
                ((1 - self._arr_r[jj]) /
                 (1 - self._arr_r[jj] ** (jj + 1))
                 ) * num_sig
            num_ratio = (arr_SV[jj] - num_SV_est) / num_SV_est

            # use the training data to distinguish between noise and
            # actual data
            if num_ratio >= self._arr_eta[jj]:
                return (self._num_l - jj)
        return 0

    def _do_new(self, mat_B):
        # get the squared singular values
        arr_SV = npl.svd(mat_B, compute_uv=0) ** 2

        num_P = min(self._num_k, self._num_l)
        for ii in range(num_P - 1):
            quot = (arr_SV[num_P - 2 - ii]
                    / np.mean(arr_SV[(num_P - 1 - ii):]))
            if (quot > self._arr_eta[num_P - 2 - ii, 1]
                    or quot < self._arr_eta[num_P - 2 - ii, 0]):
                return num_P - 1 - ii
        return 1

    def _nope_overlap(self, vec_b):
        """
        estimation routine that reshapes b without reusing any elements
        """
        mat_B = vec_b.reshape((self._num_l, self._num_k))

        # initiate the model order selection
        if self._mos_method == 'eft':
            return self._do_eft(mat_B)
        elif self._mos_method == 'eet':
            return self._do_eet(mat_B)
        elif self._mos_method == 'new':
            return self._do_new(mat_B)

    def _true_overlap(self, vec_b):
        """
        estimation routine that reshapes b by reusing some elements
        according to the overlap parameter
        """
        mat_B = self._reshape_measurement(vec_b)

        # initiate the model order selection
        if self._mos_method == 'eft':
            return self._do_eft(mat_B)
        elif self._mos_method == 'eet':
            return self._do_eet(mat_B)
        elif self._mos_method == 'new':
            return self._do_new(mat_B)

    def _largest_div(
        self,
        num_n                   # number to split into factors
    ):
        """
        Splits the natural number num_n into two factors that are as
        close to each other as possible
        """
        num_d = np.ceil(np.sqrt(num_n))
        while True:
            if num_n % num_d == 0:
                return (int(num_d), int(num_n / num_d))
            num_d -= 1

    def _find_block_length(self, num_m, num_p):
        """
        Finds the optimal block length num_l for given dimensions and
        block advance
        """
        num_l_init = int(np.ceil((float(num_m) + float(num_p))
                                 / (num_p + 1.0)))
        num_l1 = num_l_init
        num_l2 = num_l_init - 1
        while True:
            if (num_m - num_l1) % num_p == 0:
                return int(num_l1)
            if (num_m - num_l2) % num_p == 0:
                return int(num_l2)
            num_l1 += 1
            num_l2 -= 1

    def _est_lopes(self, vec_b):
        numT1 = np.median(np.abs(vec_b[:self.num_m1])) / self._num_gamma
        numT2 = np.mean(vec_b[self.num_m1:] ** 2) / (self._num_gamma ** 2)
        return int(np.round(numT1 ** 2 / numT2))

    def _est_ravazzi(self, vec_b):
        num_s = 10
        num_k = 10
        arr_ka = np.zeros(num_s)
        arr_pi = np.zeros((self._num_c, num_s))
        arr_al = np.zeros(num_s)
        arr_be = np.zeros(num_s)
        arr_pe = np.zeros(num_s)

        arr_pi[:, 0] = 0.5
        arr_al[0] = 5
        arr_pe[0] = 0.01
        arr_be[0] = 2

        for ii in range(num_s - 1):
            # E-step
            q1 = (arr_pe[ii]) / np.sqrt(arr_al[ii])
            q2 = (1 - arr_pe[ii]) / np.sqrt(arr_be[ii])
            e1 = np.exp(- vec_b ** 2 / (2 * arr_al[ii]))
            e2 = np.exp(- vec_b ** 2 / (2 * arr_be[ii]))
            arr_pi[:, ii + 1] = (q1 * e1) / (q1 * e1 + q2 * e2)

            # M-step
            sum1 = np.sum(arr_pi[:, ii + 1])
            sum2 = np.sum(1 - arr_pi[:, ii + 1])
            arr_pe[ii + 1] = sum1 / self._num_k
            arr_ka[ii + 1] = np.log(arr_pe[ii + 1]) \
                / np.log(1 - self._num_gamma)
            arr_al[ii + 1] = np.inner(arr_pi[:, ii + 1], vec_b ** 2) / sum1
            arr_be[ii + 1] = np.inner(1 - arr_pi[:, ii + 1],
                                      vec_b ** 2) / sum2

        return int(arr_ka[num_s - 1])
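
    # The two phase transition helpers below expect user-supplied callables;
    # a hypothetical set of arguments (names are assumptions, not part of
    # the API) could look like
    #   fun_x(num_s)          -> sparse ndarray of length num_n
    #   fun_noise(snr, size)  -> noise ndarray of length size
    #   dct_fun_compare       -> {'name': fun(arr_x, result) -> float}
    # as in the usage sketch at the end of this file.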

    def phase_trans_est(self,
                        num_s,
                        fun_x,
                        arr_snr,
                        fun_noise,
                        dct_fun_compare,
                        num_trials
                        ):
        """
        Calculate a phase transition

        Given the scenario, we generate sparse vectors with a certain fixed
        sparsity level according to a provided scheme. Then we apply the
        forward model and the compression and add noise to the compressed
        measurements, which is generated by a provided noise generating
        function. Here, we only estimate the sparsity order and check
        whether the estimate succeeds or fails. This is done several times
        for each level of SNR and after each trial, we evaluate the
        estimate with respect to one or more provided error metrics.

        Finally, everything is averaged over the trials and returned in a
        dictionary, where the keys are given by the names of the applied
        error metrics and each entry contains one value per SNR level.

        Parameters
        ----------
        num_s : int
            sparsity level
        fun_x : method
            function, which takes the sparsity level as parameter to
            generate a sparse vector
        arr_snr : ndarray
            all levels of SNR to go through
        fun_noise : method
            noise generating function, taking the snr and the vector size
            as arguments
        dct_fun_compare : dict
            dictionary of comparison functions
        num_trials : int
            number of trials to run at each snr level

        Returns
        -------
        dict
            the error metrics averaged over the trials for each SNR level
        """
        dct_res = {}
        for ii in dct_fun_compare.items():
            dct_res.update({ii[0]: np.zeros((
                len(arr_snr),           # for each snr level
                num_trials              # for each trial
            ))})

        for ii, snr in enumerate(arr_snr):
            for jj in range(num_trials):
                # generate ground truth and noisy measurement
                arrX = fun_x(num_s)
                arrB = self.compress(arrX) + fun_noise(snr, self._num_c)

                # estimate the sparsity order
                num_s_est = self.estimate(arrB)

                for kk in dct_res.items():
                    key = kk[0]
                    dct_res[key][ii, jj] = dct_fun_compare[key](
                        arrX, num_s_est)

        # average over the trials
        for ii in dct_res.items():
            key = ii[0]
            dct_res[key] = np.mean(dct_res[key], axis=1)

        return dct_res

    def phase_trans_est_rec(self,
                            num_s,
                            fun_x,
                            args,
                            arr_snr,
                            fun_noise,
                            dct_fun_compare,
                            num_trials
                            ):
        """
        Calculate a phase transition

        Given the scenario, we generate sparse vectors with a certain fixed
        sparsity level according to a provided scheme. Then we apply the
        forward model and the compression and add noise to the compressed
        measurements, which is generated by a provided noise generating
        function. Then we aim at reconstructing the original signal from
        this compressed and noisy data, where the sparsity order estimated
        by this class is fed as a parameter to the reconstruction
        algorithm. This is done several times for each level of SNR and
        after each reconstruction, we compare the reconstruction with
        respect to one or more provided error metrics.

        Finally, everything is averaged over the trials and returned in a
        dictionary, where the keys are given by the names of the applied
        error metrics and each entry contains one value per SNR level.

        Parameters
        ----------
        num_s : int
            sparsity level
        fun_x : method
            function, which takes the sparsity level as parameter to
            generate a sparse vector
        args : dict
            arguments for the recovery algorithm
        arr_snr : ndarray
            all levels of SNR to go through
        fun_noise : method
            noise generating function, taking the snr and the vector size
            as arguments
        dct_fun_compare : dict
            dictionary of comparison functions
        num_trials : int
            number of trials to run at each snr level

        Returns
        -------
        dict
            the error metrics averaged over the trials for each SNR level
        """

        # dictionary with results
        dct_res = {}

        # initialize with keys from the compare functions
        for ii in dct_fun_compare.items():
            dct_res.update({ii[0]: np.zeros((
                len(arr_snr),           # for each snr level
                num_trials              # for each trial
            ))})

        # go through SNR levels and trials
        for ii, snr in enumerate(arr_snr):
            for jj in range(num_trials):
                # generate ground truth and noisy measurement
                arrX = fun_x(num_s)
                arrB = self.compress(arrX) + fun_noise(snr, self._num_c)

                # estimate the sparsity order
                num_s_est = self.estimate(arrB)

                # add the estimated order to the params of the recovery
                # TODO: update it if already present
                args.update({'num_steps': num_s_est})

                if num_s_est > 0:
                    # do recovery with the estimated sparsity
                    arr_x_est = self.recover(arrB, args)
                else:
                    arr_x_est = np.zeros(self._num_m)

                # write all the results into the appropriate place
                for kk in dct_res.items():
                    key = kk[0]
                    dct_res[key][ii, jj] = dct_fun_compare[key](
                        arrX, arr_x_est)

        # average over the trials
        for ii in dct_res.items():
            key = ii[0]
            dct_res[key] = np.mean(dct_res[key], axis=1)

        return dct_res
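

if __name__ == "__main__":
    # Minimal usage sketch, not part of the Camiba API: the dimensions, the
    # choice of mos_method='lopes', num_gamma and the SNR grid below are
    # illustrative assumptions, and the helper callables are hypothetical
    # examples of the signatures that phase_trans_est expects.
    num_n, num_m, num_s = 256, 64, 5

    # build a Lopes-style scenario (half Cauchy, half Gaussian measurements)
    soe = Soe(num_n, num_m, mos_method='lopes', num_gamma=1.0)

    # draw a sparse ground truth vector and compress it with the scenario
    arr_x = np.zeros(num_n)
    arr_x[npr.choice(num_n, num_s, replace=False)] = npr.randn(num_s)
    arr_b = soe.compress(arr_x)

    print("estimated sparsity order:", soe.estimate(arr_b))

    # sketch of a small phase transition over SNR
    def fun_x(num_s_loc):
        arr = np.zeros(num_n)
        arr[npr.choice(num_n, num_s_loc, replace=False)] = \
            npr.randn(num_s_loc)
        return arr

    def fun_noise(snr, num_size):
        # white Gaussian noise scaled according to the SNR in dB
        return npr.randn(num_size) * 10 ** (-snr / 20)

    dct_fun_compare = {
        'hit': lambda arr, num_est: float(num_est == np.count_nonzero(arr))
    }

    dct_res = soe.phase_trans_est(
        num_s, fun_x, np.array([0, 10, 20]), fun_noise, dct_fun_compare, 10
    )
    print(dct_res)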