Source code for andes.io.matpower

""" Simple MATPOWER format parser
"""
import logging
import re

import andes.io
import numpy as np
from andes.shared import deg2rad, rad2deg

logger = logging.getLogger(__name__)


[docs]def testlines(infile): """ Test if this file is in the MATPOWER format. NOT YET IMPLEMENTED. """ return True # hard coded
def read(system, file):
    """
    Load a MATPOWER data file into ``system``.

    The file is first parsed into an mpc dict by ``m2mpc``; the dict is then
    turned into ANDES device elements by ``mpc2system``, whose return value
    is passed through unchanged.
    """
    return mpc2system(m2mpc(file), system)
def m2mpc(infile: str) -> dict:
    """
    Parse a MATPOWER file and return a dictionary with the data.

    Supported fields include

    - baseMVA
    - bus
    - bus_name
    - gen
    - branch
    - gencost (parsed but not used)
    - areas (parsed but not used; stored under the key ``area``)

    Sections that end up with no data are left out of the returned dict.

    Parameters
    ----------
    infile : str
        Path to the MATPOWER file.

    Returns
    -------
    dict
        mpc struct names : numpy arrays

    Raises
    ------
    ValueError
        If the ``baseMVA`` value or a token inside a numeric section cannot
        be converted to ``float``.
    """

    func = re.compile(r'function\s')
    mva = re.compile(r'\s*mpc.baseMVA\s*=\s*')
    name_head = re.compile(r'\s*mpc.bus_name\s*=\s*{')

    # Section openers, tried in this order while no section is open.
    sections = (
        ('bus', re.compile(r'\s*mpc.bus\s*=\s*\[?')),
        ('gen', re.compile(r'\s*mpc.gen\s*=\s*\[')),
        ('branch', re.compile(r'\s*mpc.branch\s*=\s*\[')),
        ('area', re.compile(r'\s*mpc.areas\s*=\s*\[')),
        ('gencost', re.compile(r'\s*mpc.gencost\s*=\s*\[')),
        ('bus_name', name_head),
    )

    end = re.compile(r'\s*\];?')     # closes a numeric matrix
    cell_end = re.compile(r'\s*};?')  # closes the ``bus_name`` cell array
    has_digit = re.compile(r'.*\d+\s*]?;?')

    field = None
    info = True

    mpc = {
        'version': 2,  # not in use
        'baseMVA': 100,
        'bus': [],
        'gen': [],
        'branch': [],
        'area': [],
        'gencost': [],
        'bus_name': [],
    }

    input_list = andes.io.read_file_like(infile)

    for line in input_list:
        line = line.strip().rstrip(';')
        if not line:
            continue
        if func.search(line):  # skip function declaration
            continue

        if line.startswith('%'):
            # pure comment line; only the first one is reported
            if info:
                logger.info(line[1:])
                info = False
            continue

        if mva.search(line):
            # drop a trailing ``; % comment`` before converting
            value = line.partition('=')[2].partition('%')[0]
            mpc["baseMVA"] = float(value.strip().rstrip(';'))
            continue

        if field is None:
            field = next((name for name, opener in sections
                          if opener.search(line)), None)
            if field is None:
                continue
        elif (cell_end if field == 'bus_name' else end).search(line):
            # ``}`` must close the cell array, otherwise every later section
            # would keep being appended to ``bus_name``
            field = None
            continue

        # parse mpc sections
        if field == 'bus_name':
            entries = name_head.sub('', line, count=1)
            if "'" in entries:
                # strip whitespace before the quotes so that several names
                # on one line are all unquoted; non-quoted pieces (e.g. a
                # trailing comment) are ignored
                mpc['bus_name'].extend(
                    item.strip().strip("'").strip()
                    for item in entries.split(';')
                    if item.strip().startswith("'")
                )
            continue

        # numeric section: remove the comment first so that an ``=`` inside
        # it cannot be mistaken for the assignment
        code = line.partition('%')[0]
        if '=' in code:
            code = code.partition('=')[2]
        code = code.replace('[', '')

        for item in code.split(';'):
            if not has_digit.search(item):
                continue
            try:
                row = np.array([float(val) for val in item.split()])
            except ValueError:
                logger.error('Error parsing "%s"', infile)
                raise
            mpc[field].append(row)

    # convert mpc to np array
    mpc_array = dict()
    for key, val in mpc.items():
        if isinstance(val, (float, int)):
            mpc_array[key] = val
        elif isinstance(val, list):
            if not val:
                continue
            if "name" in key:
                mpc_array[key] = np.array(val, dtype=object)
            else:
                mpc_array[key] = np.array(val)
        else:
            raise NotImplementedError("Unknown type for mpc, ", type(val))
    return mpc_array
