Source code for andes.variables.dae

import logging
from collections import OrderedDict
from typing import List, Union

import numpy as np

from andes.core import JacTriplet
from andes.core.var import BaseVar, ExtVar
from andes.shared import jac_names, pd, spmatrix

logger = logging.getLogger(__name__)


# TODO: Separate array data and Triplets

[docs]class DAETimeSeries: """ DAE time series data. """
[docs] def __init__(self, dae=None): self.dae = dae # accessible attributes self._public = ['t', 'x', 'y', 'z', 'xy', 'txyz', 'df_x', 'df_y', 'df_z', 'df_xy', 'df_xyz'] # internal dict storage self._xs = OrderedDict() self._ys = OrderedDict() self._zs = OrderedDict() self._fs = OrderedDict() self._hs = OrderedDict() self._is = OrderedDict() self.idx_ptr = 0 # index pointer to the beginning of data that should be written
[docs] def unpack_np(self, attr, warn_empty=True): """ Unpack dict data into numpy arrays. """ n_steps = len(self._ys) if attr is None or 't' in attr: self.t = np.array(list(self._ys.keys())) def _dict2array(src, dest): """ Helper function to convert data stord in a dict to an array. """ if len(self.__dict__[src]): nx = len(self.__dict__[src][self.t[0]]) else: nx = 0 self.__dict__[dest] = np.zeros((n_steps, nx)) if len(self.__dict__[src]) > 0: for ii, val in enumerate(self.__dict__[src].values()): self.__dict__[dest][ii, :] = val pairs = (('_xs', 'x'), ('_ys', 'y'), ('_zs', 'z'), ('_fs', 'f'), ('_hs', 'h'), ('_is', 'i')) for a, b in pairs: if attr is None or b in attr: _dict2array(a, b) if attr is None or attr == 'xy': self.xy = np.hstack((self.x, self.y)) if attr is None or attr == 'txy': self.txy = np.hstack((self.t.reshape((-1, 1)), self.x, self.y)) if attr is None or attr == 'txyz': self.txyz = np.hstack((self.t.reshape((-1, 1)), self.x, self.y, self.z)) if n_steps == 0: if warn_empty: logger.warning("TimeSeries does not contain any time stamp.") return False return True
[docs] def unpack(self, df=False, attr=None, warn_empty=True): """ Unpack dict-stored data into arrays and/or dataframes. Parameters ---------- df : bool True to construct DataFrames `self.df` and `self.df_z` (time-consuming). attr : str, optional Attribute name to unpack. If None, unpack all. Returns ------- True when done. """ self.unpack_np(attr=attr, warn_empty=warn_empty) if df is True: self.unpack_df(attr=attr) return True
[docs] def unpack_df(self, attr): """ Construct pandas dataframes. """ uxname = self.dae.x_name_output uyname = self.dae.y_name_output uzname = self.dae.z_name if attr is None or 'x' in attr: self.df_x = pd.DataFrame.from_dict(self._xs, orient='index', columns=uxname) if attr is None or 'y' in attr: self.df_y = pd.DataFrame.from_dict(self._ys, orient='index', columns=uyname) if attr is None or 'z' in attr: self.df_z = pd.DataFrame.from_dict(self._zs, orient='index', columns=uzname) if attr is None or attr == 'df_xy': self.df_xy = pd.concat((self.df_x, self.df_y), axis=1) if attr is None or attr == 'df_xyz': self.df_xyz = pd.concat((self.df_x, self.df_y, self.df_z), axis=1) return True
[docs] def get_data(self, base_vars: Union[BaseVar, List[BaseVar]], *, a=None, rhs: bool = False,): """ Get time-series data, either for a variable or for the equation associated with the variable. Parameters ---------- base_var : BaseVar or a sequence of BaseVar(s) The variable types and internal addresses are used for looking up the data. a : an array/list of int or None Sub-indices into the address of `base_var`. Applied to each variable. Returns ------- np.ndarray A two-dimensional array. Each row corresponds to one time step. Each column corresponds to a different different variable. """ out = np.zeros((len(self.t), 0)) if isinstance(base_vars, BaseVar): base_vars = (base_vars, ) for base_var in base_vars: if base_var.n == 0: logger.info("Variable <%s.%s> does not contain any element.", base_var.owner.class_name, base_var.name) continue if (rhs is True) and (base_var.e_code == 'g') and \ not isinstance(base_var, ExtVar): logger.warning("RHS of the internal Algeb var <%s.%s> is always zero.", base_var.owner.class_name, base_var.name) continue if rhs is False: indices = base_var.a array_code = base_var.v_code if self.dae.system.Output.n > 0: indices = self.dae.system.Output.to_output_addr(base_var, check=True) if len(indices) == 0: continue else: if isinstance(base_var, ExtVar): # external algebraic variables indices = base_var.r array_code = base_var.r_code else: # internal differential variables indices = base_var.a array_code = base_var.e_code indices = indices[a] if a is not None else indices out = np.hstack((out, self._access_array(array_code, indices))) return out
def _access_array(self, array_name, indices=None): """ Helper function to access an existing array in TimeSeries. The function checks for empty arrays and shows warnings. """ if np.count_nonzero(self.__dict__[array_name]) == 0: logger.error("TimeSeries matrix <%s> contains no element. Check if `[TDS] store_%s = 1`", array_name, array_name) return None if indices is None: return self.__dict__[array_name][:, :] else: return self.__dict__[array_name][:, indices] @property def df(self): """ Short-hand for the xy dataframe. """ return self.df_xy def __getattr__(self, attr): if attr in super().__getattribute__('_public'): df = True if attr.startswith("df") else False self.unpack(df=df, attr=attr) return super().__getattribute__(attr) def __getstate__(self): return self.__dict__
[docs] def reset(self): """ Reset the internal storage and erase all data. """ self._xs = OrderedDict() self._ys = OrderedDict() self._zs = OrderedDict() self._fs = OrderedDict() self._hs = OrderedDict() self._is = OrderedDict() self.unpack_np(attr=None, warn_empty=False) self.unpack_df(attr=None) self.idx_ptr = 0 logger.debug("TimeSeries storage is cleared.")
[docs]class DAE: r""" Class for storing numerical values of the DAE system, including variables, equations and first order derivatives (Jacobian matrices). Variable values and equation values are stored as :py:class:`numpy.ndarray`, while Jacobians are stored as :py:class:`kvxopt.spmatrix`. The defined arrays and descriptions are as follows: +-----------+---------------------------------------------+ | DAE Array | Description | +===========+=============================================+ | x | Array for state variable values | +-----------+---------------------------------------------+ | y | Array for algebraic variable values | +-----------+---------------------------------------------+ | z | Array for 0/1 limiter states (if enabled) | +-----------+---------------------------------------------+ | f | Array for differential equation derivatives | +-----------+---------------------------------------------+ | Tf | Left-hand side time constant array for f | +-----------+---------------------------------------------+ | g | Array for algebraic equation mismatches | +-----------+---------------------------------------------+ The defined scalar member attributes to store array sizes are +-----------+---------------------------------------------+ | Scalar | Description | +===========+=============================================+ | m | The number of algebraic variables/equations | +-----------+---------------------------------------------+ | n | The number of state variables/equations | +-----------+---------------------------------------------+ | o | The number of limiter state flags | +-----------+---------------------------------------------+ The derivatives of `f` and `g` with respect to `x` and `y` are stored in four :py:mod:`kvxopt.spmatrix` sparse matrices: **fx**, **fy**, **gx**, and **gy**, where the first letter is the equation name, and the second letter is the variable name. Notes ----- DAE in ANDES is defined in the form of .. math :: T \dot{x} = f(x, y) \\ 0 = g(x, y) DAE does not keep track of the association of variable and address. Only a variable instance keeps track of its addresses. """
[docs] def __init__(self, system): self.system = system self.t = np.array(-1.0, dtype=float) self.ts = DAETimeSeries(self) self.kcount = 0 # time step count self._array_and_counter = { 'f': 'n', # differential equation RHS 'x': 'n', # differential variables 'g': 'm', # algebraic equation residual 'y': 'm', # algebraic variables 'z': 'o', # limiter flags 'h': 'p', # RHS of external states 'i': 'q', # RHS of external algebraic variables } self.m, self.n, self.o, self.p, self.q = 0, 0, 0, 0, 0 self.x, self.y, self.z = np.array([]), np.array([]), np.array([]) self.f, self.g = np.array([]), np.array([]) # RHS of equations self.h, self.i = np.array([]), np.array([]) # RHS of external equations # `self.Tf` is the time-constant array for differential equations self.Tf = np.array([]) self.fx, self.fy = None, None self.gx, self.gy = None, None self.rx, self.tx = None, None self.x_name, self.x_tex_name = [], [] self.y_name, self.y_tex_name = [], [] self.z_name, self.z_tex_name = [], [] self.h_name, self.h_tex_name = [], [] self.i_name, self.i_tex_name = [], [] self.triplets = JacTriplet() self.tpl = dict() # sparsity templates with constants self._write_append = False # True if data should be appended when writing to output self._lst_written = False
[docs] def request_address(self, array_name: str, ndevice, nvar, collate=False): """ Interface for requesting addresses for a model. Parameters ---------- array_name : str array name in 'x' and 'y' ndevice : int number of devices nvar : int number of variables collate : bool, optional False if the same variable for different devices are contiguous. True if variables for the same devices should collate. Note: setting ``collate`` to True will degrade the performance. Returns ------- list A list of arrays for each variable. """ out = [] counter_name = self._array_and_counter[array_name] idx_begin = self.__dict__[counter_name] idx_end = idx_begin + ndevice * nvar if not collate: for idx in range(nvar): out.append(np.arange(idx_begin + idx * ndevice, idx_begin + (idx + 1) * ndevice)) else: for idx in range(nvar): out.append(np.arange(idx_begin + idx, idx_end, nvar)) self.__dict__[counter_name] = idx_end return out
[docs] def clear_ts(self): """ Drop the TimeSeries data and create a new one. """ self.ts = DAETimeSeries(self)
[docs] def clear_arrays(self): """ Reset equation and variable arrays to empty. """ self.clear_fg() self.clear_xy() self.clear_z()
[docs] def clear_fg(self): """ Resets equation arrays to empty. """ self.f[:] = 0 self.g[:] = 0
[docs] def clear_xy(self): """ Reset variable arrays to empty. """ self.x[:] = 0 self.y[:] = 0
[docs] def clear_z(self): """ Reset status arrays to empty """ self.z[:] = 0
[docs] def clear_ijv(self): """ Clear stored triplets. """ self.triplets.clear_ijv()
[docs] def restore_sparse(self, names=None): """ Restore all sparse matrices to the sparsity pattern filled with zeros (for variable Jacobian elements) and non-zero constants. Parameters ---------- names : None or list List of Jacobian names to restore sparsity pattern """ if names is None: names = jac_names elif isinstance(names, str): names = [names] for name in names: self.__dict__[name] = spmatrix(self.tpl[name].V, self.tpl[name].I, self.tpl[name].J, self.tpl[name].size, 'd')
[docs] def reset(self): """ Reset array sizes to zero and clear all arrays. """ self.set_t(0.0) self.m = 0 self.n = 0 self.o = 0 self.resize_arrays() self.clear_ijv() self.clear_ts()
[docs] def set_t(self, t): """ Helper function for setting time in-place. """ self.t.itemset(t)
[docs] def get_size(self, name): """ Get the size of an array or sparse matrix based on name. Parameters ---------- name : str (f, g, fx, gy, etc.) array/sparse name Returns ------- tuple sizes of each element in a tuple """ ret = [] for char in name: if char in ('f', 'x'): ret.append(self.n) elif char in ('g', 'y'): ret.append(self.m) elif char == 'z': ret.append(self.o) return tuple(ret)
[docs] def store_sparse_ijv(self, name, row, col, val): """ Store the sparse pattern triplets. This function is to be called by System after building the complete sparsity pattern for each Jacobian matrix. Parameters ---------- name : str sparse Jacobian matrix name row : np.ndarray all row indices col : np.ndarray all col indices val : np.ndarray all values """ self.triplets.ijac[name] = row self.triplets.jjac[name] = col self.triplets.vjac[name] = val
[docs] def build_pattern(self, name): """ Build sparse matrices with stored patterns. Call to `store_row_col_idx` should be made before this function. Parameters ---------- name : name jac name """ try: self.tpl[name] = spmatrix(self.triplets.vjac[name], self.triplets.ijac[name], self.triplets.jjac[name], self.get_size(name), 'd') except TypeError as e: logger.error("Your new model might have accessed an Algeb using ExtState, or vice versa.") raise e self.restore_sparse(name)
def _compare_pattern(self, name): """ Compare the sparsity pattern for the given Jacobian name. This function is for debugging the symbolic factorization error / sparsity pattern change. To use, add the following line in `System.j_update` for each `j_name` at the end: self.dae._compare_pattern(j_name) """ self.__dict__[f'{name}_tpl'] = spmatrix(self.triplets.vjac[name], self.triplets.ijac[name], self.triplets.jjac[name], self.get_size(name), 'd') m_before = self.__dict__[f'{name}_tpl'] m_after = self.__dict__[name] for i in range(len(m_after)): if m_after.I[i] != m_before.I[i] or m_after.J[i] != m_before.J[i]: raise KeyError
[docs] def store(self): """ Store values for the current time step to the TimeSeries storage. Values include variables, equation RHS and discrete states. """ system = self.system tds = self.system.TDS ts = self.ts t = self.t.tolist() if system.Output.n > 0: # select variables based on `Output` ts._xs[t] = self.x[system.Output.xidx] ts._ys[t] = self.y[system.Output.yidx] else: ts._xs[t] = np.array(self.x) ts._ys[t] = np.array(self.y) if tds.config.store_z: z_vals = system.get_z(system.exist.pflow_tds) ts._zs[t] = np.array(z_vals) if tds.config.store_f: ts._fs[t] = np.array(self.f) if tds.config.store_h: ts._hs[t] = np.array(self.h) if tds.config.store_i: ts._is[t] = np.array(self.i)
[docs] def resize_arrays(self): """ Resize arrays to the new sizes `m` and `n`, and `o`. If ``m > len(self.y)`` or ``n > len(self.x``, arrays will be extended. Otherwise, new empty arrays will be sliced, starting from 0 to the given size. Warnings -------- This function should not be called directly. Instead, it is called in ``System.set_address`` which re-points variables used in power flow to the new array for dynamic analyses. """ self.x = self._extend_or_slice(self.x, self.n) self.y = self._extend_or_slice(self.y, self.m) self.z = self._extend_or_slice(self.z, self.o) self.f = self._extend_or_slice(self.f, self.n) self.g = self._extend_or_slice(self.g, self.m) self.h = self._extend_or_slice(self.h, self.p) self.i = self._extend_or_slice(self.i, self.q) self.Tf = self._extend_or_slice(self.Tf, self.n, fill_func=np.ones)
def _extend_or_slice(self, array, new_size, fill_func=np.zeros): """ Helper function for ``self.resize_arrays`` to grow or shrink arrays. """ if new_size > len(array): array = np.append(array, fill_func(new_size - len(array))) else: array = array[0:new_size] return array
[docs] def alloc_or_extend_names(self): """ Allocate empty lists for names for the given size. """ specs = {'x_name': self.n, 'y_name': self.m, 'h_name': self.p, 'i_name': self.q, 'x_tex_name': self.n, 'y_tex_name': self.m, 'h_tex_name': self.p, 'i_tex_name': self.q, } for name, size in specs.items(): length = len(self.__dict__[name]) if length == 0: self.__dict__[name] = [''] * size elif 0 < length <= size: self.__dict__[name].extend([''] * (size - length)) else: raise NotImplementedError("Does not know how to shrink arrays")
@property def xy(self): """ Return a concatenated array of [x, y]. """ return np.hstack((self.x, self.y)) @property def xyz(self): """ Return a concatenated array of [x, y]. """ return np.hstack((self.x, self.y, self.z)) @property def fg(self): """ Return a concatenated array of [f, g]. """ return np.hstack((self.f, self.g)) @property def x_name_output(self): """ Return a list of state var names selected by Output. """ if self.system.Output.n == 0: return self.x_name else: return [self.x_name[i] for i in self.system.Output.xidx] @property def y_name_output(self): """ Return a list of algeb var names selected by Output. """ if self.system.Output.n == 0: return self.y_name else: return [self.y_name[i] for i in self.system.Output.yidx] @property def x_tex_name_output(self): """ Return a list of state var LaTeX names selected by Output. """ if self.system.Output.n == 0: return self.x_tex_name else: return [self.x_tex_name[i] for i in self.system.Output.xidx] @property def y_tex_name_output(self): """ Return a list of algeb var LaTeX names selected by Output. """ if self.system.Output.n == 0: return self.y_tex_name else: return [self.y_tex_name[i] for i in self.system.Output.yidx] @property def xy_name(self): """ Return a concatenated list of all variable names without format. """ return self.x_name + self.y_name @property def xyz_name(self): """ Return a concatenated list of all variable names without format. """ return self.x_name + self.y_name + self.z_name @property def xy_tex_name(self): """ Return a concatenated list of all variable names in LaTeX format. """ return self.x_tex_name + self.y_tex_name @property def xyz_tex_name(self): """ Return a concatenated list of all variable names in LaTeX format. """ return self.x_tex_name + self.y_tex_name + self.z_tex_name
[docs] def get_name(self, arr): """ Helper function for geting the list of variable names based on the array name. Parameters ---------- arr : str Array name in 'f', 'g', 'x', 'y', 'z'. """ mapping = {'f': 'x', 'g': 'y', 'x': 'x', 'y': 'y', 'z': 'z'} return self.__dict__[mapping[arr] + '_name']
[docs] def print_array(self, name, values=None, tol=None): """ Debug helper to print array values and names. Parameters ---------- name : str array name in 'f', 'g', 'x', 'y' values : array-like, optional substitute array values to use tol : float, optional tolerance value to use. Values below `tol` will not be displayed """ if values is None: values = self.__dict__[name] indices = list(range(len(values))) if tol is not None: indices = np.where(abs(values) >= tol) values = values[indices] name_list = np.array(self.get_name(name))[indices] if not len(name_list): return logger.info(f"Debug Print at {self.t:.4f}") res = "\n".join("{:15s} {:<10.4g}".format(x, y) for x, y in zip(name_list, values)) logger.info(res)
[docs] def write_lst(self, lst_path): """ Dump the variable name lst file. Parameters ---------- lst_path Path to the lst file. Returns ------- bool succeed flag """ if self._lst_written is True: return system = self.system out = '' template = '{:>6g}, {:>25s}, {:>35s}\n' # header line out += template.format(0, 'Time [s]', 'Time [s]') if system.Output.n == 0: # output variable indices idx = list(range(self.m + self.n + self.o)) # variable names concatenated uname = self.xyz_name fname = self.xyz_tex_name else: idx = list(range(len(system.Output.xidx) + len(system.Output.yidx) + self.o)) uname = self.x_name_output + self.y_name_output + self.z_name fname = self.x_tex_name_output + self.y_tex_name_output + self.z_tex_name for e, i in enumerate(idx): # `idx` in the lst file is always consecutive out += template.format(e + 1, uname[i], fname[i]) with open(lst_path, 'w') as f: f.write(out) self._lst_written = True return True
[docs] def write_npy(self, file_path): """ Write TDS data into NumPy uncompressed format. """ txyz_data = self.ts.txyz np.save(file_path, txyz_data)
[docs] def write_npz(self, file_path): """ Write TDS data into NumPy compressed format. The function supports writing out all values at once or writing them out incrementally. """ tds = self.system.TDS ts = self.ts if not tds.config.limit_store: # write the whole TimeSeries in one step txyz_data = self.ts.txyz np.savez_compressed(file_path, data=txyz_data) else: # create a new npz file and write for the first time if self._write_append is False: txyz_data = self.ts.txyz[ts.idx_ptr:, :] np.savez_compressed(file_path, data=txyz_data) self._write_append = True ts.idx_ptr = len(self.ts.t) # write and append to an existing npz file else: self.ts.unpack() txyz_data = self.ts.txyz[ts.idx_ptr:, :] # skip if no new data if len(txyz_data) == 0: logger.debug("No new data to write to file. Skipped.") return data = np.load(file_path)['data'] # in most cases, append new data to the existing if len(data) > 0: data = np.vstack((data, txyz_data)) logger.debug("Appended new data to output file.") # in case the previous step stopped at tf=0 else: data = txyz_data np.savez_compressed(file_path, data=data) ts.idx_ptr = len(self.ts.t)