# [ANDES] (C)2015-2022 Hantao Cui
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# File name: discrete.py
import logging
from typing import List, Tuple, Union
import numpy as np
from andes.core.common import dummify
from andes.utils.func import interp_n2
from andes.utils.tab import Tab
logger = logging.getLogger(__name__)
[docs]class Discrete:
"""
Base discrete class.
Discrete classes export flag arrays (usually boolean) .
"""
[docs] def __init__(self, name=None, tex_name=None, info=None, no_warn=False,
min_iter=2, err_tol=1e-2,
):
self.name = name
self.tex_name = tex_name
self.info = info
self.owner = None
if not hasattr(self, 'export_flags'):
self.export_flags = []
if not hasattr(self, 'export_flags_tex'):
self.export_flags_tex = []
self.input_list = [] # references to input variables
self.param_list = [] # references to parameters
self.x_set = list()
self.y_set = list() # NOT being used
self.warn_flags = [] # warn if flags in `warn_flags` not initialized to zero
self.no_warn = no_warn
# default minimum iteration number and error tolerance to allow checking
# To enable `min_iter` and `err_tol`, a `Discrete` subclass needs to call
# `check_iter_err()` manually in `check_var()` and/or `check_eq()`.
self.min_iter = min_iter
self.err_tol = err_tol
self.has_check_var = False # if subclass implements `check_var()`
self.has_check_eq = False # if subclass implements `check_eq()`
[docs] def check_var(self, *args, **kwargs):
"""
This function is called in ``l_update_var`` before evaluating equations.
It should update internal flags only.
Parameters
----------
adjust_upper : bool
True to adjust the upper limit to the value of the variable.
Supported by limiters.
adjust_lower : bool
True to adjust the lower limit to the value of the variable.
Supported by limiters.
"""
pass
[docs] def check_eq(self, **kwargs):
"""
This function is called in ``l_check_eq`` after updating equations.
It updates internal flags, set differential equations, and record pegged variables.
"""
pass
[docs] def get_names(self):
"""
Available symbols from this class
Returns
-------
"""
return [f'{self.name}_{flag}' for flag in self.export_flags]
[docs] def get_tex_names(self):
"""
Return tex_names of exported flags.
TODO: Fix the bug described in the warning below.
Warnings
--------
If underscore `_` appears in both flag tex_name and `self.tex_name` (for example, when this discrete is
within a block), the exported tex_name will become invalid for SymPy.
Variable name substitution will fail.
Returns
-------
list
A list of tex_names for all exported flags.
"""
return [rf'{flag_tex}^{self.tex_name}' for flag_tex in self.export_flags_tex]
[docs] def get_values(self):
return [self.__dict__[flag] for flag in self.export_flags]
@property
def class_name(self):
return self.__class__.__name__
[docs] def list2array(self, n):
"""
Allocate memory for the discrete flags specified in `self.export_flags`.
Parameters
----------
n : int
Number of elements in the array. Provided by the calling function.
"""
for flag in self.export_flags:
self.__dict__[flag] = self.__dict__[flag] * np.ones(n, dtype=float)
[docs] def warn_init_limit(self):
"""
Warn if associated variables are initialized at limits.
"""
if self.no_warn:
return
for f, limit in self.warn_flags:
if f not in self.export_flags:
logger.error('warn_flags contain unknown flag %s', f)
continue
mask = np.ones(self.owner.n, dtype=bool)
if limit == 'upper':
mask = self.mask_upper
elif limit == 'lower':
mask = self.mask_lower
else:
logger.debug('Unknown limit name <%s>', limit)
# process online devices only
flag_vals = np.logical_and(self.__dict__[f], self.owner.u.v)
# ignore limits that has been adjusted
flag_vals = np.logical_and(flag_vals, np.logical_not(mask))
pos = np.argwhere(np.not_equal(flag_vals, 0)).ravel()
if len(pos) == 0:
continue
# convert limie values to arrays
if isinstance(self.__dict__[limit].v, np.ndarray):
lim_value = self.__dict__[limit].v
else:
lim_value = self.__dict__[limit].v * np.ones(self.owner.n)
at_limit_pos = list()
out_limit_pos = list()
for item in pos:
if np.isclose(lim_value[item], self.u.v[item]):
at_limit_pos.append(item)
else:
out_limit_pos.append(item)
if len(out_limit_pos) > 0:
# warn out of limits
err_msg = f'{self.owner.class_name}.{self.name} out of limits <{self.__dict__[limit].name}>'
err_data = {'idx': [self.owner.idx.v[i] for i in out_limit_pos],
'Flag': [f] * len(out_limit_pos),
'Input Value': self.u.v[out_limit_pos],
'Limit': lim_value[out_limit_pos]
}
tab = Tab(title=err_msg,
header=err_data.keys(),
data=list(map(list, zip(*err_data.values()))))
logger.warning(tab.draw())
if len(at_limit_pos) > 0:
# warn at limits
err_msg = f'{self.owner.class_name}.{self.name} at limits <{self.__dict__[limit].name}>'
err_data = {'idx': [self.owner.idx.v[i] for i in at_limit_pos],
'Flag': [f] * len(at_limit_pos),
'Input Value': self.u.v[at_limit_pos],
'Limit': lim_value[at_limit_pos]
}
tab = Tab(title=err_msg,
header=err_data.keys(),
data=list(map(list, zip(*err_data.values()))))
logger.debug(tab.draw())
[docs] def check_iter_err(self, niter=None, err=None):
"""
Check if the minimum iteration or maximum error is reached
so that this discrete block should be enabled.
Only when both `niter` and `err` are given, (niter < min_iter)
, and (err > err_tol) it will return False.
This logic will start checking the discrete states if called
from an external solver that does not feed `niter` or `err`
at each step.
Returns
-------
bool
True if it should be enabled, False otherwise
"""
if (niter is not None) and (niter < self.min_iter) and \
(err is not None) and (err > self.err_tol):
return False
return True
def __repr__(self):
return f'{self.__class__.__name__}: {self.owner.__class__.__name__}.{self.name}'
[docs]class LessThan(Discrete):
"""
Less than (<) comparison function that tests if ``u < bound``.
Exports two flags: z1 and z0.
For elements satisfying the less-than condition, the corresponding z1 = 1.
z0 is the element-wise negation of z1.
Notes
-----
The default z0 and z1, if not enabled, can be set through the constructor.
By default, the model will not adjust the limit.
"""
def __init__(self, u, bound, equal=False, enable=True, name=None, tex_name=None,
info: str = None, cache: bool = False,
z0=0, z1=1):
super().__init__(name=name, tex_name=tex_name, info=info)
self.u = u
self.bound = dummify(bound)
self.equal: bool = equal
self.enable: bool = enable
self.cache: bool = cache
self._eval: bool = False # if has been eval'ed and cached
self.z0 = np.array([z0]) # negation of `self.z1`
self.z1 = np.array([z1]) # if the less-than condition (u < bound) is True
self.export_flags = ['z0', 'z1']
self.export_flags_tex = ['z_0', 'z_1']
self.input_list.extend([self.u])
self.param_list.extend([self.bound])
self.has_check_var = True
def check_var(self, *args, **kwargs):
"""
If enabled, set flags based on inputs. Use cached values if enabled.
"""
if not self.enable:
return
if self.cache and self._eval:
return
if not self.equal:
self.z1[:] = np.less(self.u.v, self.bound.v)
else:
self.z1[:] = np.less_equal(self.u.v, self.bound.v)
self.z0[:] = np.logical_not(self.z1)
self._eval = True
class IsEqual(Discrete):
"""
Is equal (==) comparison function to test if ``u == bound``.
Exports one flag: z1.
For elements satisfying the equality condition, the corresponding z1 = 1.
Notes
-----
The default z1 when not enabled can be set through the constructor.
By default, the model will not adjust the limit.
"""
def __init__(self, u, bound, enable=True, name=None, tex_name=None,
info: str = None, cache: bool = False, z1=1):
super().__init__(name=name, tex_name=tex_name, info=info)
self.u = u
self.bound = dummify(bound)
self.enable: bool = enable
self.cache: bool = cache
self._eval: bool = False
self.z1 = np.array([z1])
self.export_flags = ['z1']
self.export_flags_tex = ['z_1']
self.input_list.extend([self.u])
self.param_list.extend([self.bound])
self.has_check_var = True
def check_var(self, *args, **kwargs):
"""
If enabled, set flags based on inputs. Use cached values if enabled.
"""
if not self.enable:
return
if self.cache and self._eval:
return
self.z1[:] = np.equal(self.u.v, self.bound.v)
self._eval = True
[docs]class Limiter(Discrete):
"""
Base limiter class.
This class compares values and sets limit values. Exported flags are `zi`, `zl` and `zu`.
Notes
-----
If not enabled, the default flags are ``zu = zl = 0``, ``zi = 1``.
Parameters
----------
u : BaseVar
Input Variable instance
lower : BaseParam
Parameter instance for the lower limit
upper : BaseParam
Parameter instance for the upper limit
no_lower : bool
True to only use the upper limit
no_upper : bool
True to only use the lower limit
sign_lower: 1 or -1
Sign to be multiplied to the lower limit
sign_upper: bool
Sign to be multiplied to the upper limit
equal : bool
True to include equal signs in comparison (>= or <=).
no_warn : bool
Disable initial limit warnings
zu : 0 or 1
Default value for `zu` if not enabled
zl : 0 or 1
Default value for `zl` if not enabled
zi : 0 or 1
Default value for `zi` if not enabled
Attributes
----------
zl : array-like
Flags of elements violating the lower limit;
A array of zeros and/or ones.
zi : array-like
Flags for within the limits
zu : array-like
Flags for violating the upper limit
"""
def __init__(self, u, lower, upper, enable=True,
name: str = None, tex_name: str = None, info: str = None,
min_iter: int = 2, err_tol: float = 0.01,
allow_adjust: bool = True,
no_lower=False, no_upper=False, sign_lower=1, sign_upper=1,
equal=True, no_warn=False,
zu=0.0, zl=0.0, zi=1.0):
Discrete.__init__(self, name=name, tex_name=tex_name, info=info,
min_iter=min_iter, err_tol=err_tol)
self.u = u
self.lower = dummify(lower)
self.upper = dummify(upper)
self.enable = enable
self.no_lower = no_lower
self.no_upper = no_upper
self.allow_adjust = allow_adjust
if sign_lower not in (1, -1):
raise ValueError("sign_lower must be 1 or -1, got %s" % sign_lower)
if sign_upper not in (1, -1):
raise ValueError("sign_upper must be 1 or -1, got %s" % sign_upper)
self.sign_lower = dummify(sign_lower)
self.sign_upper = dummify(sign_upper)
self.equal = equal
self.no_warn = no_warn
self.zu = np.array([zu])
self.zl = np.array([zl])
self.zi = np.array([zi])
self.mask_upper = None
self.mask_lower = None
self.has_check_var = True
self.export_flags.append('zi')
self.export_flags_tex.append('z_i')
self.input_list.extend([self.u])
self.param_list.extend([self.lower, self.upper])
if not self.no_lower:
self.export_flags.append('zl')
self.export_flags_tex.append('z_l')
self.warn_flags.append(('zl', 'lower'))
if not self.no_upper:
self.export_flags.append('zu')
self.export_flags_tex.append('z_u')
self.warn_flags.append(('zu', 'upper'))
def check_var(self,
allow_adjust=True, # allow flag from model
adjust_lower=False,
adjust_upper=False,
is_init: bool = False,
*args, **kwargs):
"""
Check the input variable and set flags.
"""
if not self.enable:
return
if not self.no_upper:
upper_v = -self.upper.v if self.sign_upper.v == -1 else self.upper.v
# FIXME: adjust will not be successful when sign is -1
if self.allow_adjust and is_init:
self.do_adjust_upper(self.u.v, upper_v, allow_adjust, adjust_upper)
if self.equal:
self.zu[:] = np.greater_equal(self.u.v, upper_v)
else:
self.zu[:] = np.greater(self.u.v, upper_v)
if not self.no_lower:
lower_v = -self.lower.v if self.sign_lower.v == -1 else self.lower.v
# FIXME: adjust will not be successful when sign is -1
if self.allow_adjust and is_init:
self.do_adjust_lower(self.u.v, lower_v, allow_adjust, adjust_lower)
if self.equal:
self.zl[:] = np.less_equal(self.u.v, lower_v)
else:
self.zl[:] = np.less(self.u.v, lower_v)
self.zi[:] = np.logical_not(np.logical_or(self.zu, self.zl))
def do_adjust_lower(self, val, lower, allow_adjust=True, adjust_lower=False):
"""
Adjust the lower limit.
Notes
-----
This method is only executed if `allow_adjust` is True
and `adjust_lower` is True.
"""
if allow_adjust:
mask = (val < lower)
if sum(mask) == 0:
return
if adjust_lower:
self._show_adjust(val, lower, mask, self.lower.name, adjusted=True)
lower[mask] = val[mask]
self.mask_lower = mask # store after adjusting
else:
self._show_adjust(val, lower, mask, self.lower.name, adjusted=False)
def _show_adjust(self, val, old_limit, mask, limit_name, adjusted=True):
"""
Helper function to show a table of the adjusted limits.
"""
idxes = np.array(self.owner.idx.v)[mask]
if isinstance(val, (int, float)):
val = np.array([val])
if isinstance(old_limit, (int, float)):
old_limit = np.array([old_limit])
adjust_or_not = 'adjusted' if adjusted else '*not adjusted*'
tab = Tab(title=f"{self.owner.class_name}.{self.name}: {adjust_or_not} limit <{limit_name}>",
header=['Idx', 'Input', 'Old Limit'],
data=[*zip(idxes, val[mask], old_limit[mask])],
)
if adjusted:
logger.info(tab.draw())
else:
logger.warning(tab.draw())
def do_adjust_upper(self, val, upper, allow_adjust=True, adjust_upper=False):
"""
Adjust the upper limit.
Notes
-----
This method is only executed if `allow_adjust` is True
and `adjust_upper` is True.
"""
if allow_adjust:
mask = (val > upper)
if sum(mask) == 0:
return
if adjust_upper:
self._show_adjust(val, upper, mask, self.upper.name, adjusted=True)
upper[mask] = val[mask]
self.mask_upper = mask
else:
self._show_adjust(val, upper, mask, self.lower.name, adjusted=False)
[docs]class SortedLimiter(Limiter):
"""
A limiter that sorts inputs based on the absolute or
relative amount of limit violations.
Parameters
----------
n_select : int
the number of violations to be flagged,
for each of over-limit and under-limit cases.
If `n_select` == 1, at most one over-limit
and one under-limit inputs will be flagged.
If `n_select` is zero, heuristics will be used.
abs_violation : bool
True to use the absolute violation.
False if the relative violation
abs(violation/limit) is used for sorting.
Since most variables are in per unit,
absolute violation is recommended.
"""
def __init__(self, u, lower, upper, n_select: int = 5,
name=None, tex_name=None, enable=True, abs_violation=True,
min_iter: int = 2, err_tol: float = 0.01,
allow_adjust: bool = True,
zu=0.0, zl=0.0, zi=1.0, ql=0.0, qu=0.0,
):
super().__init__(u, lower, upper,
enable=enable, name=name, tex_name=tex_name,
min_iter=min_iter, err_tol=err_tol,
allow_adjust=allow_adjust,
zu=zu, zl=zl, zi=zi,
)
self.n_select = int(n_select)
self.auto = True if self.n_select == 0 else False
self.abs_violation = abs_violation
self.ql = np.array([ql])
self.qu = np.array([qu])
# count of ones in `ql` and `qu`
self.nql = 0
self.nqu = 0
# smallest and largest `n_select`
self.min_sel = 2
self.max_sel = 50
# store the lower and upper limit values with zeros converted to a small number
self.lower_denom = None
self.upper_denom = None
self.export_flags.extend(['ql', 'qu'])
self.export_flags_tex.extend(['q_l', 'q_u'])
def list2array(self, n):
"""
Initialize maximum and minimum `n_select` based on input size.
"""
super().list2array(n)
if self.auto:
self.min_sel = max(2, int(n / 10))
self.max_sel = max(2, int(n / 2))
def check_var(self, *args, niter=None, err=None, **kwargs):
"""
Check for the largest and smallest `n_select` elements.
"""
if not self.enable:
return
if not self.check_iter_err(niter=niter, err=err):
return
super().check_var()
# first run - calculate the denominators if using relative violation
if not self.abs_violation:
if self.lower_denom is None:
self.lower_denom = np.array(self.lower.v)
self.lower_denom[self.lower_denom == 0] = 1e-6
if self.upper_denom is None:
self.upper_denom = np.array(self.upper.v)
self.upper_denom[self.upper_denom == 0] = 1e-6
# calculate violations - abs or relative
if self.abs_violation:
lower_vio = self.u.v - self.lower.v
upper_vio = self.upper.v - self.u.v
else:
lower_vio = np.abs((self.u.v - self.lower.v) / self.lower_denom)
upper_vio = np.abs((self.upper.v - self.u.v) / self.upper_denom)
# count the number of inputs flagged
if self.auto:
self.calc_select()
# sort in both ascending and descending orders
asc = np.argsort(lower_vio)
desc = np.argsort(upper_vio)
top_n = asc[:self.n_select]
bottom_n = desc[:self.n_select]
# `reset_out` is used to flag the
reset_out = np.zeros_like(self.u.v)
reset_out[top_n] = 1
reset_out[bottom_n] = 1
# set new flags
self.zl[:] = np.logical_or(np.logical_and(reset_out, self.zl),
self.ql)
self.zu[:] = np.logical_or(np.logical_and(reset_out, self.zu),
self.qu)
self.zi[:] = 1 - np.logical_or(self.zl, self.zu)
self.ql[:] = self.zl
self.qu[:] = self.zu
# compute the number of updated flags
ql1 = np.count_nonzero(self.ql)
qu1 = np.count_nonzero(self.qu)
dqu = qu1 - self.nqu
dql = ql1 - self.nql
if dqu > 0 or dql > 0:
logger.debug("SortedLimiter: flagged %s upper and %s lower limit violations",
dqu, dql)
self.nqu = qu1
self.nql = ql1
def calc_select(self):
"""
Set `n_select` automatically.
"""
ret = int((np.count_nonzero(self.zl) + np.count_nonzero(self.zu)) / 2) + 1
if ret > self.max_sel:
ret = self.max_sel
elif ret < self.min_sel:
ret = self.min_sel
self.n_select = ret
[docs]class HardLimiter(Limiter):
"""
Hard limiter for algebraic or differential variable. This class is an alias of `Limiter`.
"""
pass
[docs]class AntiWindup(Limiter):
"""
Anti-windup limiter.
Anti-windup limiter prevents the wind-up effect of a differential variable.
The derivative of the differential variable is reset if it continues to increase in the same direction
after exceeding the limits.
During the derivative return, the limiter will be inactive ::
if x > xmax and x dot > 0: x = xmax and x dot = 0
if x < xmin and x dot < 0: x = xmin and x dot = 0
This class takes one more optional parameter for specifying the equation.
Parameters
----------
state : State, ExtState
A State (or ExtState) whose equation value will be checked and, when condition satisfies, will be reset
by the anti-windup-limiter.
"""
def __init__(self, u, lower, upper, enable=True, no_warn=False,
no_lower=False, no_upper=False, sign_lower=1, sign_upper=1,
name=None, tex_name=None, info=None, state=None,
allow_adjust: bool = True,):
super().__init__(u, lower, upper, enable=enable, no_warn=no_warn,
no_lower=no_lower, no_upper=no_upper,
sign_lower=sign_lower, sign_upper=sign_upper,
name=name, tex_name=tex_name, info=info,
allow_adjust=allow_adjust,)
self.state = state if state else u
self.has_check_var = False
self.has_check_eq = True
self.no_warn = no_warn
def check_var(self, *args, **kwargs):
"""
This function is empty. Defers `check_var` to `check_eq`.
"""
pass
def check_eq(self,
allow_adjust=True,
adjust_lower=False,
adjust_upper=False,
is_init: bool = False,
**kwargs):
"""
Check the variables and equations and set the limiter flags.
Reset differential equation values based on limiter flags.
Notes
-----
The current implementation reallocates memory for `self.x_set` in each call.
Consider improving for speed. (TODO)
"""
if not self.no_upper:
upper_v = -self.upper.v if self.sign_upper.v == -1 else self.upper.v
if self.allow_adjust and is_init:
self.do_adjust_upper(self.u.v, upper_v,
allow_adjust=allow_adjust,
adjust_upper=adjust_upper)
self.zu[:] = np.logical_and(np.greater_equal(self.u.v, upper_v),
np.greater_equal(self.state.e, 0))
if not self.no_lower:
lower_v = -self.lower.v if self.sign_lower.v == -1 else self.lower.v
if self.allow_adjust and is_init:
self.do_adjust_lower(self.u.v, lower_v,
allow_adjust=allow_adjust,
adjust_lower=adjust_lower)
self.zl[:] = np.logical_and(np.less_equal(self.u.v, lower_v),
np.less_equal(self.state.e, 0))
self.zi[:] = np.logical_not(np.logical_or(self.zu, self.zl))
# must flush the `x_set` list at the beginning
self.x_set = list()
if not np.all(self.zi):
idx = np.where(self.zi == 0)
self.state.e[:] = self.state.e * self.zi
self.state.v[:] = self.state.v * self.zi
if not self.no_upper:
self.state.v[:] += upper_v * self.zu
if not self.no_lower:
self.state.v[:] += lower_v * self.zl
self.x_set.append((self.state.a[idx], self.state.v[idx], 0)) # (address, var. values, eqn. values)
# logger.debug(f'AntiWindup for states {self.state.a[idx]}')
# Very important note:
# The set equation values and variable values are collected by `System.fg_to_dae`:
# - Equation values is collected by `System._e_to_dae`,
# - Variable values are collected at the end of `System.fg_to_dae`.
# Also, equation values are processed in `TDS` for resetting the `q`.
[docs]class RateLimiter(Discrete):
"""
Rate limiter for a differential variable.
RateLimiter does not export any variable. It directly modifies the differential equation value.
Notes
-----
RateLimiter inherits from Discrete to avoid internal naming conflicts with `Limiter`.
Warnings
--------
RateLimiter cannot be applied to a state variable that already undergoes an AntiWindup limiter.
Use `AntiWindupRate` for a rate-limited anti-windup limiter.
"""
def __init__(self, u, lower, upper, enable=True,
no_lower=False, no_upper=False, lower_cond=1, upper_cond=1,
name=None, tex_name=None, info=None):
Discrete.__init__(self, name=name, tex_name=tex_name, info=info)
self.u = u
self.rate_lower = dummify(lower)
self.rate_upper = dummify(upper)
# `lower_cond` and `upper_cond` are arrays of 0/1 indicating whether
# the corresponding rate limit should be *enabled*.
# 0 - disabled, 1 - enabled.
# If is `None`, all rate limiters will be enabled.
self.rate_lower_cond = dummify(lower_cond)
self.rate_upper_cond = dummify(upper_cond)
self.mask_lower = None
self.mask_upper = None
self.rate_no_lower = no_lower
self.rate_no_upper = no_upper
self.enable = enable
self.zur = np.array([0])
self.zlr = np.array([0])
self.has_check_eq = True
# Note: save ops by not calculating `zir`
# self.zir = np.array([1])
# self.export_flags = ['zir']
# self.export_flags_tex = ['z_{ir}']
if not self.rate_no_lower:
self.export_flags.append('zlr')
self.export_flags_tex.append('z_{lr}')
self.warn_flags.append(('zlr', 'lower'))
if not self.rate_no_upper:
self.export_flags.append('zur')
self.export_flags_tex.append('z_{ur}')
self.warn_flags.append(('zur', 'upper'))
self.param_list.extend([self.rate_lower, self.rate_upper,
self.rate_lower_cond, self.rate_upper_cond])
def check_eq(self, **kwargs):
if not self.enable:
return
if not self.rate_no_lower:
self.zlr[:] = np.less(self.u.e, self.rate_lower.v) # 1 if at the lower rate limit
if self.rate_lower_cond is not None:
self.zlr[:] = self.zlr * self.rate_lower_cond.v # 1 if both at the lower rate limit and enabled
# for where `zlr == 1`, set the equation value to the lower limit
self.u.e[np.where(self.zlr)] = self.rate_lower.v[np.where(self.zlr)]
if not self.rate_no_upper:
self.zur[:] = np.greater(self.u.e, self.rate_upper.v)
if self.rate_upper_cond is not None:
self.zur[:] = self.zur * self.rate_upper_cond.v
self.u.e[np.where(self.zur)] = self.rate_upper.v[np.where(self.zur)]
[docs]class AntiWindupRate(AntiWindup, RateLimiter):
"""
Anti-windup limiter with rate limits
"""
def __init__(self, u, lower, upper, rate_lower, rate_upper,
no_lower=False, no_upper=False, rate_no_lower=False, rate_no_upper=False,
rate_lower_cond=None, rate_upper_cond=None,
enable=True, name=None, tex_name=None, info=None,
allow_adjust: bool = True,):
RateLimiter.__init__(self, u, lower=rate_lower, upper=rate_upper, enable=enable,
no_lower=rate_no_lower, no_upper=rate_no_upper,
lower_cond=rate_lower_cond, upper_cond=rate_upper_cond,
)
AntiWindup.__init__(self, u, lower=lower, upper=upper, enable=enable,
no_lower=no_lower, no_upper=no_upper,
name=name, tex_name=tex_name, info=info,
allow_adjust=allow_adjust,
)
def check_eq(self, **kwargs):
RateLimiter.check_eq(self, **kwargs)
AntiWindup.check_eq(self, **kwargs)
[docs]class Selector(Discrete):
"""
Selection between two variables using the provided reduce function.
The reduce function should take the given number of arguments. An example function is `np.maximum.reduce`
which can be used to select the maximum.
Names are in `s0`, `s1`.
Warnings
--------
A potential bug when more than two inputs are provided, and values in different inputs are equal.
Only two inputs are allowed.
.. deprecated:: 1.5.9
Use of this class for comparison-based output is discouraged.
Instead, use `LessThan` and `Limiter` to construct piesewise equations.
See the new implementation of ``HVGate`` and ``LVGate``.
Examples
--------
Example 1: select the largest value between `v0` and `v1` and put it into vmax.
After the definitions of `v0` and `v1`, define the algebraic variable `vmax` for the largest value,
and a selector `vs` ::
self.vmax = Algeb(v_str='maximum(v0, v1)',
tex_name='v_{max}',
e_str='vs_s0 * v0 + vs_s1 * v1 - vmax')
self.vs = Selector(self.v0, self.v1, fun=np.maximum.reduce)
The initial value of `vmax` is calculated by ``maximum(v0, v1)``, which is the element-wise maximum in SymPy
and will be generated into ``np.maximum(v0, v1)``. The equation of `vmax` is to select the values based on
`vs_s0` and `vs_s1`.
Notes
-----
A common pitfall is the 0-based indexing in the Selector flags. Note that exported flags start from 0. Namely,
`s0` corresponds to the first variable provided for the Selector constructor.
See Also
--------
numpy.ufunc.reduce : NumPy reduce function
"""
def __init__(self, *args, fun, tex_name=None, info=None):
super().__init__(tex_name=tex_name, info=info)
# TODO: only allow two inputs
self.input_vars = args
self.fun = fun
self.n = len(args)
self._inputs = None
self._outputs = None
# TODO: allow custom initial value
for i in range(len(self.input_vars)):
self.__dict__[f's{i}'] = np.array([0])
self.export_flags = [f's{i}' for i in range(len(self.input_vars))]
self.export_flags_tex = [f's_{i}' for i in range(len(self.input_vars))]
self.input_list = args
self.has_check_var = True
def check_var(self, *args, **kwargs):
"""
Set the i-th variable's flags to 1 if the return of the reduce function equals the i-th input.
"""
if self._inputs is None:
# input is only evaluated at the first time due to memory stability
self._inputs = [self.input_vars[i].v for i in range(self.n)]
if self._outputs is None:
self._outputs = self.fun(self._inputs)
else:
self._outputs[:] = self.fun(self._inputs)
for i in range(self.n):
self.__dict__[f's{i}'][:] = np.equal(self._inputs[i], self._outputs)
[docs]class Switcher(Discrete):
"""
Switcher based on an input parameter.
The switch class takes one v-provider, compares the input with each value in the option list, and exports
one flag array for each option. The flags are 0-indexed.
Exported flags are named with `_s0`, `_s1`, ..., with a total number of `len(options)`. See the examples
section.
Notes
-----
Switches needs to be distinguished from Selector.
Switcher is for generating flags indicating option selection based on an input parameter. Selector is
for generating flags at run time based on variable values and a selection function.
Examples
--------
The IEEEST model takes an input for selecting the signal. Options are 1 through 6.
One can construct ::
self.IC = NumParam(info='input code 1-6') # input code
self.SW = Switcher(u=self.IC, options=[0, 1, 2, 3, 4, 5, 6])
If the IC values from the data file ends up being ::
self.IC.v = np.array([1, 2, 2, 4, 6])
Then, the exported flag arrays will be ::
{'IC_s0': np.array([0, 0, 0, 0, 0]),
'IC_s1': np.array([1, 0, 0, 0, 0]),
'IC_s2': np.array([0, 1, 1, 0, 0]),
'IC_s3': np.array([0, 0, 0, 0, 0]),
'IC_s4': np.array([0, 0, 0, 1, 0]),
'IC_s5': np.array([0, 0, 0, 0, 0]),
'IC_s6': np.array([0, 0, 0, 0, 1])
}
where `IC_s0` is used for padding so that following flags align with the options.
"""
def __init__(self, u, options: Union[list, Tuple], info: str = None,
name: str = None, tex_name: str = None, cache=True,):
super().__init__(name=name, tex_name=tex_name, info=info,)
self.u = u
self.options: Union[List, Tuple] = options
self.cache: bool = cache
self._eval: bool = False # if the flags has been evaluated
for i in range(len(options)):
self.__dict__[f's{i}'] = 0
self.export_flags = [f's{i}' for i in range(len(options))]
self.export_flags_tex = [f's_{i}' for i in range(len(options))]
self.input_list.extend([self.u])
self.has_check_var = True
def check_var(self, *args, **kwargs):
"""
Set the switcher flags based on inputs. Uses cached flags if cache is set to True.
"""
if self.cache and self._eval:
return
for v in self.u.v:
if v not in self.options and not np.isnan(v):
raise ValueError(f'option {v} is invalid for {self.owner.class_name}.{self.u.name}. '
f'Options are {self.options}.')
if len(self.u.v) > 0:
for i in range(len(self.options)):
self.__dict__[f's{i}'][:] = np.equal(self.u.v, self.options[i])
self._eval = True
def list2array(self, n):
"""
This forces to evaluate Switcher upon System setup
"""
super().list2array(n)
self.check_var()
[docs]class DeadBand(Limiter):
r"""
The basic deadband type.
Parameters
----------
u : NumParam
The pre-deadband input variable
center : NumParam
Neutral value of the output
lower : NumParam
Lower bound
upper : NumParam
Upper bound
enable : bool
Enabled if True; Disabled and works as a pass-through if False.
Notes
-----
Input changes within a deadband will incur no output changes. This component computes and exports three flags.
Three flags computed from the current input:
- zl: True if the input is below the lower threshold
- zi: True if the input is within the deadband
- zu: True if is above the lower threshold
Initial condition:
All three flags are initialized to zero. All flags are updated during `check_var` when enabled. If the
deadband component is not enabled, all of them will remain zero.
Examples
--------
Exported deadband flags need to be used in the algebraic equation corresponding to the post-deadband variable.
Assume the pre-deadband input variable is `var_in` and the post-deadband variable is `var_out`. First, define a
deadband instance `db` in the model using ::
self.db = DeadBand(u=self.var_in, center=self.dbc,
lower=self.dbl, upper=self.dbu)
To implement a no-memory deadband whose output returns to center when the input is within the band,
the equation for `var` can be written as ::
var_out.e_str = 'var_in * (1 - db_zi) + \
(dbc * db_zi) - var_out'
"""
def __init__(self, u, center, lower, upper,
enable=True, equal=False,
zu=0.0, zl=0.0, zi=0.0,
name=None, tex_name=None, info=None,
):
Limiter.__init__(self, u, lower, upper,
enable=enable, equal=equal, zi=zi, zl=zl, zu=zu,
name=name, tex_name=tex_name, info=info,
allow_adjust=False,)
self.center = dummify(center) # CURRENTLY NOT IN USE
self.param_list.extend([self.center])
def check_var(self, *args, **kwargs):
"""
Notes
-----
Updates three flags: zi, zu, zl based on the following rules:
zu:
1 if u > upper; 0 otherwise.
zl:
1 if u < lower; 0 otherwise.
zi:
not(zu or zl);
"""
Limiter.check_var(self, *args, **kwargs)
[docs]class DeadBandRT(DeadBand):
r"""
Deadband with flags for directions of return.
Parameters
----------
u : NumParam
The pre-deadband input variable
center : NumParam
Neutral value of the output
lower : NumParam
Lower bound
upper : NumParam
Upper bound
enable : bool
Enabled if True; Disabled and works as a pass-through if False.
Notes
-----
Input changes within a deadband will incur no output changes. This component computes and exports five flags.
The additional two flags on top of `DeadBand` indicate the direction of return:
- zur: True if the input is/has been within the deadband and was returned from the upper threshold
- zlr: True if the input is/has been within the deadband and was returned from the lower threshold
Initial condition:
All five flags are initialized to zero. All flags are updated during `check_var` when enabled. If the
deadband component is not enabled, all of them will remain zero.
Examples
--------
To implement a deadband whose output is pegged at the nearest deadband bounds, the equation for `var` can be
provided as ::
var_out.e_str = 'var_in * (1 - db_zi) + \
dbl * db_zlr + \
dbu * db_zur - var_out'
"""
def __init__(self, u, center, lower, upper, enable=True):
"""
"""
DeadBand.__init__(self, u, center=center, lower=lower, upper=upper, enable=enable)
# default state if not enabled
self.zur = np.array([0.])
self.zlr = np.array([0.])
self.export_flags.extend(['zur', 'zlr'])
self.export_flags_tex.extend(['z_ur', 'z_lr'])
self.has_check_var = True
def check_var(self, *args, **kwargs):
"""
Notes
-----
Updates five flags: zi, zu, zl; zur, and zlr based on the following rules:
zu:
1 if u > upper; 0 otherwise.
zl:
1 if u < lower; 0 otherwise.
zi:
not(zu or zl);
zur:
- set to 1 when (previous zu + present zi == 2)
- hold when (previous zi == zi)
- clear otherwise
zlr:
- set to 1 when (previous zl + present zi == 2)
- hold when (previous zi == zi)
- clear otherwise
"""
DeadBand.check_var(self, *args, **kwargs)
if not self.enable:
return
# square return dead band
self.zur[:] = np.equal(self.zu + self.zi, 2) + self.zur * np.equal(self.zi, self.zi)
self.zlr[:] = np.equal(self.zl + self.zi, 2) + self.zlr * np.equal(self.zi, self.zi)
[docs]class Delay(Discrete):
"""
Delay class to memorize past variable values.
Delay allows to impose a predefined "delay" (in either steps or seconds)
for an input variable. The amount of delay is a scalar and has to be fixed
at model definition in the current implementation.
"""
def __init__(self, u, mode='step', delay=0,
name=None, tex_name=None, info=None):
Discrete.__init__(self, name=name, tex_name=tex_name, info=info)
if mode not in ('step', 'time'):
raise ValueError(f'mode {mode} is invalid. Must be in "step" or "time"')
self.u = u
self.mode = mode
self.delay = delay
self.export_flags = ['v']
self.export_flags_tex = ['v']
self.input_list.extend([u])
self.has_check_var = True
self.t = np.array([0])
self.v = np.array([0])
self._v_mem = np.zeros((0, 1))
self.rewind = False
def list2array(self, n):
"""
Allocate memory for storage arrays.
"""
super().list2array(n)
if self.mode == 'step':
self._v_mem = np.zeros((n, self.delay + 1))
self.t = np.zeros(self.delay + 1)
else:
self._v_mem = np.zeros((n, 1))
def check_var(self, dae_t, *args, **kwargs):
# Storage:
# Output values is in the first col.
# Latest values are stored in /appended to the last column
self.rewind = False
if dae_t == 0:
self._v_mem[:] = self.u.v[:, None]
elif dae_t < self.t[-1]:
self.rewind = True
self.t[-1] = dae_t
self._v_mem[:, -1] = self.u.v
elif dae_t == self.t[-1]:
self._v_mem[:, -1] = self.u.v
elif dae_t > self.t[-1]:
if self.mode == 'step':
self.t[:-1] = self.t[1:]
self.t[-1] = dae_t
self._v_mem[:, :-1] = self._v_mem[:, 1:]
self._v_mem[:, -1] = self.u.v
else:
self.t = np.append(self.t, dae_t)
self._v_mem = np.hstack((self._v_mem, self.u.v[:, None]))
if dae_t - self.t[0] > self.delay:
t_interp = dae_t - self.delay
idx = np.argmax(self.t >= t_interp) - 1
v_interp = interp_n2(t_interp,
self.t[idx:idx+2],
self._v_mem[:, idx:idx + 2])
self.t[idx] = t_interp
self._v_mem[:, idx] = v_interp
self.t = np.delete(self.t, np.arange(0, idx))
self._v_mem = np.delete(self._v_mem, np.arange(0, idx), axis=1)
self.v[:] = self._v_mem[:, 0]
def __repr__(self):
out = ''
out += f'v:\n {self.v}\n'
out += f't:\n {self.t}\n'
out += f'_v_men: \n {self._v_mem}\n'
return out
[docs]class Average(Delay):
"""
Compute the average of a BaseVar over a period of time or a number of samples.
"""
def check_var(self, dae_t, *args, **kwargs):
Delay.check_var(self, dae_t, *args, **kwargs)
if dae_t == 0:
self.v[:] = self._v_mem[:, -1]
self._v_mem[:, :-1] = 0
return
else:
nt = len(self.t)
self.v[:] = 0.5 * np.sum((self._v_mem[:, 1-nt:] + self._v_mem[:, -nt:-1]) *
(self.t[1:] - self.t[:-1]), axis=1) / (self.t[-1] - self.t[0])
[docs]class Derivative(Delay):
"""
Compute the derivative of an algebraic variable using numerical differentiation.
"""
def __init__(self, u, name=None, tex_name=None, info=None):
Delay.__init__(self, u=u, mode='step', delay=1,
name=name, tex_name=tex_name, info=info)
def check_var(self, dae_t, *args, **kwargs):
Delay.check_var(self, dae_t, *args, **kwargs)
# Note:
# Very small derivatives (< 1e-8) could cause numerical problems (chattering).
# Need to reset the output to zero following a rewind.
if (dae_t == 0) or (self.rewind is True):
self.v[:] = 0
else:
self.v[:] = (self._v_mem[:, 1] - self._v_mem[:, 0]) / (self.t[1] - self.t[0])
self.v[np.where(self.v < 1e-8)] = 0
[docs]class Sampling(Discrete):
"""
Sample an input variable repeatedly at a given time interval.
"""
def __init__(self, u, interval=1.0, offset=0.0, name=None, tex_name=None, info=None):
Discrete.__init__(self, name=name, tex_name=tex_name, info=info)
self.u = u
self.interval = interval
self.offset = offset
self.export_flags = ['v']
self.export_flags_tex = ['v']
self.input_list.extend([self.u])
self.has_check_var = True
self.v = np.array([0])
self._last_t = np.array([0])
self._last_v = np.array([0])
self.indices = np.array([0])
self.rewind = False
def list2array(self, n):
super().list2array(n)
self._last_v = np.zeros(n)
def check_var(self, dae_t, *args, **kwargs):
"""
Check and update the output.
Notes
-----
Present output stored in `v`. Output of the last step is stored in `_last_v`.
Time for the last output is stored in `_last_t`.
Initially, store `v` and `_last_v`.
If time progresses and `dae_t` is a multiple of `period`, update `_last_v` and then `v`.
Record `_last_t`.
If time does not progress, update `v`.
If time rewinds, restore `_last_v` to `v`.
"""
self.rewind = False
if dae_t == 0: # initial step
self._last_v[:] = self.u.v[:]
self.v[:] = self.u.v[:]
elif dae_t > self._last_t:
do_sample = (dae_t - self.offset - self._last_t) > self.interval
if do_sample:
self._last_v[:] = self.v
self.v[:] = self.u.v
self._last_t[0] = dae_t
elif dae_t == self._last_t:
if len(self.indices) > 0:
self.v[:] = self.u.v
else:
# if dae_t < self._last_t
self.rewind = True
if self._last_t[0] > dae_t:
self.v[:] = self._last_v
self._last_t[0] = dae_t
[docs]class ShuntAdjust(Discrete):
"""
Class for adjusting switchable shunts.
Parameters
----------
v : BaseVar
Voltage measurement
lower : BaseParam
Lower voltage bound
upper : BaseParam
Upper voltage bound
bsw : SwBlock
SwBlock instance for susceptance
gsw : SwBlock
SwBlock instance for conductance
dt : NumParam
Delay time
u : NumParam
Connection status
min_iter : int
Minimum iteration number to enable shunt switching
err_tol : float
Minimum iteration tolerance to enable switching
"""
def __init__(self, *, v, lower, upper, bsw, gsw, dt, u, enable=True,
min_iter=2, err_tol=1e-2,
name=None, tex_name=None, info=None, no_warn=False):
Discrete.__init__(self, name=name, tex_name=tex_name, info=info,
no_warn=no_warn)
self.v = v
self.lower = lower
self.upper = upper
self.bsw = bsw
self.gsw = gsw
self.dt = dt
self.u = u
self.enable = enable
self.min_iter = min_iter
self.err_tol = err_tol
self.has_check_var = True
self.input_list.extend([self.v])
self.param_list.extend([self.lower, self.upper, self.u])
self.t_last = None
self.t_enable = None
self.direction = None
def check_var(self, dae_t, *args, niter=None, err=None, **kwargs):
"""
Check voltage and perform shunt switching.
Parameters
----------
niter : int or None
Current iteration step
"""
if not self.enable:
return
if self.t_last is None:
self.t_last = np.zeros_like(self.v.v)
self.t_enable = np.ones_like(self.v.v, dtype=int)
self.direction = np.zeros_like(self.v.v, dtype=int)
if not self.check_iter_err(niter=niter, err=err):
return
# determine the shunt +/- direction
self.direction[:] = 0
self.direction[np.logical_and(self.v.v < self.lower.v,
self.bsw.sel < self.bsw.maxsel)] = 1
self.direction[np.logical_and(self.v.v > self.upper.v,
self.bsw.sel > 0)] = -1
self.direction *= self.u.v.astype(int) # consider online statuses
if not np.any(self.direction):
return
# allow unlimited switching in power flow
if dae_t == 0.0:
self.t_enable[:] = 1.0
# consider delay for time-domain simulation
else:
self.t_enable[:] = (dae_t - self.t_last - self.dt.v) >= 0
self.direction[:] *= self.t_enable
if not np.any(self.direction):
return
logger.debug("--- Shunt Switch at t=%.6g, niter=%d ---", dae_t, niter)
logger.debug("Delay enable flags=%s, adjusted levels=%s.",
self.t_enable, self.direction)
logger.debug("Bus voltage=%s", self.v.v)
logger.debug("Before: b=%s, g=%s", self.bsw.v, self.gsw.v)
self.bsw.adjust(self.direction)
self.gsw.adjust(self.direction)
self.t_last[self.direction != 0] = dae_t
logger.debug("After: b=%s, g=%s", self.bsw.v, self.gsw.v)