def mpc2system(mpc: dict, system) -> bool:
    """
    Load an mpc dict into an empty ANDES system.

    The ``bus``, ``gen`` and ``branch`` sections are optional: a section that
    is missing from ``mpc`` is treated as empty instead of raising
    ``KeyError``.

    Parameters
    ----------
    mpc : dict
        mpc struct names : numpy arrays
    system : andes.system.System
        Empty system to load the data into.

    Returns
    -------
    bool
        Always ``True``; failures surface as exceptions.
    """

    base_mva = mpc['baseMVA']
    system.config.mva = base_mva

    # buses of type 3, i.e. buses whose generators become ``Slack``
    slack_buses = set()

    for data in mpc.get('bus', ()):
        # idx  ty   pd   qd  gs  bs  area  vmag  vang  baseKV  zone  vmax  vmin
        # 0    1    2    3   4   5   6     7     8     9       10    11    12
        idx = int(data[0])
        ty = data[1]
        if ty == 3:
            slack_buses.add(idx)
        pd = data[2] / base_mva
        qd = data[3] / base_mva
        gs = data[4] / base_mva
        bs = data[5] / base_mva
        area = data[6]
        vmag = data[7]
        vang = data[8] * deg2rad
        baseKV = data[9]
        if baseKV == 0:
            # fall back to 110 when the case gives no base voltage
            baseKV = 110
        zone = data[10]
        vmax = data[11]
        vmin = data[12]

        system.add('Bus', idx=idx, name='Bus ' + str(idx),
                   Vn=baseKV, v0=vmag, a0=vang,
                   vmax=vmax, vmin=vmin,
                   area=area, zone=zone)
        if pd != 0 or qd != 0:
            system.add('PQ', bus=idx, name='PQ ' + str(idx),
                       Vn=baseKV, p0=pd, q0=qd)
        if gs or bs:
            system.add('Shunt', bus=idx, name='Shunt ' + str(idx),
                       Vn=baseKV, g=gs, b=bs)

    gen_idx = 0
    for data in mpc.get('gen', ()):
        # bus  pg  qg  qmax  qmin  vg  mbase  status  pmax  pmin  pc1  pc2
        # 0    1   2   3     4     5   6      7       8     9     10   11
        # qc1min  qc1max  qc2min  qc2max  ramp_agc  ramp_10  ramp_30  ramp_q
        # 12      13      14      15      16        17       18       19
        # apf
        # 20
        bus_idx = int(data[0])
        gen_idx += 1
        vg = data[5]
        status = int(data[7])
        # the system base is used for scaling; column 6 (mbase) is not read
        mbase = base_mva
        pg = data[1] / mbase
        qg = data[2] / mbase
        qmax = data[3] / mbase
        qmin = data[4] / mbase
        pmax = data[8] / mbase
        pmin = data[9] / mbase

        uid = system.Bus.idx2uid(bus_idx)
        vn = system.Bus.Vn.v[uid]
        a0 = system.Bus.a0.v[uid]

        if bus_idx in slack_buses:
            system.add('Slack', idx=gen_idx, bus=bus_idx, busr=bus_idx,
                       name='Slack ' + str(bus_idx),
                       u=status,
                       Vn=vn, v0=vg, p0=pg, q0=qg, a0=a0,
                       pmax=pmax, pmin=pmin,
                       qmax=qmax, qmin=qmin)
        else:
            system.add('PV', idx=gen_idx, bus=bus_idx, busr=bus_idx,
                       name='PV ' + str(bus_idx),
                       u=status,
                       Vn=vn, v0=vg, p0=pg, q0=qg,
                       pmax=pmax, pmin=pmin,
                       qmax=qmax, qmin=qmin)

    for data in mpc.get('branch', ()):
        # fbus  tbus  r  x  b  rateA  rateB  rateC  ratio  angle
        # 0     1     2  3  4  5      6      7      8      9
        # status  angmin  angmax  Pf  Qf  Pt  Qt
        # 10      11      12      13  14  15  16
        fbus = int(data[0])
        tbus = int(data[1])
        r = data[2]
        x = data[3]
        b = data[4]
        rate_a = data[5]
        rate_b = data[6]
        rate_c = data[7]
        status = int(data[10])

        if (data[8] == 0.0) or (data[8] == 1.0 and data[9] == 0.0):
            # not a transformer
            tf = False
            tap_ratio = 1
            phase_shift = 0
        else:
            tf = True
            tap_ratio = data[8]
            phase_shift = data[9] * deg2rad

        vf = system.Bus.Vn.v[system.Bus.idx2uid(fbus)]
        vt = system.Bus.Vn.v[system.Bus.idx2uid(tbus)]
        system.add('Line', u=status, name=f'Line {fbus:.0f}-{tbus:.0f}',
                   Vn1=vf, Vn2=vt,
                   bus1=fbus, bus2=tbus,
                   r=r, x=x, b=b,
                   trans=tf, tap=tap_ratio, phi=phase_shift,
                   rate_a=rate_a, rate_b=rate_b, rate_c=rate_c)

    # bus names are applied only when there is exactly one per bus
    if ('bus_name' in mpc) and (len(mpc['bus_name']) == len(system.Bus.name.v)):
        system.Bus.name.v[:] = mpc['bus_name']

    return True
