from typing import Iterator, Union
from .testbench import TestBench
from jinja2 import Environment, PackageLoader, select_autoescape
from ltspice import Ltspice
from pandas import read_csv
import numpy as np
import logging
import subprocess
import os
import time
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shared Jinja2 environment used by the simulators below to look up their
# netlist templates; templates are loaded from the "circuit/templates"
# directory of the "cbadc" package.
_template_env = Environment(
    lstrip_blocks=True,
    trim_blocks=True,
    autoescape=select_autoescape(),
    loader=PackageLoader("cbadc", package_path="circuit/templates"),
)
class NGSpiceSimulator(Iterator[np.ndarray]):
    """Simulate a testbench using NGSpice.

    The simulator renders a netlist from the testbench, executes ``ngspice``
    in batch mode on it and extracts a control vector from the resulting raw
    file. Iterating over the simulator yields the rows of that control
    vector. Netlist generation, simulation and parsing are performed lazily
    the first time a result is requested.

    Parameters
    ----------
    testbench : TestBench
        the testbench to simulate.
    step : float
        step of the transient analysis; also used as its maximum step.
    t_end : float
        stop time of the transient analysis.
    netlist_filename : str
        file the rendered netlist is written to.
    raw_output_filename : str
        file ngspice writes its raw output to.
    stdout_filename : str
        file the captured standard output of ngspice is written to.
    stderr_filename : str
        file the captured standard error of ngspice is written to.
    ac_freq_range : tuple
        start and stop frequency stored in the AC analysis settings that are
        handed to the netlist template.
    options : dict, optional
        simulator options merged over the defaults (``reltol``, ``opts``,
        ``warn``, ``ACCT``); entries with the same key override the default.
    """

    # NOTE(review): ``step`` and ``t_end`` are declared here but __init__ only
    # stores them inside ``transient_analysis`` -- confirm whether instance
    # attributes of these names are expected by callers.
    step: float
    t_end: float
    control_vector: np.ndarray
    testbench: "TestBench"
    netlist_filename: str
    raw_output_filename: str
    raw_file: "Ltspice"
    stdout_filename: str
    stderr_filename: str
    # 0.0 until run() has completed; afterwards an "HH:MM:SS" string.
    simulation_time: Union[float, str]

    def __init__(
        self,
        testbench: "TestBench",
        step: float,
        t_end: float,
        netlist_filename: str = "ngspice_sim.cir",
        raw_output_filename: str = "ngspice_sim.raw",
        stdout_filename: str = "ngspice_sim.stdout",
        stderr_filename: str = "ngspice_sim.stderr",
        ac_freq_range=(1e3, 1e7),
        options: Union[dict, None] = None,
    ):
        self.testbench = testbench
        self.transient_analysis = {
            "start_time": 0.0,
            "stop_time": t_end,
            "step": step,
            # The maximum step currently equals the step itself.
            "max_step": step / 1.0,
        }

        # Names of all signals the netlist template is asked to save: every
        # highlighted terminal plus the state terminals of the analog
        # frontend instance "Xaf".
        self._save_variable_names = []
        for terminal_list in testbench.highlighted_terminals.values():
            self._save_variable_names.extend(
                [terminal.name for terminal in terminal_list]
            )
        self._save_variable_names.extend(
            [
                f"Xaf.{term.name}"
                for term in self.testbench.Xaf.xp + self.testbench.Xaf.xn
            ]
        )

        self.ac_analysis = {
            "number_of_points": 1 << 8,
            "start_frequency": ac_freq_range[0],
            "stop_frequency": ac_freq_range[1],
        }

        # Default simulator options; user supplied options take precedence.
        # A fresh dict is built on every call so no state is shared between
        # instances.
        self.options = {
            # 'method': 'gear',
            # 'itl3': 100,
            # 'itl4': 100,
            # 'itl5': 10000,
            # 'maxord': 6,
            "reltol": 1e-4,
            # 'abstol': 1e-18,
            "opts": None,
            "warn": 1,
            "ACCT": None,
            **({} if options is None else options),
        }

        # One row per clock period that fits into the simulated time span,
        # one column per control signal (M of the analog system).
        analog_frontend = testbench.Xaf.analog_frontend
        number_of_periods = int(
            np.ceil(t_end / analog_frontend.digital_control.clock.T)
        )
        self.control_vector = np.zeros(
            (number_of_periods, analog_frontend.analog_system.M)
        )

        self.netlist_filename = netlist_filename
        self.raw_output_filename = raw_output_filename
        self.stderr_filename = stderr_filename
        self.stdout_filename = stdout_filename
        self.simulation_time = 0.0

        # Progress flags consulted by _lazy_initialize.
        self._made_netlist = False
        self._ran = False
        self._parsed = False

    def _write_process_output(self, stdout, stderr):
        """Write the captured ngspice output streams to their files."""
        with open(self.stdout_filename, "w") as f:
            f.write((stdout or b"").decode())
        with open(self.stderr_filename, "w") as f:
            f.write((stderr or b"").decode())

    def run(self):
        """Run the simulation

        Executes ``ngspice`` in batch mode on the netlist file, records the
        elapsed wall-clock time as an ``HH:MM:SS`` string in
        ``simulation_time`` and stores the captured output streams in the
        stdout/stderr files.

        Raises
        ------
        subprocess.CalledProcessError
            if ngspice exits with a non-zero status. The captured output is
            written to the stdout/stderr files before the error propagates.
        """
        cmd = ["ngspice", "-b", "-r", self.raw_output_filename, self.netlist_filename]
        start_time = time.time()
        try:
            completed_process = subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as err:
            # Keep the simulator's diagnostics available for inspection
            # instead of losing them together with the failed process.
            self._write_process_output(err.stdout, err.stderr)
            logger.error(err.stderr)
            raise
        end_time = time.time()
        self.simulation_time = time.strftime(
            "%H:%M:%S", time.gmtime(end_time - start_time)
        )
        logger.info(completed_process.args)
        logger.debug(completed_process.stdout)
        if completed_process.stderr:
            logger.error(completed_process.stderr)
        self._write_process_output(
            completed_process.stdout, completed_process.stderr
        )
        self._ran = True

    def get_state_trajectories(self):
        """Get the state trajectories from the simulation

        If no simulation result has been parsed yet, the missing steps are
        carried out first.

        Returns
        -------
        headers : list of str
            ``"t"`` followed by one ``"XAF.<terminal name>"`` entry per state
            terminal (positive terminals first, then negative ones).
        data : np.ndarray
            array with one row per time point; the first column holds the
            time and the remaining columns the signals named in ``headers``.
        """
        if not self._parsed:
            self._lazy_initialize()
        time_points = self.raw_file.get_time()
        headers = ["t"]
        state_terminal_names = [
            f"XAF.{term.name}" for term in self.testbench.Xaf.xp + self.testbench.Xaf.xn
        ]
        headers.extend(state_terminal_names)
        data = [
            np.array(self.raw_file.get_data(f"V({term_name})"))
            for term_name in state_terminal_names
        ]
        return headers, np.vstack((time_points, *data)).transpose()

    # def get_state_observations(self):
    #     time = self.raw_file.get_time()
    #     data = [
    #         np.array(self.raw_file.get_data(f'V(XAF.STILDE{i})'))
    #         for i in range(self.control_vector.shape[1])
    #     ]
    #     u_hat = np.array(self.raw_file.get_data(f'V(IN0)'))
    #     return np.vstack((time, *data, u_hat)).transpose()

    def parse(self):
        """Parse the simulation results

        Loads the raw output file and fills ``control_vector`` by sampling
        the highlighted control terminals on the clock signal ``V(CLK)``.

        Raises
        ------
        ValueError
            if the testbench has no highlighted ``"control"`` terminals.
        """
        self.raw_file = Ltspice(self.raw_output_filename)
        self.raw_file.parse()
        clk = np.array(self.raw_file.get_data("V(CLK)"))
        if "control" not in self.testbench.highlighted_terminals:
            raise ValueError("No control signals highlighted in testbench")
        control_signal_terminals = [
            term.name
            for term in self.testbench.highlighted_terminals["control"][
                : self.testbench.Xaf.analog_frontend.analog_system.M
            ]
        ]
        # NOTE(review): the value is read from the "dc" parameter of the Vdd
        # source but named vcm; it serves as the clock threshold and, doubled,
        # as the normalisation of the control signals -- confirm intended.
        vcm = float(self.testbench.Vdd._parameters_dict["dc"])
        s_raw = [
            np.array(self.raw_file.get_data(f"V({control_signal_terminals[i]})"))
            / (vcm * 2.0)
            for i in range(self.control_vector.shape[1])
        ]

        trigger_enable = False
        index: int = 0
        float_to_ternary = np.vectorize(_float_to_ternary)
        for s in zip(clk, *s_raw):
            # s[0] is the clock sample, s[1:] the normalised control samples.
            if not trigger_enable and s[0] <= vcm:
                # First data point at or below the threshold since the clock
                # was last above it: store one row of the control vector.
                trigger_enable = True
                if index < self.control_vector.shape[0]:
                    self.control_vector[index, :] = float_to_ternary(s[1:])
                index += 1
            elif trigger_enable and s[0] > vcm:
                # Clock is above the threshold again: re-arm the trigger.
                trigger_enable = False
        self._control_iterator = iter(self.control_vector)
        self._parsed = True

    def save_control_vector(self, filename):
        """Save the control vector to a numpy file"""
        self._lazy_initialize()
        np.save(filename, self.control_vector)

    def make_netlist(self):
        """Make the netlist for the simulation

        Renders the ``ngspice/simulator.cir.j2`` template with the testbench
        netlist, the analysis settings, the state variable pairs, the signals
        to save and the simulator options, and writes the result to the
        netlist file.
        """
        netlist = _template_env.get_template("ngspice/simulator.cir.j2").render(
            {
                "testbench": self.testbench.get_ngspice(),
                "transient_analysis": self.transient_analysis,
                "ac_analysis": self.ac_analysis,
                "state_variables": [
                    (f"Xaf.{x[0]}", f"Xaf.{x[1]}")
                    for x in zip(self.testbench.Xaf.xp, self.testbench.Xaf.xn)
                ],
                "save_variables": self._save_variable_names,
                "options": self.options,
            }
        )
        with open(self.netlist_filename, "w") as f:
            f.write(netlist)
        self._made_netlist = True

    def __iter__(self):
        return self

    def _lazy_initialize(self):
        """Carry out the outstanding steps of netlist, simulation and parsing.

        Work resumes at the first step that has not been completed, and every
        later step is redone as well so that downstream results never refer
        to an outdated netlist or raw file. Steps that are already complete
        upstream are not repeated.
        """
        stale = not self._made_netlist
        if stale:
            # Make netlist
            self.make_netlist()
        stale = stale or not self._ran
        if stale:
            # simulate with ngspice
            self.run()
        if stale or not self._parsed:
            # parse simulation result
            self.parse()

    def __next__(self):
        self._lazy_initialize()
        return next(self._control_iterator)

    def cleanup(self):
        """Cleanup the simulation files

        Removes the netlist file and the raw output file. The stdout and
        stderr files are left in place.
        """
        os.remove(self.netlist_filename)
        os.remove(self.raw_output_filename)
