import logging
from collections import OrderedDict
from typing import List, Union
import numpy as np
from andes.core import JacTriplet
from andes.core.var import BaseVar
from andes.shared import jac_names, pd, spmatrix
logger = logging.getLogger(__name__)
# TODO: Separate array data and Triplets
class DAETimeSeries:
    """
    DAE time series data.

    Snapshots of variable and equation values are appended to internal
    ``OrderedDict`` storage keyed by simulation time, then lazily unpacked
    into numpy arrays (and, on demand, pandas DataFrames) when one of the
    public attributes in ``self._public`` is first accessed.
    """

    def __init__(self, dae=None):
        # parent DAE instance; provides variable names for DataFrame columns
        self.dae = dae

        # accessible attributes; touching any of these before unpacking
        # triggers `unpack()` through `__getattr__`.
        # NOTE: 'txy' was previously missing here, so lazy access to it
        # raised AttributeError even though `unpack_np` computes it.
        self._public = ['t', 'x', 'y', 'z', 'xy', 'txy', 'txyz',
                        'df_x', 'df_y', 'df_z', 'df_xy', 'df_xyz']

        # internal dict storage, keyed by simulation time
        self._xs = OrderedDict()
        self._ys = OrderedDict()
        self._zs = OrderedDict()
        self._fs = OrderedDict()
        self._hs = OrderedDict()
        self._is = OrderedDict()

    def unpack_np(self):
        """
        Unpack dict data into numpy arrays.

        Returns
        -------
        bool
            True when at least one time stamp has been stored,
            False otherwise (a warning is logged).
        """
        n_steps = len(self._ys)
        self.t = np.array(list(self._ys.keys()))

        def _dict2array(src, dest):
            # Column count comes from the first stored snapshot.
            # The previous implementation indexed the dict with key `0`,
            # which failed whenever the series did not start at t = 0.
            data = self.__dict__[src]
            nx = len(next(iter(data.values()))) if data else 0
            self.__dict__[dest] = np.zeros((n_steps, nx))
            for ii, val in enumerate(data.values()):
                self.__dict__[dest][ii, :] = val

        pairs = (('_xs', 'x'), ('_ys', 'y'), ('_zs', 'z'),
                 ('_fs', 'f'), ('_hs', 'h'), ('_is', 'i'))
        for a, b in pairs:
            _dict2array(a, b)

        # stacked convenience arrays; `t` becomes the first column of
        # `txy` and `txyz`
        self.xy = np.hstack((self.x, self.y))
        self.txy = np.hstack((self.t.reshape((-1, 1)), self.xy))
        self.txyz = np.hstack((self.t.reshape((-1, 1)), self.xy, self.z))

        if n_steps == 0:
            logger.warning("TimeSeries does not contain any time stamp.")
            return False
        return True

    def unpack(self, df=False):
        """
        Unpack dict-stored data into arrays and/or dataframes.

        Parameters
        ----------
        df : bool
            True to construct DataFrames `self.df` and `self.df_z` (time-consuming).

        Returns
        -------
        True when done.
        """
        self.unpack_np()
        if df is True:
            self.unpack_df()
        return True

    def unpack_df(self):
        """
        Construct pandas dataframes from the dict storage.

        Row index is the simulation time; columns are taken from the
        parent DAE's name lists.
        """
        self.df_x = pd.DataFrame.from_dict(self._xs, orient='index',
                                           columns=self.dae.x_name)
        self.df_y = pd.DataFrame.from_dict(self._ys, orient='index',
                                           columns=self.dae.y_name)
        self.df_z = pd.DataFrame.from_dict(self._zs, orient='index',
                                           columns=self.dae.z_name)

        self.df_xy = pd.concat((self.df_x, self.df_y), axis=1)
        self.df_xyz = pd.concat((self.df_xy, self.df_z), axis=1)
        return True

    def get_data(self, base_vars: Union["BaseVar", List["BaseVar"]], a=None):
        """
        Get time-series data for a variable instance.

        Values for different variables will be stacked horizontally.

        Parameters
        ----------
        base_vars : BaseVar or a sequence of BaseVar(s)
            The variable types and internal addresses
            are used for looking up the data.
        a : an array/list of int or None
            Sub-indices into the address of `base_var`.
            Applied to each variable.
        """
        out = np.zeros((len(self.t), 0))

        if isinstance(base_vars, BaseVar):
            base_vars = (base_vars, )

        for base_var in base_vars:
            if base_var.n == 0:
                logger.info("Variable <%s.%s> does not contain any element.",
                            base_var.owner.class_name, base_var.name)
                continue

            indices = base_var.a
            if a is not None:
                indices = indices[a]
            # `v_code` ('x', 'y', ...) selects the unpacked array by name
            out = np.hstack((out, self.__dict__[base_var.v_code][:, indices]))

        return out

    @property
    def df(self):
        # alias for the combined x-y DataFrame
        return self.df_xy

    def __getattr__(self, attr):
        # Lazily unpack on first access of a public attribute; DataFrame
        # attributes additionally trigger the (slower) DataFrame build.
        if attr in super().__getattribute__('_public'):
            df = True if attr.startswith("df") else False
            self.unpack(df=df)
        return super().__getattribute__(attr)

    def __getstate__(self):
        return self.__dict__
