# Source code for camiba.cs.scenario

# This file is part of Camiba.
#
# Camiba is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Camiba is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Camiba. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
import numpy.random as npr


class Scenario:
    """
    General Compressed Sensing Scenario

    This class offers a very convenient but still abstract interface to a
    standard compressed sensing scenario, which generally consists of a
    dictionary matrix, a compression matrix and a reconstruction scheme
    represented by an algorithm.
    """

    def __init__(self, mat_D, mat_C, algo):
        """
        CS Scenario Constructor

        The user has to specify the dictionary and measurement matrices
        and a reconstruction algorithm.

        Parameters
        ----------
        mat_D : ndarray
            dictionary matrix, shape (n, m)
        mat_C : ndarray
            compression matrix, shape (c, n)
        algo : method
            reconstruction algorithm
        """
        # n = signal dimension, m = sparse-domain dimension
        self._num_n, self._num_m = mat_D.shape
        # c = number of compressed measurements
        self._num_c = mat_C.shape[0]
        self._mat_D = mat_D
        self._mat_C = mat_C
        # combined sensing matrix C * D handed to the recovery algorithm
        self._mat_A = mat_C.dot(mat_D)
        self._algo = algo
[docs] def gen_sparse(self, num_s, entries=[], do_complex=False): """ Generate a Sparse Signal This function generates a sparse ground truth signal according to the scenarios dimensions of a given sparsity order and makes use of a certain set of specified elements if provided. Parameters ---------- num_s : int desired sparsity order entries : list list of possible values do_complex : bool whether the elements are complex or not Returns ------- ndarray the sparse vector """ dataType = ["float", "complex"][1 * do_complex] arr_x = np.zeros(self._num_m, dtype=dataType) if entries == []: if do_complex: arr_s = npr.randn(num_s) + 1j*npr.randn(num_s) else: arr_s = npr.randn(num_s) else: arr_s = npr.choice(entries, num_s, replace=True) arr_x[npr.choice(range(self._num_m), num_s, replace=False)] = arr_s
return arr_x
[docs] def gen_sparse_sep(self, num_s, dist, entries=[], do_complex=False): """ Generate a Sparse Signal with Separation Condition This function generates a sparse ground truth signal according to the scenarios dimensions of a given sparsity order and makes use of a certain set of specified elements if provided. Moreover, a separation condition is used to not put support elements closer than this distance. Ultimatively this generates more "well behaved" signals in terms of reconstruction. Parameters ---------- num_s : int desired sparsity order num_dist : int desired separation distance entries : list list of possible values do_complex : bool whether the elements are complex or not Returns ------- ndarray the sparse signal """ dataType = ["float", "complex"][1 * do_complex] arr_x = np.zeros(self._num_m, dtype=dataType) arr_possible = np.array(range(self._num_m), dtype='int') for ii in range(num_s): prob = npr.choice(arr_possible, 1, replace=False) if entries == []: if compl: arr_s = npr.randn(num_s) + 1j*npr.randn(num_s) else: arr_s = npr.randn(num_s) else: arr_x[prob] = npr.choice(entries, 1, replace=False) arr_possible = arr_possible[np.abs(arr_possible - prob) > dist] if len(arr_possible) == 0: print("Could not fit all in! Returning a less sparse x!") return arr_x
return arr_x
[docs] def compress(self, arr_y): """ Apply Compression Parameters ---------- arr_y : ndarray signal to compress Returns ------- ndarray the compressed measurement """
return self._mat_C.dot(arr_y)
[docs] def to_signal(self, arr_x): """ Transform to signal This method takes a sparse vector and multiplies it with the dictionary to get the acutal signal from its sparse representation. Often this is called forward model. Parameters ---------- arr_x : ndarray sparse vector Returns ------- ndarray the signal """
return self._mat_D.dot(arr_x)
[docs] def recover(self, arr_b, args): """ Recover a sparse vector Given a measurement and additional arguments for the recovery method we call this very method and return its result. Parameters ---------- arr_b : ndarray measurement vector args : dict recovery algorithms arguments Returns ------- ndarray the sparse reconstructed vector """
return self._algo(self._mat_A, arr_b, **args)
[docs] def pipeline(self, arr_x): """ Transform to measurment This method takes a sparse vector and multiplies it with the dictionary to get the acutal signal from its sparse representation. Often this is called forward model. Then also the defined compression step is applied and we implemented the whole pipelein Parameters ---------- arr_x : ndarray sparse vector Returns ------- ndarray the compressed measurement """ arr_y = self.to_signal(arr_x) arr_b = self.compress(arr_y)
return self.recover(arr_b)
[docs] def phase_trans_rec(self, num_s, fun_x, args, arr_snr, fun_noise, dct_fun_compare, num_trials ): """ Calculate a phase transition Given the scenario we generate sparse vectors with a certain fixed sparsity level and according to a provided scheme. Then we apply the forward model and the compression and add noise to the compressed measurements, which is generated by a provided noise generating function. Then we aim a reconstructing the original signal from this compressed and noisy data. This is done several times for each level of SNR and after each reconstruction, we compare the reconstruction with respect to one or more provided error metrics. Finally everything is saved and returned in a dictionary where the keys are given by the SNR and the name of the applied error metric. Parameters ---------- num_s : int sparsity level fun_x : method function, which takes the sparsity level as parameter to generate a sparse vector args : dict arguments for the recovery algorithm arr_snr : ndarray all level of SNR to go through fun_noise : ndarray noise generating function, taking the snr and the vector size as arguments dct_fun_compare : dict dictionary of comparison function num_trials : int number of trials to run at each snr level Returns ------- ndarray the compressed measurement """ # dictionary of result with keys from dct_fun_compare dct_res = {} for ii in dct_fun_compare.items(): dct_res.update({ii[0]: np.zeros(( len(arr_snr), # for each snr level num_trials # for each trial ))}) # go through trials and SNR for ii, snr in enumerate(arr_snr): for jj in range(num_trials): # generate ground truth and noisy measurement arrX = fun_x(num_s) arrB = self.compress(arrX) + fun_noise(snr, self._num_c) # do reconstruction arrXEst = self.recover(arrB, args) # apply all the comparison functions and store results for kk in dct_res.items(): key = kk[0] dct_res[key][ii, jj] = dct_fun_compare[key](arrX, arrXEst) # take the mean in each result across all trials for ii in 
dct_res.items(): key = ii[0] dct_res[key] = np.mean(dct_res[key], axis=1)
return dct_res