class SpectreSimulator(NGSpiceSimulator):
    """Simulate a testbench using Spectre.

    Unlike the base class, the netlist is generated, the simulation is run
    and the result is parsed immediately when the object is constructed.
    The control vector is read from the CSV file named by
    ``testbench.control_signal_vector_filename``.

    Parameters
    ----------
    testbench : TestBench
        the testbench to simulate.
    stop : float
        value handed to the netlist template as ``stop``.
    strobefreq : float
        value handed to the netlist template as ``strobefreq``.
    strobedelay : float
        value handed to the netlist template as ``strobedelay``.
    skipdc : bool
        rendered as ``"yes"`` or ``"no"`` in the netlist template.
    cmin : float
        value handed to the netlist template as ``cmin``.
    netlist_filename : str
        file the rendered netlist is written to.
    raw_output_filename : str
        location passed to spectre via ``-raw``.
    """

    stop: float
    strobefreq: float
    strobedelay: float
    skipdc: bool
    cmin: float

    def __init__(
        self,
        testbench: TestBench,
        stop: float,
        strobefreq: float,
        strobedelay: float,
        skipdc: bool = False,
        cmin: float = 0.0,
        netlist_filename: str = "spectre_sim.cir",
        raw_output_filename: str = "spectre_sim.raw",
    ):
        self.stop = stop
        self.strobedelay = strobedelay
        self.strobefreq = strobefreq
        self.testbench = testbench
        self.skipdc = skipdc
        self.cmin = cmin
        self.netlist_filename = netlist_filename
        self.raw_output_filename = raw_output_filename
        self.simulation_time = 0.0

        # The base-class constructor is not invoked (its signature differs),
        # so the progress flags read by the inherited __next__ /
        # _lazy_initialize are set up here.
        self._made_netlist = False
        self._ran = False
        self._parsed = False

        # Make netlist
        self.make_netlist()
        # simulate with spectre
        self.run()
        # parse simulation result
        self.parse()

    def parse(self):
        """Load the control vector from the testbench's CSV file.

        Only the first M columns are kept, where M is the number of control
        signals of the analog system.
        """
        # The column count is taken from the analog system directly; no
        # control vector has been allocated before the first parse.
        number_of_controls = self.testbench.Xaf.analog_frontend.analog_system.M
        self.control_vector = read_csv(
            self.testbench.control_signal_vector_filename
        ).to_numpy()[:, :number_of_controls]
        self._control_iterator = iter(self.control_vector)
        self._parsed = True

    def make_netlist(self):
        """Render the Spectre netlist and write the Verilog-AMS library.

        The netlist is rendered from the ``spectre/simulator.cir.j2``
        template and written to the netlist file; the Verilog-AMS source
        returned by the testbench is written to
        ``testbench.verilog_ams_library_name``.
        """
        testbench_netlist, verilog_ams = self.testbench.get_spectre()
        netlist = _template_env.get_template("spectre/simulator.cir.j2").render(
            {
                "testbench": testbench_netlist,
                "cmin": self.cmin,
                "stop": self.stop,
                "strobedelay": self.strobedelay,
                "strobefreq": self.strobefreq,
                "skipdc": "yes" if self.skipdc else "no",
            }
        )
        with open(self.netlist_filename, "w") as f:
            f.write(netlist)
        with open(self.testbench.verilog_ams_library_name, "w") as f:
            f.write(verilog_ams)
        self._made_netlist = True

    def run(self):
        """Run the simulation with the ``spectre`` executable.

        Raises
        ------
        subprocess.CalledProcessError
            if spectre exits with a non-zero status.
        """
        cmd = [
            "spectre",
            self.netlist_filename,
            # Option and value are separate argv entries; a single
            # "-format psfascii" string would reach spectre as one argument.
            "-format",
            "psfascii",
            "-raw",
            self.raw_output_filename,
            "++aps",
            "-ahdllibdir",
            os.path.abspath(self.testbench.verilog_ams_library_name),
            "-log",
        ]
        completed_process = subprocess.run(cmd, check=True, capture_output=True)
        logger.info(completed_process.args)
        logger.debug(completed_process.stdout)
        if completed_process.stderr:
            logger.error(completed_process.stderr)
        self._ran = True

    def cleanup(self):
        """Cleanup the simulation files

        In addition to the files removed by the base class, the control
        signal CSV file and the Verilog-AMS library file are removed.
        """
        super().cleanup()
        os.remove(self.testbench.control_signal_vector_filename)
        os.remove(self.testbench.verilog_ams_library_name)
def _float_to_ternary(x: float) -> float:
if x > 2.0 / 3.0:
return 1.0
elif x < 1.0 / 3.0:
return 0.0
else:
return 0.5