class DAE:
    r"""
    Class for storing numerical values of the DAE system, including variables, equations and first order
    derivatives (Jacobian matrices).

    Variable values and equation values are stored as :py:class:`numpy.ndarray`, while Jacobians are stored as
    :py:class:`kvxopt.spmatrix`. The defined arrays and descriptions are as follows:

    +-----------+---------------------------------------------+
    | DAE Array | Description                                 |
    +===========+=============================================+
    | x         | Array for state variable values             |
    +-----------+---------------------------------------------+
    | y         | Array for algebraic variable values         |
    +-----------+---------------------------------------------+
    | z         | Array for 0/1 limiter states (if enabled)   |
    +-----------+---------------------------------------------+
    | f         | Array for differential equation derivatives |
    +-----------+---------------------------------------------+
    | Tf        | Left-hand side time constant array for f    |
    +-----------+---------------------------------------------+
    | g         | Array for algebraic equation mismatches     |
    +-----------+---------------------------------------------+

    The defined scalar member attributes to store array sizes are

    +-----------+---------------------------------------------+
    | Scalar    | Description                                 |
    +===========+=============================================+
    | m         | The number of algebraic variables/equations |
    +-----------+---------------------------------------------+
    | n         | The number of state variables/equations     |
    +-----------+---------------------------------------------+
    | o         | The number of limiter state flags           |
    +-----------+---------------------------------------------+

    The derivatives of `f` and `g` with respect to `x` and `y` are stored in four :py:mod:`kvxopt.spmatrix`
    sparse matrices:
    **fx**, **fy**, **gx**, and **gy**,
    where the first letter is the equation name, and the second letter is the variable name.

    Notes
    -----
    DAE in ANDES is defined in the form of

    .. math ::
        T \dot{x} = f(x, y) \\
        0 = g(x, y)

    DAE does not keep track of the association of variable and address.
    Only a variable instance keeps track of its addresses.
    """

    def __init__(self, system):
        self.system = system
        # simulation time as a 0-d array so it can be updated in place
        self.t = np.array(0, dtype=float)
        self.ts = DAETimeSeries(self)

        # array sizes: m algebraic, n states, o limiter flags,
        # p/q external equation array sizes
        self.m, self.n, self.o = 0, 0, 0
        self.p, self.q = 0, 0

        self.x, self.y, self.z = np.array([]), np.array([]), np.array([])
        self.f, self.g = np.array([]), np.array([])  # RHS of equations
        self.h, self.i = np.array([]), np.array([])  # RHS of external equations

        # `self.Tf` is the time-constant array for differential equations
        self.Tf = np.array([])

        # sparse Jacobians (built later by `build_pattern`)
        self.fx, self.fy = None, None
        self.gx, self.gy = None, None
        self.rx, self.tx = None, None

        # plain and LaTeX-formatted variable names
        self.h_name, self.h_tex_name = [], []
        self.i_name, self.i_tex_name = [], []
        self.x_name, self.x_tex_name = [], []
        self.y_name, self.y_tex_name = [], []
        self.z_name, self.z_tex_name = [], []

        self.triplets = JacTriplet()

        self.tpl = dict()  # sparsity templates with constants

    def clear_ts(self):
        """
        Drop the stored time series and start a fresh one.
        """
        self.ts = DAETimeSeries(self)

    def clear_arrays(self):
        """
        Reset equation and variable arrays to zeros.
        """
        self.clear_fg()
        self.clear_xy()
        self.clear_z()

    def clear_fg(self):
        """
        Zero out the equation arrays `f` and `g` in place.
        """
        self.f[:] = 0
        self.g[:] = 0

    def clear_xy(self):
        """
        Zero out the variable arrays `x` and `y` in place.
        """
        self.x[:] = 0
        self.y[:] = 0

    def clear_z(self):
        """
        Zero out the limiter status array `z` in place.
        """
        self.z[:] = 0

    def clear_ijv(self):
        """
        Clear stored triplets.
        """
        self.triplets.clear_ijv()

    def restore_sparse(self, names=None):
        """
        Restore all sparse matrices to the sparsity pattern
        filled with zeros (for variable Jacobian elements)
        and non-zero constants.

        Parameters
        ----------
        names : None or list
            List of Jacobian names to restore sparsity pattern
        """
        if names is None:
            names = jac_names
        elif isinstance(names, str):
            names = [names]

        for name in names:
            self.__dict__[name] = spmatrix(self.tpl[name].V,
                                           self.tpl[name].I,
                                           self.tpl[name].J,
                                           self.tpl[name].size, 'd')

    def reset(self):
        """
        Reset array sizes to zero and clear all arrays.
        """
        self.set_t(0.0)
        self.m = 0
        self.n = 0
        self.o = 0
        self.resize_arrays()
        self.clear_ijv()
        self.clear_ts()

    def set_t(self, t):
        """Helper function for setting time in-place."""
        # `ndarray.itemset` was removed in NumPy 2.0; ellipsis assignment
        # performs the same in-place scalar update on the 0-d array.
        self.t[...] = t

    def get_size(self, name):
        """
        Get the size of an array or sparse matrix based on name.

        Parameters
        ----------
        name : str (f, g, fx, gy, etc.)
            array/sparse name

        Returns
        -------
        tuple
            sizes of each element in a tuple
        """
        ret = []
        for char in name:
            # f/x/t dimensions follow the state count `n`;
            # g/y/r follow the algebraic count `m`; z follows `o`
            if char in ('f', 'x', 't'):
                ret.append(self.n)
            elif char in ('g', 'y', 'r'):
                ret.append(self.m)
            elif char == 'z':
                ret.append(self.o)
        return tuple(ret)

    def store_sparse_ijv(self, name, row, col, val):
        """
        Store the sparse pattern triplets.

        This function is to be called by System after building the complete sparsity pattern for each Jacobian
        matrix.

        Parameters
        ----------
        name : str
            sparse Jacobian matrix name
        row : np.ndarray
            all row indices
        col : np.ndarray
            all col indices
        val : np.ndarray
            all values
        """
        self.triplets.ijac[name] = row
        self.triplets.jjac[name] = col
        self.triplets.vjac[name] = val

    def build_pattern(self, name):
        """
        Build sparse matrices with stored patterns.

        Call to `store_sparse_ijv` should be made before this function.

        Parameters
        ----------
        name : name
            jac name
        """
        try:
            self.tpl[name] = spmatrix(self.triplets.vjac[name],
                                      self.triplets.ijac[name],
                                      self.triplets.jjac[name],
                                      self.get_size(name), 'd')
        except TypeError as e:
            logger.error("Your new model might have accessed an Algeb using ExtState, or vice versa.")
            raise e

        self.restore_sparse(name)

    def _compare_pattern(self, name):
        """
        Compare the sparsity pattern for the given Jacobian name.

        This function is for debugging the symbolic factorization error / sparsity pattern change.
        To use, add the following line in `System.j_update` for each `j_name` at the end:

            self.dae._compare_pattern(j_name)
        """
        self.__dict__[f'{name}_tpl'] = spmatrix(self.triplets.vjac[name],
                                                self.triplets.ijac[name],
                                                self.triplets.jjac[name],
                                                self.get_size(name), 'd')
        m_before = self.__dict__[f'{name}_tpl']
        m_after = self.__dict__[name]

        for i in range(len(m_after)):
            if m_after.I[i] != m_before.I[i] or m_after.J[i] != m_before.J[i]:
                # KeyError type preserved for existing callers; message added
                # so the failing position is visible when debugging
                raise KeyError(f"Sparsity pattern of <{name}> changed at nnz position {i}")

    def store(self):
        """
        Store values and equations to the internal TimeSeries storage.

        Storage of `z`, `f`, `h` and `i` is gated by the corresponding
        `TDS.config.store_*` flags.
        """
        ts = self.ts
        t = self.t.tolist()
        tds = self.system.TDS

        z_vals = self.system.get_z(self.system.exist.pflow_tds) if tds.config.store_z else None
        f_vals = self.f if tds.config.store_f else None
        h_vals = self.h if tds.config.store_h else None
        i_vals = self.i if tds.config.store_i else None

        # copies are stored so later in-place updates do not mutate history
        ts._xs[t] = np.array(self.x)
        ts._ys[t] = np.array(self.y)

        if z_vals is not None:
            ts._zs[t] = np.array(z_vals)
        if f_vals is not None:
            ts._fs[t] = np.array(f_vals)
        if h_vals is not None:
            ts._hs[t] = np.array(h_vals)
        if i_vals is not None:
            ts._is[t] = np.array(i_vals)

    def resize_arrays(self):
        """
        Resize arrays to the new sizes `m` and `n`, and `o`.

        If ``m > len(self.y)`` or ``n > len(self.x)``, arrays will be extended.
        Otherwise, new empty arrays will be sliced, starting from 0 to the given size.

        Warnings
        --------
        This function should not be called directly. Instead, it is called in
        ``System.set_address`` which re-points variables used in power flow
        to the new array for dynamic analyses.
        """
        self.x = self._extend_or_slice(self.x, self.n)
        self.y = self._extend_or_slice(self.y, self.m)
        self.z = self._extend_or_slice(self.z, self.o)

        self.f = self._extend_or_slice(self.f, self.n)
        self.g = self._extend_or_slice(self.g, self.m)
        self.h = self._extend_or_slice(self.h, self.p)
        self.i = self._extend_or_slice(self.i, self.q)

        # time constants default to 1 so new states integrate unscaled
        self.Tf = self._extend_or_slice(self.Tf, self.n, fill_func=np.ones)

    def _extend_or_slice(self, array, new_size, fill_func=np.zeros):
        """
        Helper function for ``self.resize_arrays`` to grow or shrink arrays.
        """
        if new_size > len(array):
            array = np.append(array, fill_func(new_size - len(array)))
        else:
            array = array[0:new_size]
        return array

    def alloc_or_extend_names(self):
        """
        Allocate empty lists for names for the given size.

        Raises
        ------
        NotImplementedError
            If an existing name list is longer than the requested size.
        """
        specs = {'x_name': self.n,
                 'y_name': self.m,
                 'h_name': self.p,
                 'i_name': self.q,
                 'x_tex_name': self.n,
                 'y_tex_name': self.m,
                 'h_tex_name': self.p,
                 'i_tex_name': self.q,
                 }

        for name, size in specs.items():
            length = len(self.__dict__[name])

            if length == 0:
                self.__dict__[name] = [''] * size
            elif 0 < length <= size:
                self.__dict__[name].extend([''] * (size - length))
            else:
                raise NotImplementedError("Does not know how to shrink arrays")

    @property
    def xy(self):
        """Return a concatenated array of [x, y]."""
        return np.hstack((self.x, self.y))

    @property
    def xyz(self):
        """Return a concatenated array of [x, y, z]."""
        return np.hstack((self.x, self.y, self.z))

    @property
    def fg(self):
        """Return a concatenated array of [f, g]."""
        return np.hstack((self.f, self.g))

    @property
    def xy_name(self):
        """Return a concatenated list of all variable names without format."""
        return self.x_name + self.y_name

    @property
    def xyz_name(self):
        """Return a concatenated list of all variable names without format."""
        return self.x_name + self.y_name + self.z_name

    @property
    def xy_tex_name(self):
        """Return a concatenated list of all variable names in LaTeX format."""
        return self.x_tex_name + self.y_tex_name

    @property
    def xyz_tex_name(self):
        """Return a concatenated list of all variable names in LaTeX format."""
        return self.x_tex_name + self.y_tex_name + self.z_tex_name

    def get_name(self, arr):
        """
        Return the name list for an array or equation name
        ('f' maps to state names, 'g' to algebraic names).
        """
        mapping = {'f': 'x', 'g': 'y', 'x': 'x', 'y': 'y', 'z': 'z'}
        return self.__dict__[mapping[arr] + '_name']

    def print_array(self, name, values=None, tol=None):
        """
        Debug helper to log named array values, optionally filtered
        to entries with ``abs(value) >= tol``.
        """
        if values is None:
            values = self.__dict__[name]

        indices = list(range(len(values)))
        if tol is not None:
            indices = np.where(abs(values) >= tol)
            values = values[indices]

        name_list = np.array(self.get_name(name))[indices]

        if not len(name_list):
            return
        logger.info(f"Debug Print at {self.t:.4f}")
        res = "\n".join("{:15s} {:<10.4g}".format(x, y) for x, y in zip(name_list, values))
        logger.info(res)

    def write_lst(self, lst_path):
        """
        Dump the variable name lst file.

        Parameters
        ----------
        lst_path
            Path to the lst file.

        Returns
        -------
        bool
            succeed flag
        """
        out = ''
        template = '{:>6g}, {:>25s}, {:>35s}\n'

        # header line
        out += template.format(0, 'Time [s]', 'Time [s]')

        # output variable indices
        idx = list(range(self.m + self.n + self.o))

        # variable names concatenated
        uname = self.xyz_name
        fname = self.xyz_tex_name

        for e, i in enumerate(idx):
            # `idx` in the lst file is always consecutive
            out += template.format(e + 1, uname[i], fname[i])

        with open(lst_path, 'w') as f:
            f.write(out)

        return True

    def write_npy(self, file_path):
        """
        Write TDS data into NumPy uncompressed format.
        """
        txyz_data = self.ts.txyz
        np.save(file_path, txyz_data)

    def write_npz(self, file_path):
        """
        Write TDS data into NumPy compressed format.
        """
        txyz_data = self.ts.txyz
        np.savez_compressed(file_path, data=txyz_data)