import logging
import inspect
from collections import OrderedDict
import numpy as np
from andes.core.service import BackRef
from andes.utils.func import list_flatten
logger = logging.getLogger(__name__)
[docs]class GroupBase:
"""
Base class for groups.
"""
[docs] def __init__(self):
self.common_params = ['u', 'name']
self.common_vars = []
self.models = OrderedDict() # model name: model instance
self._idx2model = OrderedDict() # element idx: model instance
self.uid = {} # idx - group internal 0-indexed uid
self.services_ref = OrderedDict() # BackRef
def __setattr__(self, key, value):
if hasattr(value, 'owner'):
if value.owner is None:
value.owner = self
if hasattr(value, 'name'):
if value.name is None:
value.name = key
if isinstance(value, BackRef):
self.services_ref[key] = value
super().__setattr__(key, value)
@property
def class_name(self):
return self.__class__.__name__
@property
def n(self):
"""
Total number of devices.
"""
return len(self._idx2model)
[docs] def add_model(self, name: str, instance):
"""
Add a Model instance to group.
Parameters
----------
name : str
Model name
instance : Model
Model instance
Returns
-------
None
"""
if name not in self.models:
self.models[name] = instance
else:
raise KeyError(f"{self.class_name}: Duplicate model registration of {name}")
[docs] def add(self, idx, model):
"""
Register an idx from model_name to the group
Parameters
----------
idx: Union[str, float, int]
Register an element to a model
model: Model
instance of the model
Returns
-------
"""
if idx in self._idx2model:
raise KeyError(f'Group <{self.class_name}> already contains <{repr(idx)}> from '
f'<{self._idx2model[idx].class_name}>')
self.uid[idx] = self.n
self._idx2model[idx] = model
[docs] def idx2model(self, idx, allow_none=False):
"""
Find model name for the given idx.
Parameters
----------
idx : float, int, str, array-like
idx or idx-es of devices.
allow_none : bool
If True, return `None` at the positions where idx is not found.
Returns
-------
If `idx` is a list, return a list of model instances.
If `idx` is a single element, return a model instance.
"""
ret = []
idx, single = self._1d_vectorize(idx)
for i in idx:
try:
if i is None and allow_none:
ret.append(None)
else:
ret.append(self._idx2model[i])
except KeyError:
raise KeyError(f'Group <{self.class_name}> does not contain device with idx={i}')
if single:
ret = ret[0]
return ret
[docs] def idx2uid(self, idx):
"""
Convert idx to the 0-indexed unique index.
Parameters
----------
idx : array-like, numbers, or str
idx of devices
Returns
-------
list
A list containing the unique indices of the devices
"""
vec_idx, single = self._1d_vectorize(idx)
out = [self.uid[i] if i is not None else None for i in vec_idx]
if single:
out = out[0]
return out
[docs] def get(self, src: str, idx, attr: str = 'v', allow_none=False, default=0.0):
"""
Based on the indexer, get the `attr` field of the `src` parameter or variable.
Parameters
----------
src : str
param or var name
idx : array-like
device idx
attr
The attribute of the param or var to retrieve
allow_none : bool
True to allow None values in the indexer
default : float
If `allow_none` is true, the default value to use for None indexer.
Returns
-------
The requested param or variable attribute. If `idx` is a list, return a list of values.
If `idx` is a single element, return a single value.
"""
self._check_src(src)
self._check_idx(idx)
idx, single = self._1d_vectorize(idx)
n = len(idx)
if n == 0:
return np.zeros(0)
ret = [''] * n
_type_set = False
models = self.idx2model(idx, allow_none=allow_none)
for i, idx in enumerate(idx):
if models[i] is not None:
uid = models[i].idx2uid(idx)
instance = models[i].__dict__[src]
val = instance.__dict__[attr][uid]
else:
val = default
# deduce the type for ret
if not _type_set:
if isinstance(val, str):
ret = [''] * n
else:
ret = np.zeros(n)
_type_set = True
ret[i] = val
if single:
ret = ret[0]
return ret
[docs] def set(self, src: str, idx, attr, value):
"""
Set the value of an attribute of a group property.
Performs ``self.<src>.<attr>[idx] = value``.
The user needs to ensure that the property is shared by all models
in this group.
Parameters
----------
src : str
Name of property.
idx : str, int, float, array-like
Indices of devices.
attr : str, optional, default='v'
The internal attribute of the property to get.
``v`` for values, ``a`` for address, and ``e`` for equation value.
value : array-like
New values to be set
Returns
-------
bool
True when successful.
"""
self._check_src(src)
self._check_idx(idx)
idx, _ = self._1d_vectorize(idx)
models = self.idx2model(idx)
if isinstance(value, (str, int, float, np.integer, np.floating)):
value = [value] * len(idx)
for mdl, ii, val in zip(models, idx, value):
uid = mdl.idx2uid(ii)
mdl.__dict__[src].__dict__[attr][uid] = val
return True
[docs] def find_idx(self, keys, values, allow_none=False, default=None):
"""
Find indices of devices that satisfy the given `key=value` condition.
This method iterates over all models in this group.
"""
indices_found = []
# `indices_found` contains found indices returned from all models of this group
for model in self.models.values():
indices_found.append(model.find_idx(keys, values, allow_none=True, default=default))
out = []
for idx, idx_found in enumerate(zip(*indices_found)):
if not allow_none:
if idx_found.count(None) == len(idx_found):
missing_values = [item[idx] for item in values]
raise IndexError(f'{list(keys)} = {missing_values} not found in {self.class_name}')
real_idx = default
for item in idx_found:
if item is not None:
real_idx = item
break
out.append(real_idx)
return out
def _check_src(self, src: str):
"""
Helper function for checking if ``src`` is a shared field.
The requirement is not strictly enforced and is only for debugging purposed.
"""
if src not in self.common_vars + self.common_params:
logger.debug(f'Group <{self.class_name}> does not share property <{src}>.')
def _check_idx(self, idx):
"""
Helper function for checking if ``idx`` is None.
Raises IndexError if idx is None.
"""
if idx is None:
raise IndexError(f'{self.class_name}: idx cannot be None')
def _1d_vectorize(self, idx):
"""
Helper function to convert a single element, list, or nested lists
into a list.
If the input is a nested list, flatten it into a 1-dimensional
list.
Returns
-------
idx : list
List of indices.
single : bool
True if the input is a single element.
"""
single = False
list_alike = (list, tuple, np.ndarray)
if not isinstance(idx, list_alike):
idx = [idx]
single = True
elif len(idx) > 0 and isinstance(idx[0], list_alike):
idx = list_flatten(idx)
return idx, single
[docs] def get_field(self, src: str, idx, field: str):
"""
Helper function for retrieving an attribute of a member variable shared
by models in this group.
Returns
-------
list
A list with the length equal to ``len(idx)``.
"""
self._check_src(src)
self._check_idx(idx)
idx, _ = self._1d_vectorize(idx)
models = self.idx2model(idx, allow_none=True)
ret = [None] * len(models)
for ii, model in enumerate(models):
if model is not None:
ret[ii] = getattr(model.__dict__[src], field)
return ret
[docs] def set_backref(self, name, from_idx, to_idx):
"""
Set idxes to ``BackRef``, and set them to models.
"""
uid = self.idx2uid(to_idx)
self.services_ref[name].v[uid].append(from_idx)
model = self.idx2model(to_idx)
model.set_backref(name, from_idx, to_idx)
[docs] def get_next_idx(self, idx=None, model_name=None):
"""
Get a no-conflict idx for a new device.
Use the provided ``idx`` if no conflict.
Generate a new one otherwise.
Parameters
----------
idx : str or None
Proposed idx. If None, assign a new one.
model_name : str or None
Model name. If not, prepend the group name.
Returns
-------
str
New device name.
"""
if model_name is None:
model_name = self.class_name
need_new = False
if idx is not None:
if idx not in self._idx2model:
# name is good
pass
else:
logger.warning("Group <%s>: idx=%s is used by %s. Data may be inconsistent.",
self.class_name, idx, self.idx2model(idx).class_name)
need_new = True
else:
need_new = True
if need_new is True:
count = self.n
while True:
# IMPORTANT: automatically assigned index is 1-indexed. Namely, `GENCLS_1` is the first generator.
# This is because when we say, for example, `GENCLS_10`, people usually assume it starts at 1.
idx = model_name + '_' + str(count + 1)
if idx not in self._idx2model:
break
else:
count += 1
return idx
[docs] def doc(self, export='plain'):
"""
Return the documentation of the group in a string.
"""
out = ''
if export == 'rest':
out += f'.. _{self.class_name}:\n\n'
group_header = '=' * 80 + '\n'
else:
group_header = ''
if export == 'rest':
out += group_header + f'{self.class_name}\n' + group_header
else:
out += group_header + f'Group <{self.class_name}>\n' + group_header
if self.__doc__ is not None:
out += inspect.cleandoc(self.__doc__) + '\n\n'
if len(self.common_params):
out += 'Common Parameters: ' + ', '.join(self.common_params)
out += '\n\n'
if len(self.common_vars):
out += 'Common Variables: ' + ', '.join(self.common_vars)
out += '\n\n'
if len(self.models):
out += 'Available models:\n'
model_name_list = list(self.models.keys())
if export == 'rest':
def add_reference(name_list):
return [f'{item}_' for item in name_list]
model_name_list = add_reference(model_name_list)
out += ',\n'.join(model_name_list) + '\n'
return out
[docs] def doc_all(self, export='plain'):
"""
Return documentation of the group and its models.
Parameters
----------
export : 'plain' or 'rest'
Export format, plain-text or RestructuredText
Returns
-------
str
"""
out = self.doc(export=export)
out += '\n'
for instance in self.models.values():
out += instance.doc(export=export)
out += '\n'
return out
class Undefined(GroupBase):
"""
The undefined group. Holds models with no ``group``.
"""
pass
class ACTopology(GroupBase):
def __init__(self):
super().__init__()
self.common_vars.extend(('a', 'v'))
class DCTopology(GroupBase):
def __init__(self):
super().__init__()
self.common_vars.extend(('v',))
class Collection(GroupBase):
"""Collection of topology models"""
pass
class Calculation(GroupBase):
"""Group of classes that calculates based on other models."""
pass
class StaticGen(GroupBase):
"""
Static generator group for power flow calculation
"""
def __init__(self):
super().__init__()
self.common_params.extend(('Sn', 'Vn', 'p0', 'q0', 'ra', 'xs', 'subidx'))
self.common_vars.extend(('p', 'q', 'a', 'v'))
self.SynGen = BackRef()
class ACLine(GroupBase):
def __init__(self):
super(ACLine, self).__init__()
self.common_params.extend(('bus1', 'bus2', 'r', 'x'))
self.common_vars.extend(('v1', 'v2', 'a1', 'a2'))
class ACShort(GroupBase):
def __init__(self):
super(ACShort, self).__init__()
self.common_params.extend(('bus1', 'bus2'))
self.common_vars.extend(('v1', 'v2', 'a1', 'a2'))
class StaticLoad(GroupBase):
"""
Static load group.
"""
pass
class StaticShunt(GroupBase):
"""
Static shunt compensator group.
"""
pass
class DynLoad(GroupBase):
"""
Dynamic load group.
"""
pass
class SynGen(GroupBase):
"""
Synchronous generator group.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('Sn', 'Vn', 'fn', 'bus', 'M', 'D', 'subidx'))
self.common_vars.extend(('omega', 'delta', 'tm', 'te', 'vf', 'XadIfd', 'vd', 'vq', 'Id', 'Iq',
'a', 'v'))
self.TurbineGov = BackRef()
self.Exciter = BackRef()
class RenGen(GroupBase):
"""
Renewable generator (converter) group.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('bus', 'gen', 'Sn'))
self.common_vars.extend(('Pe', 'Qe'))
class RenExciter(GroupBase):
"""
Renewable electrical control (exciter) group.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('reg',))
self.common_vars.extend(('Pref', 'Qref', 'wg', 'Pord'))
class RenPlant(GroupBase):
"""
Renewable plant control group.
"""
def __init__(self):
super().__init__()
class RenGovernor(GroupBase):
"""
Renewable turbine governor group.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('ree', 'w0', 'Sn', 'Pe0'))
self.common_vars.extend(('Pm', 'wr0', 'wt', 'wg', 's3_y'))
class RenAerodynamics(GroupBase):
"""
Renewable aerodynamics group.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('rego',))
self.common_vars.extend(('theta',))
class RenPitch(GroupBase):
"""
Renewable generator pitch controller group.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('rea',))
class RenTorque(GroupBase):
"""
Renewable torque (Pref) controller.
"""
def __init__(self):
super().__init__()
class DG(GroupBase):
"""
Distributed generation (small-scale).
"""
def __init__(self):
super().__init__()
self.common_params.extend(('bus', 'fn'))
class DGProtection(GroupBase):
"""
Protection model for DG.
"""
def __init__(self):
super().__init__()
class TurbineGov(GroupBase):
"""
Turbine governor group for synchronous generator.
"""
def __init__(self):
super().__init__()
self.common_vars.extend(('pout',))
class Exciter(GroupBase):
"""
Exciter group for synchronous generators.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('syn',))
self.common_vars.extend(('vout', 'vi',))
self.VoltComp = BackRef()
self.PSS = BackRef()
class VoltComp(GroupBase):
"""
Voltage compensator group for synchronous generators.
"""
def __init__(self):
super().__init__()
self.common_params.extend(('rc', 'xc',))
self.common_vars.extend(('vcomp',))
class PSS(GroupBase):
"""Power system stabilizer group."""
def __init__(self):
super().__init__()
self.common_vars.extend(('vsout',))
class Experimental(GroupBase):
"""Experimental group"""
pass
class DCLink(GroupBase):
"""Basic DC links"""
pass
class StaticACDC(GroupBase):
"""AC DC device for power flow"""
pass
class TimedEvent(GroupBase):
"""Timed event group"""
pass
class FreqMeasurement(GroupBase):
"""Frequency measurements."""
def __init__(self):
super().__init__()
self.common_vars.extend(('f',))
class PhasorMeasurement(GroupBase):
"""Phasor measurements"""
def __init__(self):
super().__init__()
self.common_vars.extend(('am', 'vm'))
class Motor(GroupBase):
"""Induction Motor group
"""
def __init__(self):
super().__init__()
class Information(GroupBase):
"""
Group for information container models.
"""
def __init__(self):
GroupBase.__init__(self)
self.common_params = []