Source code for andes.models.group

import logging
import inspect
from collections import OrderedDict

import numpy as np

from andes.core.service import BackRef
from andes.utils.func import list_flatten

logger = logging.getLogger(__name__)


[docs]class GroupBase: """ Base class for groups. """
[docs] def __init__(self): self.common_params = ['u', 'name'] self.common_vars = [] self.models = OrderedDict() # model name: model instance self._idx2model = OrderedDict() # element idx: model instance self.uid = {} # idx - group internal 0-indexed uid self.services_ref = OrderedDict() # BackRef
def __setattr__(self, key, value): if hasattr(value, 'owner'): if value.owner is None: value.owner = self if hasattr(value, 'name'): if value.name is None: value.name = key if isinstance(value, BackRef): self.services_ref[key] = value super().__setattr__(key, value) @property def class_name(self): return self.__class__.__name__ @property def n(self): """ Total number of devices. """ return len(self._idx2model)
[docs] def add_model(self, name: str, instance): """ Add a Model instance to group. Parameters ---------- name : str Model name instance : Model Model instance Returns ------- None """ if name not in self.models: self.models[name] = instance else: raise KeyError(f"{self.class_name}: Duplicate model registration of {name}")
[docs] def add(self, idx, model): """ Register an idx from model_name to the group Parameters ---------- idx: Union[str, float, int] Register an element to a model model: Model instance of the model Returns ------- """ if idx in self._idx2model: raise KeyError(f'Group <{self.class_name}> already contains <{repr(idx)}> from ' f'<{self._idx2model[idx].class_name}>') self.uid[idx] = self.n self._idx2model[idx] = model
[docs] def idx2model(self, idx, allow_none=False): """ Find model name for the given idx. Parameters ---------- idx : float, int, str, array-like idx or idx-es of devices. allow_none : bool If True, return `None` at the positions where idx is not found. Returns ------- If `idx` is a list, return a list of model instances. If `idx` is a single element, return a model instance. """ ret = [] idx, single = self._1d_vectorize(idx) for i in idx: try: if i is None and allow_none: ret.append(None) else: ret.append(self._idx2model[i]) except KeyError: raise KeyError(f'Group <{self.class_name}> does not contain device with idx={i}') if single: ret = ret[0] return ret
[docs] def idx2uid(self, idx): """ Convert idx to the 0-indexed unique index. Parameters ---------- idx : array-like, numbers, or str idx of devices Returns ------- list A list containing the unique indices of the devices """ vec_idx, single = self._1d_vectorize(idx) out = [self.uid[i] if i is not None else None for i in vec_idx] if single: out = out[0] return out
[docs] def get(self, src: str, idx, attr: str = 'v', allow_none=False, default=0.0): """ Based on the indexer, get the `attr` field of the `src` parameter or variable. Parameters ---------- src : str param or var name idx : array-like device idx attr The attribute of the param or var to retrieve allow_none : bool True to allow None values in the indexer default : float If `allow_none` is true, the default value to use for None indexer. Returns ------- The requested param or variable attribute. If `idx` is a list, return a list of values. If `idx` is a single element, return a single value. """ self._check_src(src) self._check_idx(idx) idx, single = self._1d_vectorize(idx) n = len(idx) if n == 0: return np.zeros(0) ret = [''] * n _type_set = False models = self.idx2model(idx, allow_none=allow_none) for i, idx in enumerate(idx): if models[i] is not None: uid = models[i].idx2uid(idx) instance = models[i].__dict__[src] val = instance.__dict__[attr][uid] else: val = default # deduce the type for ret if not _type_set: if isinstance(val, str): ret = [''] * n else: ret = np.zeros(n) _type_set = True ret[i] = val if single: ret = ret[0] return ret
[docs] def set(self, src: str, idx, attr, value): """ Set the value of an attribute of a group property. Performs ``self.<src>.<attr>[idx] = value``. The user needs to ensure that the property is shared by all models in this group. Parameters ---------- src : str Name of property. idx : str, int, float, array-like Indices of devices. attr : str, optional, default='v' The internal attribute of the property to get. ``v`` for values, ``a`` for address, and ``e`` for equation value. value : array-like New values to be set Returns ------- bool True when successful. """ self._check_src(src) self._check_idx(idx) idx, _ = self._1d_vectorize(idx) models = self.idx2model(idx) if isinstance(value, (str, int, float, np.integer, np.floating)): value = [value] * len(idx) for mdl, ii, val in zip(models, idx, value): uid = mdl.idx2uid(ii) mdl.__dict__[src].__dict__[attr][uid] = val return True
[docs] def find_idx(self, keys, values, allow_none=False, default=None): """ Find indices of devices that satisfy the given `key=value` condition. This method iterates over all models in this group. """ indices_found = [] # `indices_found` contains found indices returned from all models of this group for model in self.models.values(): indices_found.append(model.find_idx(keys, values, allow_none=True, default=default)) out = [] for idx, idx_found in enumerate(zip(*indices_found)): if not allow_none: if idx_found.count(None) == len(idx_found): missing_values = [item[idx] for item in values] raise IndexError(f'{list(keys)} = {missing_values} not found in {self.class_name}') real_idx = default for item in idx_found: if item is not None: real_idx = item break out.append(real_idx) return out
def _check_src(self, src: str): """ Helper function for checking if ``src`` is a shared field. The requirement is not strictly enforced and is only for debugging purposed. """ if src not in self.common_vars + self.common_params: logger.debug(f'Group <{self.class_name}> does not share property <{src}>.') def _check_idx(self, idx): """ Helper function for checking if ``idx`` is None. Raises IndexError if idx is None. """ if idx is None: raise IndexError(f'{self.class_name}: idx cannot be None') def _1d_vectorize(self, idx): """ Helper function to convert a single element, list, or nested lists into a list. If the input is a nested list, flatten it into a 1-dimensional list. Returns ------- idx : list List of indices. single : bool True if the input is a single element. """ single = False list_alike = (list, tuple, np.ndarray) if not isinstance(idx, list_alike): idx = [idx] single = True elif len(idx) > 0 and isinstance(idx[0], list_alike): idx = list_flatten(idx) return idx, single
[docs] def get_field(self, src: str, idx, field: str): """ Helper function for retrieving an attribute of a member variable shared by models in this group. Returns ------- list A list with the length equal to ``len(idx)``. """ self._check_src(src) self._check_idx(idx) idx, _ = self._1d_vectorize(idx) models = self.idx2model(idx, allow_none=True) ret = [None] * len(models) for ii, model in enumerate(models): if model is not None: ret[ii] = getattr(model.__dict__[src], field) return ret
[docs] def set_backref(self, name, from_idx, to_idx): """ Set idxes to ``BackRef``, and set them to models. """ uid = self.idx2uid(to_idx) self.services_ref[name].v[uid].append(from_idx) model = self.idx2model(to_idx) model.set_backref(name, from_idx, to_idx)
[docs] def get_next_idx(self, idx=None, model_name=None): """ Get a no-conflict idx for a new device. Use the provided ``idx`` if no conflict. Generate a new one otherwise. Parameters ---------- idx : str or None Proposed idx. If None, assign a new one. model_name : str or None Model name. If not, prepend the group name. Returns ------- str New device name. """ if model_name is None: model_name = self.class_name need_new = False if idx is not None: if idx not in self._idx2model: # name is good pass else: logger.warning("Group <%s>: idx=%s is used by %s. Data may be inconsistent.", self.class_name, idx, self.idx2model(idx).class_name) need_new = True else: need_new = True if need_new is True: count = self.n while True: # IMPORTANT: automatically assigned index is 1-indexed. Namely, `GENCLS_1` is the first generator. # This is because when we say, for example, `GENCLS_10`, people usually assume it starts at 1. idx = model_name + '_' + str(count + 1) if idx not in self._idx2model: break else: count += 1 return idx
[docs] def doc(self, export='plain'): """ Return the documentation of the group in a string. """ out = '' if export == 'rest': out += f'.. _{self.class_name}:\n\n' group_header = '=' * 80 + '\n' else: group_header = '' if export == 'rest': out += group_header + f'{self.class_name}\n' + group_header else: out += group_header + f'Group <{self.class_name}>\n' + group_header if self.__doc__ is not None: out += inspect.cleandoc(self.__doc__) + '\n\n' if len(self.common_params): out += 'Common Parameters: ' + ', '.join(self.common_params) out += '\n\n' if len(self.common_vars): out += 'Common Variables: ' + ', '.join(self.common_vars) out += '\n\n' if len(self.models): out += 'Available models:\n' model_name_list = list(self.models.keys()) if export == 'rest': def add_reference(name_list): return [f'{item}_' for item in name_list] model_name_list = add_reference(model_name_list) out += ',\n'.join(model_name_list) + '\n' return out
[docs] def doc_all(self, export='plain'): """ Return documentation of the group and its models. Parameters ---------- export : 'plain' or 'rest' Export format, plain-text or RestructuredText Returns ------- str """ out = self.doc(export=export) out += '\n' for instance in self.models.values(): out += instance.doc(export=export) out += '\n' return out
class Undefined(GroupBase): """ The undefined group. Holds models with no ``group``. """ pass class ACTopology(GroupBase): def __init__(self): super().__init__() self.common_vars.extend(('a', 'v')) class DCTopology(GroupBase): def __init__(self): super().__init__() self.common_vars.extend(('v',)) class Collection(GroupBase): """Collection of topology models""" pass class Calculation(GroupBase): """Group of classes that calculates based on other models.""" pass class StaticGen(GroupBase): """ Static generator group. Static generators include PV and Slack, which are used to impose algebraic equations. Static generators are used primarily for power flow. Static generators do not have the modeling details for stability simulation. Although some of them can stay for time-domain simulation, most of them should be substituted by dynamic generators, including synchronous generators and inverter-based resources upon TDS initialization. The substitution is done by setting the ``gen`` field of a dynamic generator to refer to the static generator. To replace one StaticGen by multiple dynamic generators, see the notes in :ref:`SynGen`. Generators connected to the same bus need to have their `gammap` and `gammaq`, respectively, summed up to be exactly 1.0. TDS initialization will ensure that the dynamic generators impose the same amount of power as the static generator. At the end of initialization, `StaticGen`'s that have been substituted will have their connectivity status ``u`` changed to ``0``. """ def __init__(self): super().__init__() self.common_params.extend(('Sn', 'Vn', 'p0', 'q0', 'ra', 'xs', 'subidx')) self.common_vars.extend(('q', 'a', 'v')) self.SynGen = BackRef() class ACLine(GroupBase): def __init__(self): super(ACLine, self).__init__() self.common_params.extend(('bus1', 'bus2', 'r', 'x')) self.common_vars.extend(('v1', 'v2', 'a1', 'a2')) class ACShort(GroupBase): def __init__(self): super(ACShort, self).__init__() self.common_params.extend(('bus1', 'bus2')) self.common_vars.extend(('v1', 'v2', 'a1', 'a2')) class StaticLoad(GroupBase): """ Static load group. """ pass class StaticShunt(GroupBase): """ Static shunt compensator group. """ pass class DynLoad(GroupBase): """ Dynamic load group. """ pass class SynGen(GroupBase): """ Synchronous generator group. SynGen replaces StaticGen upon the initialization of dynamic studies. SynGen and inverter-based resources contain parameters ``gammap`` and ``gammaq`` for splitting the initial power of a StaticGen into multiple dynamic ones. ``gammap``, for example, is the active power ratio of the dynamic generator to the static one. If a StaticGen is supposed to be replaced by one SynGen, the ``gammap`` and ``gammaq`` should both be ``1``. It is critical to ensure that ``gammap`` and ``gammaq``, respectively, of all dynamic power sources sum up to 1.0. Otherwise, the initial power injections imposed by dynamic sources will differ from the static ones. The initialization will then fail with mismatches power injection equations corresponding to bus ``a`` and ``v``. """ def __init__(self): super().__init__() self.common_params.extend(('Sn', 'Vn', 'fn', 'bus', 'M', 'D', 'subidx')) self.common_vars.extend(('omega', 'delta', )) self.idx_island = [] self.uid_island = [] self.delta_addr = [] self.TurbineGov = BackRef() self.Exciter = BackRef() def store_idx_island(self, bus_idx): """ Get ``idx`` of generators in the given islanded. Also store the addresses of the ``delta`` variable in the largest island. This function can only be called after initializing dynamic devices. Parameters ---------- bus_idx : list A list of bus idx in the largest island """ idx_gen = list(self.uid.keys()) bus_gen = self.get(src='bus', idx=idx_gen, attr='v') def intersect(lst1, lst2): return list(set(lst1) & set(lst2)) bus_gen_island = intersect(bus_idx, bus_gen) self.idx_island = self.find_idx(keys='bus', values=bus_gen_island) self.uid_island = self.idx2uid(self.idx_island) self.delta_addr = self.get('delta', self.idx_island, 'a').astype(int) class RenGen(GroupBase): """ Renewable generator (converter) group. See :ref:`SynGen` for the notes on replacing StaticGen and setting the power ratio parameters. Attention is needed for the power base ``Sn``. When replacing a synchronous generator, the renewable generator should have the same or larger ``Sn``. Improper ``Sn`` will cause the initial values to exceed typical per-unit ranges and cause the initialization to fail. """ def __init__(self): super().__init__() self.common_params.extend(('bus', 'gen', 'Sn')) self.common_vars.extend(('Pe', 'Qe')) class RenExciter(GroupBase): """ Renewable electrical control (exciter) group. """ def __init__(self): super().__init__() self.common_params.extend(('reg',)) self.common_vars.extend(('Pref', 'Qref', 'wg', 'Pord')) class RenPlant(GroupBase): """ Renewable plant control group. """ def __init__(self): super().__init__() class RenGovernor(GroupBase): """ Renewable turbine governor group. """ def __init__(self): super().__init__() self.common_params.extend(('ree', 'w0', 'Sn', 'Pe0')) self.common_vars.extend(('Pm', 'wr0', 'wt', 'wg', 's3_y')) class RenAerodynamics(GroupBase): """ Renewable aerodynamics group. """ def __init__(self): super().__init__() self.common_params.extend(('rego',)) self.common_vars.extend(('theta',)) class RenPitch(GroupBase): """ Renewable generator pitch controller group. """ def __init__(self): super().__init__() self.common_params.extend(('rea',)) class RenTorque(GroupBase): """ Renewable torque (Pref) controller. """ def __init__(self): super().__init__() class DG(GroupBase): """ Distributed generation (small-scale). See :ref:`SynGen` for the notes on replacing StaticGen and setting the power ratio parameters. """ def __init__(self): super().__init__() self.common_params.extend(('bus', 'fn')) class DGProtection(GroupBase): """ Protection model for DG. """ def __init__(self): super().__init__() class TurbineGov(GroupBase): """ Turbine governor group for synchronous generator. """ def __init__(self): super().__init__() self.common_vars.extend(('pout',)) class Exciter(GroupBase): """ Exciter group for synchronous generators. """ def __init__(self): super().__init__() self.common_params.extend(('syn',)) self.common_vars.extend(('vout', 'vi',)) self.VoltComp = BackRef() self.PSS = BackRef() class VoltComp(GroupBase): """ Voltage compensator group for synchronous generators. """ def __init__(self): super().__init__() self.common_params.extend(('rc', 'xc',)) self.common_vars.extend(('vcomp',)) class PSS(GroupBase): """Power system stabilizer group.""" def __init__(self): super().__init__() self.common_vars.extend(('vsout',)) class Experimental(GroupBase): """Experimental group""" pass class DCLink(GroupBase): """Basic DC links""" pass class StaticACDC(GroupBase): """AC DC device for power flow""" pass class TimedEvent(GroupBase): """Timed event group""" pass class FreqMeasurement(GroupBase): """Frequency measurements.""" def __init__(self): super().__init__() self.common_vars.extend(('f',)) class PhasorMeasurement(GroupBase): """Phasor measurements""" def __init__(self): super().__init__() self.common_vars.extend(('am', 'vm')) class PLL(GroupBase): """Phase-locked loop models.""" def __init__(self): super().__init__() self.common_vars.extend(('am',)) class Motor(GroupBase): """Induction Motor group """ def __init__(self): super().__init__() class Information(GroupBase): """ Group for information container models. """ def __init__(self): GroupBase.__init__(self) self.common_params = [] class OutputSelect(GroupBase): """ Group for selecting outputs. """ def __init__(self): super().__init__() self.common_params = [] class Interface(GroupBase): """ Group for interface models. """ def __init__(self): super().__init__() self.common_params = [] class DataSeries(GroupBase): """ Group for TimeSeries models. """ def __init__(self): super().__init__() self.common_params = []