Source code for andes.io.streaming

"""
Simulation data streaming interface for CURENT DiME2.
"""

import logging
from time import sleep
import numpy as np
from numpy import ndarray, array

logger = logging.getLogger(__name__)

try:
    from dime import DimeClient
except ImportError:
    logger.debug("Dime import failed.")


[docs]class Streaming: """ ANDES data streaming class to interface with CURENT LTB. """
[docs] def __init__(self, system): self.system = system self.params_built = False self.SysParam = dict() self.SysName = dict() self.Idxvgs = dict() self.ModuleInfo = dict() self.Varheader = list() self.last_devices = list() self.has_pmu = False self.dimec = None
[docs] def connect(self): """ Connect to DiME 2 server. If ``dime_address`` is specified from the command-line, streaming will be automatically enabled. Otherwise, settings from the Config file will be used. """ config = self.system.config options = self.system.options # enable only when both arguments are supplied if options.get("dime_address") is not None: config.dime_enabled = True config.dime_address = options.get("dime_address") if not config.dime_enabled: return False try: self.dimec = DimeClient(config.dime_address) self.dimec.join(config.dime_name) logger.info('Dime connection to "%s" was successful.', config.dime_address) return True except NameError: logger.error('Dime not installed. Set System config `dime_enabled` to `0` to suppress warning.') self.system.config.dime_enabled = False except FileNotFoundError: logger.error('Dime sever not found at "%s".', config.dime_address) self.system.config.dime_enabled = False return False
def _build_SysParam(self): self.SysParam = self.system.as_dict(vin=True, skip_empty=True) self.params_built = True def _build_SysName(self): self.SysName['Bus'] = self.system.Bus.name.v if self.system.Area.n: self.SysName['Areas'] = self.system.Area.name.v def _build_Varheader(self): self.Varheader = self.system.dae.xy_name def _build_Idxvgs(self): m = self.system.dae.m n = self.system.dae.n mn = m + n # NOQA self.Idxvgs['System'] = { 'nBus': self.system.Bus.n, 'nLine': self.system.Line.n, } self.Idxvgs['Bus'] = { 'theta': 1 + n + self.system.Bus.a.a, 'V': 1 + n + self.system.Bus.v.a, 'w_Busfreq': 1 + n + self.system.BusFreq.f.a, # NO LONGER SUPPORTED # 'P': 1 + mn + array(range(self.system.Bus.n)), # 'Q': 1 + mn + self.system.Bus.n + array(range(self.system.Bus.n)), } self.Idxvgs['Pmu'] = { # NOT YET SUPPORTED 'vm': 1 + self.system.PMU.vm.a, 'am': 1 + self.system.PMU.am.a, } # NOT YET SUPPORTED # line0 = 1 + mn + 2 * self.system.Bus.n self.Idxvgs['Line'] = { # 'Pij': line0 + array(range(self.system.Line.n)), # 'Pji': line0 + self.system.Line.n + array(range(self.system.Line.n)), # 'Qij': line0 + 2 * self.system.Line.n + array(range(self.system.Line.n)), # 'Qji': line0 + 3 * self.system.Line.n + array(range(self.system.Line.n)), } self.Idxvgs['Syn'] = { 'delta': 1 + np.append(self.system.GENCLS.delta.a, self.system.GENROU.delta.a), 'omega': 1 + np.append(self.system.GENCLS.omega.a, self.system.GENROU.omega.a), 'e1d': 1 + np.append([0] * self.system.GENCLS.n, self.system.GENROU.e1d.a), 'e1q': 1 + np.append([0] * self.system.GENCLS.n, self.system.GENROU.e1q.a), 'e2d': 1 + np.append([0] * self.system.GENCLS.n, self.system.GENROU.e2d.a), 'e2q': 1 + np.append([0] * self.system.GENCLS.n, self.system.GENROU.e2q.a), 'psid': 1 + np.append([0] * self.system.GENCLS.n, self.system.GENROU.psid.a), 'psiq': 1 + np.append([0] * self.system.GENCLS.n, self.system.GENROU.psiq.a), # NOT SUPPORTED # 'p': 1 + n + array([0] * self.system.GENCLS.n + self.system.GENROU.p.a), # 'q': 1 + n + array([0] * self.system.GENCLS.n + self.system.GENROU.q.a), } self.Idxvgs['Tg'] = { 'pm': 1 + n + self.system.TG2.pout.a, 'wref': 1 + n + self.system.TG2.wref.a, } self.Idxvgs['Exc'] = { # NOT YET READY # 'vf': # 1 + n + array(self.system.AVR1.vfout + self.system.AVR2.vfout + # self.system.AVR3.vfout), # 'vm': # 1 + array(self.system.AVR1.vm + self.system.AVR2.vm + # self.system.AVR3.vm), } # NOT YET READY # if self.system.WTG3.n: # self.Idxvgs['Dfig'] = { # 'omega_m': 1 + array(self.system.WTG3.omega_m), # 'theta_p': 1 + array(self.system.WTG3.theta_p), # 'idr': 1 + array(self.system.WTG3.ird), # 'iqr': 1 + array(self.system.WTG3.irq), # } # if self.system.Node.n: # self.Idxvgs['Node'] = {'v': 1 + n + array(self.system.Node.v)} # # dev_id = { # 1: 'R', # 2: 'C', # 3: 'L', # 4: 'RCp', # 5: 'RCs', # 6: 'RLCp', # 7: 'RLCs', # 8: 'RLs' # } # if 'DCLine' in self.SysParam: # DCLine_types = set(self.SysParam['DCLine'][:, 2]) # idx = [] # for item in DCLine_types: # item = int(item) # idx.extend(self.system.__dict__[dev_id[item]].Idc) # self.Idxvgs['DCLine'] = {'Idc': 1 + array(idx)} # else: # DCLine_types = () # # self.Idxvgs['DCLine'] = {} def _build_list(self, model, params, ret=None): if not ret: ret = [] else: ret = list(ret) for p in params: if isinstance(p, (int, float)): ret.append([p] * len(ret[0])) elif isinstance(p, list): if len(p) != len(ret[0]): raise ValueError("number of params does not match ret list") ret.append(p) else: val = list(self.system.__dict__[model].__dict__[p]) # make sure val does not contain list if isinstance(val[0], list): logger.warning('{}.{} contains list. Reset to zeros.'.format(model, p)) val = [0] * len(val) ret.append(val) return ret def _find_pos(self, model, fkey, src_col=0): """Find the positions of foreign keys in the source model index list""" if isinstance(fkey, ndarray): fkey = fkey.tolist() elif isinstance(fkey, (int, float)): fkey = [fkey] ret = [] model_idx_list = self.SysParam[model][:, src_col].tolist() for item in fkey: ret.append( model_idx_list.index(item) if item in model_idx_list else 0) return ret
[docs] def build_init(self): """ Build `Varheader`, `Idxvgs` and `SysParam` after power flow routine """ self._build_SysParam() self._build_SysName() self._build_Idxvgs() self._build_Varheader()
[docs] def send_init(self, recepient='all'): """ Broadcast `Varheader`, `Idxvgs` and `SysParam` to all DiME clients after power flow routine """ if not self.system.config.dime_enabled: return if not self.params_built: self.build_init() if recepient == 'all': self.last_devices = self.dimec.devices() logger.debug('Connected modules are: ' + ','.join(self.dimec.devices())) logger.debug( 'Broadcasting Varheader, Idxvgs, SysParam and SysName...') sleep(0.05) self.dimec.broadcast_r(Varheader=self.Varheader) sleep(0.05) self.dimec.broadcast_r(Idxvgs=self.Idxvgs) sleep(0.05) try: self.dimec.broadcast_r(SysParam=self.SysParam) self.dimec.broadcast_r(SysName=self.SysName) except: # NOQA logger.warning( 'SysParam or SysName broadcast error.' ' Check bus coordinates.' ) sleep(0.5) else: if not isinstance(recepient, list): recepient = [recepient] for item in recepient: self.dimec.send_r(item, Varheader=self.Varheader) self.dimec.send_r(item, Idxvgs=self.Idxvgs) self.dimec.send_r(item, SysParam=self.SysParam) self.dimec.send_r(item, SysName=self.SysName)
[docs] def record_module_init(self, name, init_var): """ Record the variable requests from modules """ ivar = dict(init_var) var_idx = ivar['vgsvaridx'] ivar['lastk'] = 0 if name not in self.ModuleInfo: self.ModuleInfo[name] = {} if isinstance(var_idx, int): var_idx = array(var_idx, dtype=int) elif isinstance(var_idx, ndarray): var_idx = var_idx.tolist() # unwrap if nested if isinstance(var_idx[0], list): var_idx = array(var_idx[0], dtype=int) else: var_idx = array(var_idx, dtype=int) ivar['vgsvaridx'] = (var_idx - 1).tolist() ivar['lastk'] = 0 self.ModuleInfo[name].update(ivar) logger.debug('Module <%s> requests index %s', name, var_idx)
[docs] @staticmethod def transpose_matlab_row(a): if isinstance(a, ndarray): if a.shape[0] == 1: a = a[0] return a
[docs] def handle_alter(self, Alter): """Handle parameter altering""" pass
[docs] def handle_event(self, Event): """Handle Fault, Breaker, Syn and Load Events""" fields = ('name', 'id', 'action', 'time', 'duration') for key in fields: if key not in Event: logger.warning( 'Event has missing key {}.'.format(key)) return names = self.transpose_matlab_row(Event.get('name')) idxes = self.transpose_matlab_row(Event.get('id')) actions = self.transpose_matlab_row(Event.get('action')) times = self.transpose_matlab_row(Event.get('time')) durations = self.transpose_matlab_row(Event.get('duration')) n = len(names) for i in range(n): try: name = names[i] idx = idxes[i] action = actions[i] # NOQA time = times[i] duration = durations[i] except IndexError: logger.warning( 'Event key values might have different lengths.') continue if time == -1: time = max(self.system.dae.t, 0) + self.system.tds.config.tstep tf = time + duration if duration == 0.: tf = 9999 if name.lower() == 'bus': param = {'tf': time, 'tc': tf, 'bus': idx} self.system.Fault.insert(**param) logger.debug( 'Event <Fault> added for bus %s at t = %.6g and tf = %g', idx, time, tf) elif name.lower() == 'line': bus = self.system.Line.get_field( 'bus1', ['Line_' + str(int(idx - 1))])[0] param = { 'line': 'Line_' + str(idx - 1), 'bus': bus, 't1': time, 't2': tf, 'u1': 1, 'u2': 1 if duration else 0, } self.system.Breaker.insert(**param) logger.debug( 'Event <Breaker> added for line %s at t = %.6g and tf = %g', idx, time, tf) self.system.call.build_vec() self.system.call._compile_int() self.system.dae.rebuild = True
[docs] def sync_and_handle(self): """ Sync until the queue is empty. Handle sync'ed commands. """ if not self.system.config.dime_enabled: return current_devices = self.dimec.devices() # record MiniPMU if not self.has_pmu: for item in current_devices: if item.startswith('PMU_'): self.has_pmu = True # send Varheader, SysParam and Idxvgs to modules on the fly if set(current_devices) != set(self.last_devices): new_devices = list(current_devices) new_devices.remove(self.system.config.dime_name) for item in self.last_devices: if item in new_devices: new_devices.remove(item) self.send_init(new_devices) self.last_devices = current_devices while True: var_names = self.dimec.sync(1) if not var_names: break workspace = self.dimec.workspace for var_name in var_names: var_value = workspace[var_name] if var_name in current_devices: self.record_module_init(var_name, var_value) elif var_name == 'Event': self.handle_event(var_value) else: logger.warning( 'Synced variable {} not handled'.format(var_name))
[docs] def vars_to_pmu(self): """ Broadcast all PMU measurements and BusFreq measurements in the variable `pmudata` """ if not self.system.config.dime_enabled: return if not self.has_pmu: return idx = np.concatenate((self.system.PMU.vm.a, self.system.PMU.am.a, self.system.dae.n + self.system.BusFreq.f.a, )) t = self.system.dae.t.tolist() k = 0 # field `k` is not no use values = self.system.dae.xy[idx] # a 1-d array as opposed to a N-by-1 2-d matrix pmudata = { 't': t, 'k': k, 'vars': values, } self.dimec.broadcast_r(pmudata=pmudata)
[docs] def vars_to_modules(self): """ Stream the results from the last step to modules :return: None """ if not self.system.config.dime_enabled: return for mod in self.ModuleInfo.keys(): # skip PMU modules in this function. offload it to vars_to_pmu() if mod.startswith('PMU_'): continue limitsample = self.ModuleInfo[mod].get('limitsample', 0) idx = self.ModuleInfo[mod]['vgsvaridx'] t = self.system.dae.t.tolist() k = 0 lastk = self.ModuleInfo[mod]['lastk'] if limitsample: every = 1 / self.system.tds.config.tstep / limitsample if (k - lastk) / every < 1: continue else: self.ModuleInfo[mod]['lastk'] = k values = self.system.dae.xy[idx] Varvgs = { 't': t, 'k': k, 'vars': values, 'accurate': values, } self.dimec.send_r(mod, Varvgs=Varvgs) logger.debug("Send Varvgs to module <%s>", mod)
[docs] def finalize(self): """ Send ``DONE`` signal when simulation completes :return: None """ if not self.system.config.dime_enabled: return self.system.streaming.dimec.broadcast_r(DONE=1) self.system.streaming.dimec.close()