Source code for cbadc.digital_estimator.batch_estimator

"""The digital batch estimator
"""
from typing import Iterator
import cbadc
from cbadc.digital_estimator._filter_coefficients import (
    compute_filter_coefficients,
    FilterComputationBackend,
)
import cbadc.utilities
import scipy.integrate
import numpy as np
import sympy as sp
import logging

logger = logging.getLogger(__name__)


def _rotation_matrix(phi):
    return np.array([[np.cos(phi), -np.sin(phi)], [np.sin(phi), np.cos(phi)]])


[docs]class BatchEstimator(Iterator[np.ndarray]): """Batch estimator implementation. The digital estimator estimates a filtered version :math:`\hat{\mathbf{u}}(t)` (shaped by :py:func:`signal_transfer_function`) of the input signal :math:`\mathbf{u}(t)` from a sequence of control signals :math:`\mathbf{s}[k]`. Specifically, the estimates are computed as :math:`\overrightarrow{\mathbf{m}}[k] = \mathbf{A}_f \overrightarrow{\mathbf{m}}[k-1] + \mathbf{B}_f \mathbf{s}[k-1]`, :math:`\overleftarrow{\mathbf{m}}[k] = \mathbf{A}_b \overrightarrow{\mathbf{m}}[k+1] + \mathbf{B}_b \mathbf{s}[k]`, and :math:`\hat{\mathbf{u}}(k T) = \mathbf{W}^\mathsf{T}\\left(\overleftarrow{\mathbf{m}}[k] - \overrightarrow{\mathbf{m}}[k]\\right)` where :math:`\mathbf{A}_f, \mathbf{A}_b \in \mathbb{R}^{N \\times N}`, :math:`\mathbf{B}_f, \mathbf{B}_b \in \mathbb{R}^{N \\times M}`, and :math:`\mathbf{W}^\mathsf{T} \in \mathbb{R}^{L \\times N}` are the precomputed filter coefficient based on the choice of :py:class:`cbadc.analog_system.AnalogSystem` and :py:class:`cbadc.digital_control.DigitalControl`. Parameters ---------- analog_system : :py:class:`cbadc.analog_system.AnalogSystem` an analog system (necessary to compute the estimators filter coefficients). digital_control : :py:class:`cbadc.digital_control.DigitalControl` a digital control (necessary to determine the corresponding DAC waveform). eta2 : `float` the :math:`\eta^2` parameter determines the bandwidth of the estimator. K1 : `int` batch size. K2 : `int`, `optional` lookahead size, defaults to 0. stop_after_number_of_iterations : `int` determine a max number of iterations by the iterator, defaults to :math:`2^{63}`. Ts: `float`, `optional` the sampling time, defaults to the time period of the digital control. mid_point: `bool`, `optional` set samples in between control updates, i.e., :math:`\hat{u}(kT + T/2)` , defaults to False. downsample: `int`, `optional` set a downsampling factor compared to the control signal rate, defaults to 1, i.e., no downsampling. solver_type: :py:class:`cbadc.digital_estimator._filter_coefficients.FilterComputationBackend` determine which solver type to use when computing filter coefficients. Attributes ---------- analog_system : :py:class:`cbadc.analog_system.AnalogSystem` analog system as in :py:class:`cbadc.analog_system.AnalogSystem` or from derived class. digital_control : :py:class:`cbadc.digital_control.DigitalControl` digital control as in :py:class:`cbadc.digital_control.DigitalControl` or from derived class. eta2 : float eta2, or equivalently :math:`\eta^2`, sets the bandwidth of the estimator. control_signal : :py:class:`cbadc.digital_control.DigitalControl` a iterator suppling control signals as :py:class:`cbadc.digital_control.DigitalControl`. number_of_iterations : `int` number of iterations until iterator raises :py:class:`StopIteration`. downsample: `int`, `optional` The downsampling factor compared to the rate of the control signal. mid_point: `bool` estimated samples shifted in between control updates, i.e., :math:`\hat{u}(kT + T/2)`. K1 : `int` number of samples per estimate batch. K2 : `int` number of lookahead samples per computed batch. Ts : `float` spacing between samples in seconds. Af : `array_like`, shape=(N, N), readonly The Af matrix Ab : `array_like`, shape=(N, N), readonly The Ab matrix Bf : `array_like`, shape=(N, M), readonly The Bf matrix Bb : `array_like`, shape=(N, M), readonly The Bb matrix WT : `array_like`, shape=(L, N), readonly The W matrix transposed solver_type: :py:class:`cbadc.digital_estimator._filter_coefficients.FilterComputationBackend` The solver used for computing the filter coefficients. Yields ------ `array_like`, shape=(L,) an input estimate sample :math:`\hat{\mathbf{u}}(t)` """ def __init__( self, analog_system: cbadc.analog_system.AnalogSystem, digital_control: cbadc.digital_control.DigitalControl, eta2: float, K1: int, K2: int = 0, stop_after_number_of_iterations: int = (1 << 63), Ts: float = None, mid_point: bool = False, downsample: int = 1, solver_type: FilterComputationBackend = FilterComputationBackend.mpmath, modulation_frequency: float = None, ): # Check inputs if K1 < 1: raise Exception("K1 must be a positive integer.") self.K1 = K1 if K2 < 0: raise Exception("K2 must be a non negative integer.") self.K2 = K2 self.K3 = K1 + K2 self._filter_lag = -1 self.analog_system = analog_system # if not np.allclose(self.analog_system.D, np.zeros_like(self.analog_system.D)): # raise Exception( # """Can't compute filter coefficients for system with non-zero # D matrix. Consider chaining for removing D""" # ) self.digital_control = digital_control if eta2 < 0: raise Exception("eta2 must be non negative.") if Ts: self.Ts = Ts else: self.Ts = digital_control.clock.T self.eta2 = eta2 self.control_signal = None if downsample != 1: raise NotImplementedError( "Downsampling currently not implemented for DigitalEstimator" ) self.number_of_iterations = stop_after_number_of_iterations self._iteration = 0 self._estimate_pointer = self.K1 # For transfer functions self.eta2Matrix = np.eye(self.analog_system.CT.shape[0]) * self.eta2 self._stop_iteration = False self.mid_point = mid_point # Initialize filters self.solver_type = solver_type self._compute_filter_coefficients(analog_system, digital_control, eta2) self._allocate_memory_buffers() self._ntf_lambda = None self._stf_lambda = None # For modulation self._time_index = 0 if modulation_frequency is not None: self._modulation_frequency = modulation_frequency else: self._modulation_frequency = 0
[docs] def filter_lag(self): """Return the lag of the filter. As the filter computes the estimate as --------- | K2 | --------- ^ | u_hat[k] Returns ------- : `int` The filter lag. """ return self._filter_lag
[docs] def warm_up(self, samples=0): """Warm up filter by population control signals. Effectively removes the filter lag. Parameters ---------- samples: `int`, `optional` number of warmup samples, defaults to filter_lag """ logger.debug("Warming up estimator.") if samples > 0: for _ in range(samples): self.__next__() else: while self._filter_lag > 0: _ = self.__next__() self._filter_lag -= 1
[docs] def set_iterator(self, control_signal_sequence: Iterator[np.ndarray]): """Set iterator of control signals Parameters ----------- control_signal_sequence : iterator a iterator which outputs a sequence of control signals. """ self.control_signal = control_signal_sequence
def _compute_filter_coefficients( self, analog_system: cbadc.analog_system.AnalogSystem, digital_control: cbadc.digital_control.DigitalControl, eta2: float, ): logger.info( f"Computing filter coefficients. Using solver type: {self.solver_type.name}" ) Af, Ab, Bf, Bb, WT = compute_filter_coefficients( analog_system, digital_control, eta2, solver_type=self.solver_type, mid_point=self.mid_point, ) self.Af = np.array(Af, dtype=np.float64).reshape( (analog_system.N, analog_system.N) ) self.Bf = np.array(Bf, dtype=np.float64).reshape( (analog_system.N, analog_system.M) ) self.Ab = np.array(Ab, dtype=np.float64).reshape( (analog_system.N, analog_system.N) ) self.Bb = np.array(Bb, dtype=np.float64).reshape( (analog_system.N, analog_system.M) ) self.WT = np.array(WT, dtype=np.float64).reshape( (analog_system.L, analog_system.N) ) def _allocate_memory_buffers(self): # Allocate memory buffers self._control_signal = np.zeros((self.K3, self.analog_system.M)) self._estimate = np.zeros((self.K1, self.analog_system.L), dtype=np.double) self._control_signal_in_buffer = 0 self._mean = np.zeros((self.K1 + 1, self.analog_system.N), dtype=np.double) def _compute_batch(self): logger.info("Computing batch.") temp_forward_mean = np.zeros(self.analog_system.N, dtype=np.double) # check if ready to compute buffer if self._control_signal_in_buffer < self.K3: raise Exception("Control signal buffer not full") # compute lookahead for k1 in range(self.K3 - 1, self.K1 - 1, -1): temp = np.dot(self.Ab, self._mean[self.K1, :]) + np.dot( self.Bb, self._control_signal[k1, :] ) for n in range(self.analog_system.N): self._mean[self.K1, n] = temp[n] # compute forward recursion for k2 in range(self.K1): temp = np.dot(self.Af, self._mean[k2, :]) + np.dot( self.Bf, self._control_signal[k2, :] ) if k2 < self.K1 - 1: for n in range(self.analog_system.N): self._mean[k2 + 1, n] = temp[n] else: for n in range(self.analog_system.N): temp_forward_mean[n] = temp[n] # compute backward recursion and estimate for k3 in range(self.K1 - 1, -1, -1): temp = np.dot(self.Ab, self._mean[k3 + 1, :]) + np.dot( self.Bb, self._control_signal[k3, :] ) temp_estimate = np.dot(self.WT, temp - self._mean[k3, :]) self._estimate[k3, :] = temp_estimate[:] self._mean[k3, :] = temp[:] # reset intital means for n in range(self.analog_system.N): self._mean[0, n] = temp_forward_mean[n] self._mean[self.K1, n] = 0 # rotate buffer to make place for new control signals self._control_signal = np.roll(self._control_signal, -self.K1, axis=0) self._control_signal_in_buffer -= self.K1 def _input(self, s: np.ndarray) -> bool: if self._control_signal_in_buffer == (self.K3): raise Exception( """Input buffer full. You must compute batch before adding more control signals""" ) for m in range(self.analog_system.M): self._control_signal[self._control_signal_in_buffer, :] = np.asarray( 2 * s - 1 ) self._control_signal_in_buffer += 1 return self._control_signal_in_buffer > (self.K3 - 1) def __call__(self, control_signal_sequence: Iterator[np.ndarray]): return self.set_iterator(control_signal_sequence) def __iter__(self): return self def __next__(self) -> np.ndarray: # Check if control signal iterator is set. if self.control_signal is None: raise Exception("No iterator set.") # Check if the end of prespecified size if self.number_of_iterations < self._iteration: raise StopIteration self._iteration += 1 # Check if there are estimates in the estimate buffer if self._estimate_pointer < self.K1: temp = np.array(self._estimate[self._estimate_pointer, :], dtype=np.double) self._estimate_pointer += 1 self._time_index += 1 return temp # Check if stop iteration has been raised in previous batch if self._stop_iteration: logger.warning("Warning: StopIteration received by estimator.") raise StopIteration # Otherwise start receiving control signals full = False # Fill up batch with new control signals. while not full: # next(self.control_signal) calls the control signal # iterator and thus recives new control # signal samples try: control_signal_sample = next(self.control_signal) except RuntimeError: self._stop_iteration = True control_signal_sample = np.zeros((self.analog_system.M)) full = self._input(control_signal_sample) # Compute new batch of K1 estimates self._compute_batch() # adjust pointer to indicate that estimate buffer # is non empty self._estimate_pointer -= self.K1 # recursively call itself to return new estimate return self.__next__()
[docs] def demodulate(self) -> np.ndarray: """Demodulate the received signal. Returns: np.ndarray: Demodulated signal. """ if self.analog_system.L % 2 != 0: raise Exception("L must be an even number.") modulation_matrix = np.kron( _rotation_matrix(-self._modulation_frequency * self._time_index), np.eye(self.analog_system.L // 2), ) return np.dot(modulation_matrix, self.__next__())
def _lazy_initialise_ntf(self): logger.info("Computing analytical noise-transfer function") if self.analog_system._atf_lambda is None: self.analog_system._lazy_initialize_ATF() GH = self.analog_system._atf_s_matrix.transpose().conjugate() GGH = self.analog_system._atf_s_matrix * GH self._ntf_s_matrix = sp.simplify( GH * (GGH + self.eta2 * sp.eye(self.analog_system.N_tilde)).inv() ) self._ntf_lambda = sp.lambdify((self.analog_system.omega), self._ntf_s_matrix)
[docs] def noise_transfer_function(self, omega: np.ndarray): """Compute the noise transfer function (NTF) at the angular frequencies of the omega array. Specifically, computes :math:`\\text{NTF}( \omega) = \mathbf{G}( \omega)^\mathsf{H} \\left( \mathbf{G}( \omega)\mathbf{G}( \omega)^\mathsf{H} + \eta^2 \mathbf{I}_N \\right)^{-1}` for each angular frequency in omega where where :math:`\mathbf{G}(\omega)\in\mathbb{R}^{N \\times L}` is the ATF matrix of the analog system and :math:`\mathbf{I}_N` represents a square identity matrix. Parameters ---------- omega: `array_like`, shape=(K,) an array_like object containing the angular frequencies for evaluation. Returns ------- `array_like`, shape=(L, N_tilde, K) return NTF evaluated at K different angular frequencies. """ result = np.zeros( (self.analog_system.L, self.analog_system.N_tilde, omega.size), dtype=np.complex128, ) # if self._ntf_lambda is None: # self._lazy_initialise_ntf() for index, o in enumerate(omega): G = self.analog_system.transfer_function_matrix(np.array([o])) G = G.reshape((self.analog_system.N_tilde, self.analog_system.L)) GH = G.transpose().conjugate() GGH = np.dot(G, GH) result[:, :, index] = np.abs( np.dot(GH, np.linalg.pinv(GGH + self.eta2Matrix)) ) # result[:, :, index] = self._ntf_lambda(o) return result
def _lazy_initialise_stf(self): logger.info("Computing analytical signal transfer function.") if self._ntf_lambda is None: self._lazy_initialise_ntf() self._stf_s_matrix = sp.simplify( self._ntf_s_matrix * self.analog_system._atf_s_matrix ) self._stf_lambda = sp.lambdify((self.analog_system.omega), self._stf_s_matrix)
[docs] def signal_transfer_function(self, omega: np.ndarray): """Compute the signal transfer function (STF) at the angular frequencies of the omega array. Specifically, computes :math:`\\text{STF}( \omega) = \mathbf{G}( \omega)^\mathsf{H} \\left( \mathbf{G}( \omega)\mathbf{G}( \omega)^\mathsf{H} + \eta^2 \mathbf{I}_N \\right)^{-1} \mathbf{G}( \omega)` for each angular frequency in omega where where :math:`\mathbf{G}(\omega)\in\mathbb{R}^{N \\times L}` is the ATF matrix of the analog system and :math:`\mathbf{I}_N` represents a square identity matrix. Parameters ---------- omega: `array_like`, shape=(K,) an array_like object containing the angular frequencies for evaluation. Returns ------- `array_like`, shape=(L, K) return STF evaluated at K different angular frequencies. """ result = np.zeros((self.analog_system.L, omega.size), dtype=np.complex128) # if self._stf_lambda is None: # self._lazy_initialise_stf() for index, o in enumerate(omega): G = self.analog_system.transfer_function_matrix(np.array([o])).reshape( (self.analog_system.N_tilde, self.analog_system.L) ) GH = G.transpose().conjugate() GGH = np.dot(G, GH) STF_temp = np.dot(np.linalg.pinv(GGH + self.eta2Matrix), G) for l_index in range(self.analog_system.L): result[l_index, index] = np.abs( np.dot(GH[l_index, :], STF_temp[:, l_index]) ) # result[:, index] = self._stf_lambda(o) return result
[docs] def control_signal_transfer_function(self, omega: np.ndarray): """Compute the control signal transfer function at the angular frequencies of the omega array. Specifically, computes :math:`\\begin{pmatrix}\hat{u}_1(\omega) / s_1(\omega) & \\dots & \hat{u}_1(\omega) / s_M(\omega) \\\ \\vdots & \\ddots & \\vdots \\\ \hat{u}_L(\omega) / s_1(\omega) & \\dots & \hat{u}_L(\omega) / s_M(\omega) \\end{pmatrix}= \mathbf{G}( \omega)^\mathsf{H} \\left( \mathbf{G}( \omega)\mathbf{G}( \omega)^\mathsf{H} + \eta^2 \mathbf{I}_N \\right)^{-1} \\bar{\mathbf{G}}( \omega)` for each angular frequency in omega where where :math:`\\bar{\mathbf{G}}( \omega)= \mathbf{C}^\mathsf{T} \\left(\mathbf{A} - i \omega \mathbf{I}_N\\right)^{-1} \mathbf{\\Gamma} \in\mathbb{R}^{N \\times M}` is the transfer function from the control signals to the output and :math:`\mathbf{I}_N` represents a square identity matrix. Parameters ---------- omega: `array_like`, shape=(K,) an array_like object containing the angular frequencies for evaluation. Returns ------- `array_like`, shape=(L, M, K) return STF evaluated at K different angular frequencies. """ result = np.zeros((self.analog_system.L, self.analog_system.M, omega.size)) for index, o in enumerate(omega): G = self.analog_system.transfer_function_matrix(np.array([o])).reshape( (self.analog_system.N_tilde, self.analog_system.L) ) G_bar = self.analog_system.control_signal_transfer_function_matrix( np.array([o]) ).reshape((self.analog_system.N_tilde, self.analog_system.M)) GH = G.transpose().conjugate() GGH = np.dot(G, GH) result[:, :, index] = np.abs( np.dot(GH, np.dot(np.linalg.inv(GGH + self.eta2Matrix), G_bar)) ) return result
[docs] def general_transfer_function(self, omega: np.ndarray): """Compute the general transfer functions from additive sources into each state varible to the estimates. Parameters ---------- omega: `array_like`, shape=(K,) an array_like object containing the angular frequencies for evaluation. Returns ------- `array_like`, shape=(L, N, K) return transfer function from each state N to each input estimate L for each frequency K. """ # shape=(N_tilde, N, K) stf_array = self.analog_system.transfer_function_matrix(omega, general=True) # shape=(L, N_tilde, K) ntf_array = self.noise_transfer_function(omega) return np.einsum('lnk,nxk->lxk', ntf_array, stf_array)
def max_transfer_function_peak(self, BW: np.ndarray): omega = np.geomspace(BW[0], BW[1], 1000) tfs = np.abs(self.general_transfer_function(omega)) ** 2 res = np.max(tfs, axis=2) return res
[docs] def white_noise_balance(self, BW: np.ndarray, max=True): """See the magnitude difference between different noise contributions. Parameters ---------- BW: `array_like`, `shape=(2,)` the upper and lower bandwidth ranges as (BW_low, BW_high). max: `bool`, `optional` If to take the max value of the transferfunction or the squared integrated, defaults to True. Returns ------- : array_like, shape=(L, N) white noise balance. """ if max: omega = np.geomspace(BW[0], BW[1], 1000) tfs = np.abs(self.general_transfer_function(omega)) ** 2 res = np.max(tfs, axis=2) else: def _derivative(omega, x): _omega = np.array([omega]) return ( np.abs(self.general_transfer_function(_omega)).flatten(order="C") ** 2 ) res = ( scipy.integrate.solve_ivp( _derivative, (BW[0], BW[-1]), np.zeros(self.analog_system.L * self.analog_system.N), ) .y[:, -1] .reshape((self.analog_system.L, self.analog_system.N), order="C") ) # for n, sigma_z_2 in enumerate(noise_variances): # integrated_tf[:, n] *= sigma_z_2 # return integrated_tf return np.max(res, axis=1) / res / self.analog_system.N
[docs] def white_noise_sensitivities( self, BW: np.ndarray, target_snr: float, input_power: float = 0.5, max=True, spectrum=False, ) -> np.ndarray: """Compute per node white noise sensitivity Parameters ---------- BW: `array_like`, `shape=(2,)` the upper and lower bandwidth ranges as (BW_low, BW_high). target_snr: `float` the target SNR expressed in a linear scale. input_power: `float`, `optional` the input power expressed in a linear scale, defaults to 1/2. max: `bool`, `optional` If to take the max value of the transferfunction or the squared integrated, defaults to True. spectrum: `bool`, `optional` express the returned noise variance as a power spectral density, i.e., V^2/Hz, for states representing voltages. Defaults to False. Returns ------- : array_like, shape=(L, N) admissable noise power to align with SNR requirement. """ noise_variance = ( np.ones((self.analog_system.L, self.analog_system.N)) * input_power / target_snr ) balances = self.white_noise_balance(BW, max=max) for l in range(self.analog_system.L): for n in range(self.analog_system.N): noise_variance[l, n] *= balances[l, n] if spectrum: return noise_variance / (BW[1] - BW[0]) return noise_variance
def max_harmonic_estimate(self, BW: np.ndarray, SFDR: float): sfdr = cbadc.fom.snr_from_dB(SFDR) num = 1000 _omega = np.logspace(np.log(BW[0]), np.log(BW[-1]), num) tfs = np.abs(self.general_transfer_function(_omega)) max_input = np.max(tfs) # value = temp = np.max(tfs, axis=2) / max_input # this could probably be normalized by somethings like np.linalg.norm(temp) local_harmonics = 1.0 / temp local_harmonics /= np.linalg.norm(local_harmonics, ord=1) return local_harmonics / sfdr def __str__(self): return f"""Digital estimator is parameterized as \neta2 = {self.eta2:.2f}, {10 * np.log10(self.eta2):.0f} [dB], \nTs = {self.Ts},\nK1 = {self.K1},\nK2 = {self.K2}, \nand\nnumber_of_iterations = {self.number_of_iterations} \nResulting in the filter coefficients\nAf = \n{self.Af}, \nAb = \n{self.Ab}, \nBf = \n{self.Bf}, \nBb = \n{self.Bb}, \nand WT = \n{self.WT}."""
[docs] def save(self, filename: str): """Pickle object for later use. Uses :py:func:`cbadc.utilities.pickle_load` to save object for later use. Parameters ---------- filename: `str` filename to save object to. """ cbadc.utilities.pickle_dump(self, filename)