Source code for cbadc.digital_estimator.nuv_estimator

"""The digital NUV estimator."""
from typing import Iterator
import cbadc
import logging
import scipy.linalg
import scipy.integrate
import numpy as np

logger = logging.getLogger(__name__)


[docs]class NUVEstimator: """The NUV batch estimator The NUV estimator iteratively estimates estimates a filtered version :math:`\hat{\mathbf{u}}(t)` of the input signal :math:`\mathbf{u}(t)` from a sequence of control signals :math:`\mathbf{s}[k]`. In comparision to the linear complexity estimators - :py:class:`cbadc.digital_estimator.BatchEstimator` - :py:class:`cbadc.digital_estimator.ParallelEstimator` - :py:class:`cbadc.digital_estimator.IIRFilter` - :py:class:`cbadc.digital_estimator.FIRFilter` the NUV estimator enforces the bounded outputs in an iterative scheme as described in * `R. Keusch and H.-A. Loeliger, A Binarizing NUV Prior and its Use for M-Level Control and Digital-to-Analog Conversion. <https://arxiv.org/pdf/2105.02599.pdf>`_ Parameters ---------- analog_system: :py:class:`cbadc.analog_system.AnalogSystem` an analog system (necessary to compute the estimators filter coefficients). digital_control: :py:class:`cbadc.digital_control.DigitalControl` a digital control (necessary to determine the corresponding DAC waveform). covU: `float` a prior covariance matrix for the sought estimate. bound_y: `float` the bound enforced on the outputs :math:`\mathbf{y}(t)` of the system. gamma: `float` a scale factor. K1 : `int` batch size. K2 : `int`, `optional` lookahead size, defaults to 0. iterations_per_batch: `int`, `optional` number of iteration steps per batch, defaults to 100. Ts: `float`, `optional` the sampling time, defaults to the time period of the digital control. oversample: `int`, `optional` add observations per control signal, defaults to 1, i.e., no oversampling. Attributes ---------- analog_system: :py:class:`cbadc.analog_system.AnalogSystem` analog system as in :py:class:`cbadc.analog_system.AnalogSystem` or from derived class. digital_control: :py:class:`cbadc.digital_control.DigitalControl` digital control as in :py:class:`cbadc.digital_control.DigitalControl` or from derived class. K1: `int` number of samples per estimate batch. K2: `int` number of lookahead samples per computed batch. covU: `float` the prior covariance matrix (or scalar in the single input case) of the estimate :math:`\hat{\mathbf{u}}(t)`. bound_y: `float` the bound enforced on the outputs :math:`\mathbf{y}(t)` of the system. Ts: `float` the sample rate of the estimates iterations_per_batch: `int` number of iteration steps per batch. gamma: `float` a scale factor. """ def __init__( self, analog_system: cbadc.analog_system.AnalogSystem, digital_control: cbadc.digital_control.DigitalControl, covU: float, bound_y: float, gamma: float, K1: int, K2: int = 0, iterations_per_batch: int = 100, Ts: float = None, oversample: int = 1, ): # Check inputs if K1 < 1: raise Exception("K1 must be a positive integer.") self.K1 = K1 if K2 < 0: raise Exception("K2 must be a non negative integer.") self.K2 = K2 self.K3 = K1 + K2 self.analog_system = analog_system self.covU = covU self.bound_y = bound_y # if not np.allclose(self.analog_system.D, np.zeros_like(self.analog_system.D)): # raise Exception( # """Can't compute filter coefficients for system with non-zero # D matrix. Consider chaining for removing D""" # ) self.digital_control = digital_control if Ts: self.Ts = Ts / oversample else: self.Ts = digital_control.clock.T / oversample self.control_signal = None self._stop_iteration = False self._estimate_pointer = self.K1 self.iterations_per_batch = iterations_per_batch self.gamma = gamma # Initialize filters self._compute_filter_coefficients(analog_system, digital_control) self._allocate_memory_buffers() # Set initial Covariances self._forward_CoVariance[0, :, :] = 1e12 * np.eye(self.analog_system.N) self._oversample = oversample def _allocate_memory_buffers(self): # Allocate memory buffers self._control_signal = np.zeros((self.K3, self.analog_system.M)) self._estimate = np.zeros((self.K1, self.analog_system.L), dtype=np.double) self._control_signal_in_buffer = 0 self._forward_mean = np.zeros((self.K3, self.analog_system.N), dtype=np.double) self._forward_CoVariance = np.zeros( (self.K3, self.analog_system.N, self.analog_system.N), dtype=np.double ) self._G = np.zeros( (self.K3, self.analog_system.N_tilde, self.analog_system.N_tilde), dtype=np.double, ) self._F = np.zeros( (self.K3, self.analog_system.N, self.analog_system.N), dtype=np.double ) self._y_mean = np.zeros((self.K3, self.analog_system.N_tilde)) self._y_CoVariance = np.zeros( (self.K3, self.analog_system.N_tilde, self.analog_system.N_tilde) ) # self._W_tilde = np.zeros( # ( # self.K3, # self.analog_system.N, # self.analog_system.N # ), # dtype=np.double # ) self._xi_tilde = np.zeros((self.K3 + 1, self.analog_system.N), dtype=np.double) self._sigma_squared_1 = np.ones( (self.K3, self.analog_system.N_tilde), dtype=np.double ) self._sigma_squared_2 = np.ones( (self.K3, self.analog_system.N_tilde), dtype=np.double ) self._posterior_observation_mean = np.zeros( (self.K3, self.analog_system.N_tilde), dtype=np.double )
[docs] def set_iterator(self, control_signal_sequence: Iterator[np.ndarray]): """Set iterator of control signals Parameters ----------- control_signal_sequence : iterator a iterator which outputs a sequence of control signals. """ self.control_signal = control_signal_sequence
def __call__(self, control_signal_sequence: Iterator[np.ndarray]): return self.set_iterator(control_signal_sequence) def __iter__(self): return self def __next__(self) -> np.ndarray: # Check if control signal iterator is set. if self.control_signal is None: raise Exception("No iterator set.") # Check if there are estimates in the estimate buffer if self._estimate_pointer < self.K1: temp = np.array(self._estimate[self._estimate_pointer, :], dtype=np.double) self._estimate_pointer += 1 return temp # Check if stop iteration has been raised in previous batch if self._stop_iteration: logger.warning("Warning: StopIteration received by estimator.") raise StopIteration # Otherwise start receiving control signals full = False # Fill up batch with new control signals. while not full: # next(self.control_signal) calls the control signal # iterator and thus recives new control # signal samples try: control_signal_sample = next(self.control_signal) except RuntimeError: self._stop_iteration = True control_signal_sample = np.zeros((self.analog_system.M)) for _ in range(self._oversample): full = self._input(control_signal_sample) # Compute new batch of K1 estimates self._reset_between_batch() self._compute_batch() # adjust pointer to indicate that estimate buffer # is non empty self._estimate_pointer -= self.K1 # recursively call itself to return new estimate return self.__next__() def _input(self, s: np.ndarray) -> bool: if self._control_signal_in_buffer == (self.K3): raise Exception( """Input buffer full. You must compute batch before adding more control signals""" ) for _ in range(self.analog_system.M): self._control_signal[self._control_signal_in_buffer, :] = np.asarray( 2 * s - 1 ) self._control_signal_in_buffer += 1 return self._control_signal_in_buffer > (self.K3 - 1) def _compute_filter_coefficients( self, analog_system: cbadc.analog_system.AnalogSystem, digital_control: cbadc.digital_control.DigitalControl, ): logger.info("Compute filter coefficients.") # Compute filter coefficients self.Af: np.ndarray = np.asarray(scipy.linalg.expm(analog_system.A * self.Ts)) Gamma = np.array(analog_system.Gamma) # Solve IVPs self.Bf: np.ndarray = np.zeros((self.analog_system.N, self.analog_system.M)) atol = 1e-200 rtol = 1e-12 max_step = self.Ts / 1000.0 for m in range(self.analog_system.M): def _derivative_forward_2(t, x): return np.dot(analog_system.A, x) + np.dot( Gamma, digital_control.impulse_response(m, t) ) solBf = scipy.integrate.solve_ivp( _derivative_forward_2, (0, self.Ts), np.zeros(self.analog_system.N), atol=atol, rtol=rtol, max_step=max_step, method="Radau", ).y[:, -1] self.Bf[:, m] = solBf BBT = np.dot(analog_system.B, np.dot(self.covU, analog_system.B.transpose())) def _derivative_input(t, x): t_minus_tau = self.Ts - t A_t_minus_tau = analog_system.A * t_minus_tau return np.dot( scipy.linalg.expm(A_t_minus_tau), np.dot(BBT, scipy.linalg.expm(A_t_minus_tau.transpose())), ).flatten() self.Vu = np.dot( self.covU, scipy.integrate.solve_ivp( _derivative_input, (0, self.Ts), np.zeros(self.analog_system.N**2), atol=atol, rtol=rtol, max_step=max_step, method="Radau", ) .y[:, -1] .reshape((self.analog_system.N, self.analog_system.N)), ) def _compute_batch(self): logger.info("Computing batch.") # check if ready to compute buffer if self._control_signal_in_buffer < self.K3: raise Exception("Control signal buffer not full") self._update_observation_statistics() self._MBF() for _ in range(self.iterations_per_batch): self._update_observation_bound_variances() self._update_observation_statistics() self._MBF() self._input_estimation() # Save forward mean and covariance for next batch self._forward_mean[0, :] = self._forward_mean[self.K1 - 1, :] # Compute posterior mean # self._forward_mean[0, :] = self._forward_mean[self.K1 - 1] - np.dot( # self._forward_CoVariance[self.K1 - 1, :, :], # self._xi_tilde[self.K1 - 1, :] # ) self._forward_CoVariance[0, :, :] = self._forward_CoVariance[self.K1 - 1, :, :] # Compute posterior covariance # self._forward_CoVariance[0, :, :] = self._forward_CoVariance[self.K1 - 1, :, :] - np.dot( # self._forward_CoVariance[self.K1 - 1, :, :], # np.dot( # self._W_tilde[self.K1 - 1, :, :], # self._forward_CoVariance[self.K1 - 1, :, :] # ) # ) self._sigma_squared_1[0, :] = self._sigma_squared_1[self.K1 - 1, :] self._sigma_squared_2[0, :] = self._sigma_squared_2[self.K1 - 1, :] # rotate buffer to make place for new control signals self._control_signal = np.roll(self._control_signal, -self.K1, axis=0) self._control_signal_in_buffer -= self.K1 def _MBF(self): # Forward pass temp_K3 = self.K3 - 1 for k in range(self.K3): self._G[k, :, :] = np.linalg.inv( self._y_CoVariance[k, :, :] + np.dot( self.analog_system.CT, np.dot( self._forward_CoVariance[k, :, :], self.analog_system.CT.transpose(), ), ) ) self._F[k, :, :] = np.eye(self.analog_system.N) - np.dot( self._forward_CoVariance[k, :, :], np.dot( self.analog_system.CT.transpose(), np.dot(self._G[k, :, :], self.analog_system.CT), ), ) if k < temp_K3: self._forward_mean[k + 1, :] = np.dot( self.Af, np.dot(self._F[k, :, :], self._forward_mean[k, :]) + np.dot( self._forward_CoVariance[k, :, :], np.dot( self.analog_system.CT.transpose(), np.dot(self._G[k, :, :], self._y_mean[k, :]), ), ), ) + np.dot(self.Bf, self._control_signal[k, :]) self._forward_CoVariance[k + 1, :, :] = ( np.dot( self.Af, np.dot( np.dot(self._F[k, :, :], self._forward_CoVariance[k, :, :]), self.Af.transpose(), ), ) + self.Vu ) # Backward pass for k in range(self.K3 - 1, -1, -1): if k < temp_K3: xi_tilde_z = np.dot(self.Af.transpose(), self._xi_tilde[k + 1, :]) # W_tilde_z = np.dot( # self.Af.transpose(), # np.dot( # self._W_tilde[k+1, :, :], # self.Af # ) # ) else: # Initialize with zero vector. xi_tilde_z = np.zeros(self.analog_system.N) # W_tilde_z = np.zeros( # (self.analog_system.N, self.analog_system.N)) # self._W_tilde[k, :, :] = np.dot( # self._F[k, :, :], # np.dot( # W_tilde_z, # self._F[k, :, :] # ) # ) + np.dot( # self.analog_system.CT.transpose(), # np.dot( # self._G[k, :, :], # self.analog_system.CT # ) # ) temp = ( np.dot(self.analog_system.CT.transpose(), self._forward_mean[k, :]) - self._y_mean[k, :] ) self._xi_tilde[k, :] = np.dot( self._F[k, :, :].transpose(), xi_tilde_z ) + np.dot( self.analog_system.CT.transpose(), np.dot(self._G[k, :, :], temp) ) # if (k < 5): # print(k, self._xi_tilde[k, :]) def _input_estimation(self): for k in range(self.K1): self._estimate[k, :] = -np.dot( self.covU, np.dot(self.analog_system.B.transpose(), self._xi_tilde[k, :]), ) def _update_observation_bound_variances(self): for k in range(self.K3): self._posterior_observation_mean[k, :] = np.dot( self.analog_system.CT, self._forward_mean[k] - np.dot(self._forward_CoVariance[k, :, :], self._xi_tilde[k, :]), ) # posterior_CoVariance = np.dot( # self.analog_system.CT, # np.dot( # self._forward_CoVariance[k, :, :] - np.dot( # self._forward_CoVariance[k, :, :], # np.dot( # self._W_tilde[k, :, :], # self._forward_CoVariance[k, :, :] # ) # ), # self.analog_system.CT.transpose() # ) # ) min_sigma_squared = 1e-100 for ell in range(self.analog_system.N_tilde): self._sigma_squared_1[k, ell] = max( np.abs(self._posterior_observation_mean[k, ell] - self.bound_y) / self.gamma, min_sigma_squared, ) self._sigma_squared_2[k, ell] = max( np.abs(self._posterior_observation_mean[k, ell] + self.bound_y) / self.gamma, min_sigma_squared, ) def _update_observation_statistics(self): for k in range(self.K3): W1_back = np.diag(1.0 / self._sigma_squared_1[k, :]) W2_back = np.diag(1.0 / self._sigma_squared_2[k, :]) W_back = W1_back + W2_back Wm_back = self.bound_y * np.sum(W1_back, axis=-1) - self.bound_y * np.sum( W2_back, axis=-1 ) self._y_CoVariance[k, :, :] = np.linalg.inv(W_back) self._y_mean[k, :] = np.dot(self._y_CoVariance[k, :, :], Wm_back) def _reset_between_batch(self): self._sigma_squared_1[1:, :] = np.ones( (self.K3 - 1, self.analog_system.N_tilde), dtype=np.double ) self._sigma_squared_2[1:, :] = np.ones( (self.K3 - 1, self.analog_system.N_tilde), dtype=np.double ) def __str__(self): return f"""NUVEstimator is parameterized as \nobservation bound = {self.bound_y:.2f}, \ncovU = {self.covU}, \ngamma = {self.gamma}, \nTs = {self.Ts},\nK1 = {self.K1},\nK2 = {self.K2}, \nand\nnumber of iterations per batch = {self.iterations_per_batch} \nResulting in the filter coefficients\nAf = \n{self.Af}, \nBf = \n{self.Bf}, \nand Vu = \n{self.Vu}. """