def _get_bus_id_caller(bus):
    """
    Helper function to get the bus id.

    If the bus ``idx`` values are not all numeric (for example when any of
    them is a string), use ``uid`` + 1. Otherwise, use ``idx``.

    Parameters
    ----------
    bus : andes.models.bus.Bus
        Bus object

    Returns
    -------
    callable
        Function that takes a bus idx (or a sequence of them) and returns
        the bus id(s) for the matpower case.
    """
    # A list of strings becomes a ``<U`` array, not an ``object`` array, so
    # the test has to be "not numeric" rather than "dtype == object".
    if np.issubdtype(np.asarray(bus.idx.v).dtype, np.number):
        return lambda x: x

    # ``np.asarray`` lets the ``+ 1`` work when ``idx2uid`` returns a list
    return lambda x: np.asarray(bus.idx2uid(x)) + 1
def system2mpc(system) -> dict:
    """
    Export the data of an ANDES system as an mpc dict.

    Rows of the ``gen`` matrix list the slack generators first, followed by
    the PV generators.
    """

    n_bus = system.Bus.n
    n_slack = system.Slack.n

    mpc = dict(
        version='2',
        baseMVA=system.config.mva,
        bus=np.zeros((n_bus, 13), dtype=np.float64),
        gen=np.zeros((system.PV.n + n_slack, 21), dtype=np.float64),
        branch=np.zeros((system.Line.n, 17), dtype=np.float64),
        bus_name=np.zeros((n_bus, ), dtype=object),
    )

    base_mva = system.config.mva
    to_busid = _get_bus_id_caller(system.Bus)

    bus = mpc['bus']
    gen = mpc['gen']

    def fill_gen(rows, model):
        # write columns 0-9 of the selected ``gen`` rows from a PV/Slack model
        gen[rows, 0] = to_busid(model.bus.v)
        gen[rows, 1] = model.p0.v * base_mva
        gen[rows, 2] = model.q0.v * base_mva
        gen[rows, 3] = model.qmax.v * base_mva
        gen[rows, 4] = model.qmin.v * base_mva
        gen[rows, 5] = model.v0.v
        gen[rows, 6] = base_mva
        gen[rows, 7] = model.u.v
        gen[rows, 8] = model.pmax.v * base_mva
        gen[rows, 9] = model.pmin.v * base_mva

    # --- bus ---
    bus[:, 0] = to_busid(system.Bus.idx.v)
    bus[:, 1] = 1  # every bus starts as type 1; PV/Slack buses are retyped below
    bus[:, 7] = system.Bus.v0.v
    bus[:, 8] = system.Bus.a0.v * rad2deg
    bus[:, 9] = system.Bus.Vn.v
    bus[:, 11] = system.Bus.vmax.v
    bus[:, 12] = system.Bus.vmin.v
    # area and zone not supported

    # --- PQ ---
    if system.PQ.n > 0:
        load_rows = system.Bus.idx2uid(system.PQ.bus.v)
        bus[load_rows, 2] = system.PQ.p0.v * base_mva
        bus[load_rows, 3] = system.PQ.q0.v * base_mva

    # --- Shunt ---
    if system.Shunt.n > 0:
        shunt_rows = system.Bus.idx2uid(system.Shunt.bus.v)
        bus[shunt_rows, 4] = system.Shunt.g.v * base_mva
        bus[shunt_rows, 5] = system.Shunt.b.v * base_mva

    # --- PV ---
    if system.PV.n > 0:
        pv_rows = system.Bus.idx2uid(system.PV.bus.v)
        bus[pv_rows, 1] = 2
        fill_gen(slice(n_slack, None), system.PV)

    # --- Slack --- (after PV, so type 3 wins on a shared bus)
    if system.Slack.n > 0:
        slack_rows = system.Bus.idx2uid(system.Slack.bus.v)
        bus[slack_rows, 1] = 3
        bus[slack_rows, 8] = system.Slack.a0.v * rad2deg
        fill_gen(slice(None, n_slack), system.Slack)

    # --- Line ---
    if system.Line.n > 0:
        branch = mpc['branch']
        line = system.Line
        branch[:, 0] = to_busid(line.bus1.v)
        branch[:, 1] = to_busid(line.bus2.v)
        branch[:, 2] = line.r.v
        branch[:, 3] = line.x.v
        branch[:, 4] = line.b.v
        branch[:, 5] = line.rate_a.v
        branch[:, 6] = line.rate_b.v
        branch[:, 7] = line.rate_c.v
        branch[:, 8] = line.tap.v
        branch[:, 9] = line.phi.v * rad2deg
        branch[:, 10] = line.u.v

    mpc['bus_name'] = np.array(system.Bus.name.v)

    return mpc