Source code for andes.io.psse

"""
PSS/E file parser.

Include a RAW parser and a DYR parser.
"""

import logging
import os
import yaml

import andes.io

from andes.core.symprocessor import resolve_deps
from andes.models import file_classes
from andes.utils.func import list_flatten
from andes.shared import deg2rad, pd
from andes.utils.misc import to_number
from collections import defaultdict

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Per-version format descriptors.
# Each entry describes the block structure and column layout for that version.
# ---------------------------------------------------------------------------

_VERSION_CONFIGS = {
    33: {
        'blocks': [
            'bus', 'load', 'fshunt', 'gen', 'branch', 'transf', 'area',
            'twotermdc', 'vscdc', 'impedcorr', 'mtdc', 'msline', 'zone',
            'interarea', 'owner', 'facts', 'swshunt', 'gne', 'Q',
        ],
        'initial_block_idx': 0,
        'line_counts': [1, 1, 1, 1, 1, 4, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0],
        'transf_block': 5,
        'branch_cols': {
            'bus1': 0, 'bus2': 1, 'ckt': 2,
            'r': 3, 'x': 4, 'b': 5,
            'rate_a': 6, 'rate_b': 7, 'rate_c': 8,
            'gi': 9, 'bi': 10, 'gj': 11, 'bj': 12,
            'u': 13, 'length': 14,
        },
    },
    34: {
        'blocks': [
            'bus', 'load', 'fshunt', 'gen', 'branch', 'swdev', 'transf',
            'area', 'twotermdc', 'vscdc', 'impedcorr', 'mtdc', 'msline',
            'zone', 'interarea', 'owner', 'facts', 'swshunt', 'gne',
            'indmach', 'substation', 'Q',
        ],
        'initial_block_idx': -1,  # extra "END OF SYSTEM-WIDE DATA" delimiter
        'line_counts': [1, 1, 1, 1, 1, 1, 4, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0],
        'transf_block': 6,
        'branch_cols': {
            'bus1': 0, 'bus2': 1, 'ckt': 2,
            'r': 3, 'x': 4, 'b': 5, 'name': 6,
            'rate_a': 7, 'rate_b': 8, 'rate_c': 9,
            'rate_d': 10, 'rate_e': 11, 'rate_f': 12,
            'rate_g': 13, 'rate_h': 14, 'rate_i': 15,
            'rate_j': 16, 'rate_k': 17, 'rate_l': 18,
            'gi': 19, 'bi': 20, 'gj': 21, 'bj': 22,
            'u': 23, 'met': 24, 'length': 25,
        },
    },
}

# v32 is identical to v33 for our purposes
_VERSION_CONFIGS[32] = _VERSION_CONFIGS[33]


[docs]def testlines(infile): """ Check the raw file for frequency base. """ lines_list = andes.io.read_file_like(infile) if len(lines_list) == 0: return False first = lines_list[0] first = first.strip().split('/') first = first[0].split(',') # get raw file version if len(first) >= 3: version = int(first[2]) logger.debug(f'PSSE raw version {version} detected') if version not in _VERSION_CONFIGS: logger.warning(f'RAW file version {version} is not explicitly supported. ' 'Falling back to v33 format. Errors may occur.') return True else: return False
[docs]def get_block_lines(b, mdata, cfg): """ Return the number of lines based on the block index in the RAW file. """ if b == cfg['transf_block']: if mdata[0][2] == 0: # two-winding transformer return 4 else: # three-winding transformer return 5 return cfg['line_counts'][b]
def _parse_csv_with_quotes(line): """ Parse a line of PSS/E data that may contain single-quoted strings with commas or slashes. Parameters ---------- line : str A line from a PSS/E file that needs parsing Returns ------- list List of values with quoted strings preserved as single elements """ # Initialize result list and variables result = [] current = "" in_quotes = False # Handle empty input if not line: return [""] # Process each character for char in line: if char == "'" and not in_quotes: # Starting quotes in_quotes = True current += char elif char == "'" and in_quotes: # Ending quotes in_quotes = False current += char elif char == ',' and not in_quotes: # Field separator outside quotes result.append(current) current = "" else: # Add character to current field current += char # Add the last field result.append(current) # Process each field to remove quotes and strip whitespace for i in range(len(result)): field = result[i] # Remove quotes and strip if field and len(field) >= 2 and field[0] == "'" and field[-1] == "'": field = field[1:-1] result[i] = field.strip() return result def _split_line_with_quoted_parts(line, separator='/'): """ Split a line by a separator character, but preserve the separator inside quoted strings. Parameters ---------- line : str Line to split separator : str Character to split by, defaults to '/' Returns ------- list List of parts """ result = [] current = "" in_quotes = False for char in line: if char == "'" and not in_quotes: in_quotes = True current += char elif char == "'" and in_quotes: in_quotes = False current += char elif char == separator and not in_quotes: result.append(current) current = "" else: current += char if current: result.append(current) return result
[docs]def read(system, file): """ Read PSS/E RAW file v32/v33/v34 formats. """ ret = True mva = 100 version = 0 data = [] mdata = [] # multi-line data dev_line = 0 # line counter for multi-line models # read file into `line_list` line_list = andes.io.read_file_like(file) # detect version from header if line_list: first_parts = _split_line_with_quoted_parts(line_list[0].strip()) first_data = _parse_csv_with_quotes(first_parts[0]) if len(first_data) >= 3: version = int(first_data[2]) cfg = _VERSION_CONFIGS.get(version, _VERSION_CONFIGS[33]) blocks = cfg['blocks'] block_idx = cfg['initial_block_idx'] raw = {item: [] for item in blocks} # parse file into `raw` with to_number conversions for num, line in enumerate(line_list): line = line.strip() # get basemva and nominal frequency if num == 0: parts = _split_line_with_quoted_parts(line) data = _parse_csv_with_quotes(parts[0]) mva = float(data[1]) system.config.mva = mva try: system.config.freq = float(data[5]) except (IndexError, ValueError): logger.debug('System frequency not specified in RAW header. ' 'Defaulting to 60 Hz.') system.config.freq = 60.0 continue elif num == 1 or num == 2: # store the case info line if len(line) > 0: logger.info(" " + line) continue elif num >= 3: if line[0:2] == '0 ' or line[0:3] == ' 0 ': # end of block block_idx += 1 continue elif line[0] == 'Q': # end of file break elif line.startswith('@!'): # v34 column header line continue parts = _split_line_with_quoted_parts(line) data = _parse_csv_with_quotes(parts[0]) data = [to_number(item) for item in data] mdata.append(data) dev_line += 1 block_lines = get_block_lines(block_idx, mdata, cfg) if dev_line >= block_lines: if block_lines == 1: mdata = mdata[0] raw[blocks[block_idx]].append(mdata) mdata = [] dev_line = 0 # add device elements to system bus_params, bus_idx_list, sw = _parse_bus_v33(raw, system) max_bus = max(bus_idx_list) _parse_load_v33(raw, system) _parse_fshunt_v33(raw, system) _parse_gen_v33(raw, system, sw) _parse_line(raw, system, cfg) _parse_transf_v33(raw, system, max_bus) _parse_swshunt_v33(raw, system) _parse_area_v33(raw, system) return ret
def _read_dyr_dict(file): """ Parse dyr file into a dict where keys are model names and values are dataframes. """ input_list = andes.io.read_file_like(file) # concatenate multi-line device data input_concat_dict = defaultdict(list) multi_line = list() for line in input_list: if line == '': continue if '/' not in line: multi_line.append(line) else: multi_line.append(line.split('/')[0]) single_line = ' '.join(multi_line) if single_line.strip() == '': continue single_list = single_line.split("'") psse_model = single_list[1].strip() input_concat_dict[psse_model].append(single_list[0] + ' '.join(single_list[2:])) multi_line = list() # construct pandas dataframe for all models dyr_dict = dict() # input data from dyr file for psse_model, all_rows in input_concat_dict.items(): # DYR files are space-separated, not comma-separated dev_params_num = [([to_number(cell) for cell in row.replace(',', ' ').split()]) for row in all_rows] dyr_dict[psse_model] = pd.DataFrame(dev_params_num) return dyr_dict
[docs]def read_add(system, file): """ Read an addition PSS/E dyr file. Parameters ---------- system : System System instance to which data will be loaded file : str Path to the additional `dyr` file Returns ------- bool data parsing status """ dyr_dict = _read_dyr_dict(file) system.dyr_dict = dyr_dict # read yaml and set header for each pss/e model dirname = os.path.dirname(__file__) with open(f'{dirname}/psse-dyr.yaml', 'r') as f: dyr_yaml = yaml.full_load(f) sorted_models = sort_psse_models(dyr_yaml, system) for psse_model in dyr_dict: if psse_model in dyr_yaml: if 'inputs' in dyr_yaml[psse_model]: dyr_dict[psse_model].columns = dyr_yaml[psse_model]['inputs'] # collect not supported models not_supported = [] for model in dyr_dict: if model not in sorted_models: not_supported.append(model) # print out debug messages if len(dyr_dict): logger.debug('dyr contains models %s', ", ".join(dyr_dict.keys())) if len(not_supported): logger.warning('Models not yet supported: %s', ", ".join(not_supported)) else: logger.debug('All dyr models are supported.') # load data into models for psse_model in sorted_models: if psse_model not in dyr_dict: # device not exist continue if psse_model not in dyr_yaml: logger.error(f"PSS/E Model <{psse_model}> is not supported.") continue logger.debug(f'Parsing PSS/E model {psse_model}') dest = dyr_yaml[psse_model]['destination'] find = {} if 'find' in dyr_yaml[psse_model]: for name, source in dyr_yaml[psse_model]['find'].items(): for model, conditions in source.items(): allow_none = conditions.pop('allow_none', 0) cond_names = conditions.keys() cond_values = [] for col in conditions.values(): if col in find: cond_values.append(find[col]) else: cond_values.append(dyr_dict[psse_model][col]) try: logger.debug("<%s> trying to find <%s> using cond_names=%s and cond_values=%s", psse_model, model, cond_names, cond_values) logger.debug("<%s> contains %d devices", model, system.__dict__[model].n) find[name] = system.__dict__[model].find_idx(cond_names, cond_values, allow_none=allow_none, default=None) except IndexError as e: logger.error("Data file likely contains references to unsupported models.") logger.error(e) return False if 'get' in dyr_yaml[psse_model]: for name, source in dyr_yaml[psse_model]['get'].items(): for model, conditions in source.items(): idx_name = conditions['idx'] if idx_name in dyr_dict[psse_model]: conditions['idx'] = dyr_dict[psse_model][idx_name] else: conditions['idx'] = find[idx_name] find[name] = system.__dict__[model].get(**conditions) if 'outputs' in dyr_yaml[psse_model]: output_keys = list(dyr_yaml[psse_model]['outputs'].keys()) output_exprs = list(dyr_yaml[psse_model]['outputs'].values()) out_dict = {} for idx in range(len(output_exprs)): out_key = output_keys[idx] expr = output_exprs[idx] if expr in find: out_dict[out_key] = find[expr] elif ';' in expr: args, func = expr.split(';') func = eval(func) args = args.split(',') # support local and external model parameters argv = list() for param in args: if '.' in param: argv.append(param.split('.')) else: argv.append((psse_model, param)) argv = [dyr_dict[model][param] for model, param in argv] out_dict[output_keys[idx]] = func(*argv) else: out_dict[output_keys[idx]] = dyr_dict[psse_model][expr] df = pd.DataFrame.from_dict(out_dict) for row in df.to_dict(orient='records'): system.add(dest, row) system.link_ext_param(system.__dict__[dest]) return True
def _parse_bus_v33(raw, system): # version 32: # 0, 1, 2, 3, 4, 5, 6, 7, 8 # ID, NAME, BasekV, Type, Area Zone Owner Vm, Va # out = defaultdict(list) bus_idx_list = list() sw = dict() for data in raw['bus']: idx = data[0] bus_idx_list.append(idx) ty = data[3] a0 = data[8] * deg2rad if ty == 3: sw[idx] = a0 param = {'idx': idx, 'name': data[1], 'Vn': data[2], 'v0': data[7], 'a0': a0, 'area': data[4], 'zone': data[5], 'owner': data[6]} out['Bus'].append(param) _add_devices_from_dict(out, system) return out, bus_idx_list, sw def _parse_load_v33(raw, system): # version 32: # 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 # Bus, Id, Status, Area, Zone, PL(MW), QL (MW), IP, IQ, YP, YQ, OWNER mva = system.config.mva out = defaultdict(list) for data in raw['load']: bus = data[0] vn = system.Bus.get(src='Vn', idx=bus, attr='v') v0 = system.Bus.get(src='v0', idx=bus, attr='v') param = {'bus': bus, 'u': data[2], 'Vn': vn, 'p0': (data[5] + data[7] * v0 + data[9] * v0 ** 2) / mva, 'q0': (data[6] + data[8] * v0 - data[10] * v0 ** 2) / mva, 'owner': data[11]} out['PQ'].append(param) _add_devices_from_dict(out, system) return out def _parse_fshunt_v33(raw, system): # 0, 1, 2, 3, 4 # Bus, name, Status, g (MW), b (Mvar) mva = system.config.mva out = defaultdict(list) for data in raw['fshunt']: bus = data[0] vn = system.Bus.get(src='Vn', idx=bus, attr='v') param = {'bus': bus, 'Vn': vn, 'u': data[2], 'Sn': mva, 'g': data[3] / mva, 'b': data[4] / mva} out['Shunt'].append(param) _add_devices_from_dict(out, system) return out def _parse_gen_v33(raw, system, sw): """ Helper function for parsing static generator section. """ # 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, # I, ID, PG, QG, QT, QB, VS, IREG, MBASE, ZR, ZX, RT, XT, GTAP, STAT, # 15, 16, 17, 18, 19, ..., 26, 27 # RMPCT, PT, PB, O1, F1, ..., O4, F4, WMOD, WPF # The columns above for v33 is different from the manual of v34.5, which includes two new columns: # `NREG`` at 8 and `BSLOD` before `O1` mva = system.config.mva out = defaultdict(list) gen_idx = 0 for data in raw['gen']: bus = data[0] subidx = data[1] vn = system.Bus.get(src='Vn', idx=bus, attr='v') gen_mva = data[8] gen_idx += 1 status = data[14] wmod = data[26] if len(data) > 26 else 0 param = {'Sn': gen_mva, 'Vn': vn, 'u': status, 'bus': bus, 'subidx': subidx, 'idx': gen_idx, 'p0': data[2] / mva, 'q0': data[3] / mva, 'pmax': data[16] / mva, 'pmin': data[17] / mva, 'qmax': data[4] / mva, 'qmin': data[5] / mva, 'v0': data[6], 'ra': data[9], # ra - armature resistance 'xs': data[10], # xs - synchronous reactance 'wmod': wmod, # generator control mode } if data[0] in sw.keys(): param.update({'a0': sw[data[0]]}) out['Slack'].append(param) else: out['PV'].append(param) _add_devices_from_dict(out, system) return out def _parse_line(raw, system, cfg): # Column indices come from cfg['branch_cols'], which varies by version. # v33: I,J,CKT,R,X,B,RATEA,RATEB,RATEC,GI,BI,GJ,BJ,ST,LEN,O1,F1,... # v34: I,J,CKT,R,X,B,NAME,RATE1..RATE12,GI,BI,GJ,BJ,ST,MET,LEN,... cols = cfg['branch_cols'] out = defaultdict(list) for data in raw['branch']: param = { 'u': data[cols['u']], 'bus1': data[cols['bus1']], 'bus2': data[cols['bus2']], 'r': data[cols['r']], 'x': data[cols['x']], 'b': data[cols['b']], 'rate_a': data[cols['rate_a']], 'rate_b': data[cols['rate_b']], 'rate_c': data[cols['rate_c']], 'Vn1': system.Bus.get(src='Vn', idx=data[cols['bus1']], attr='v'), 'Vn2': system.Bus.get(src='Vn', idx=data[cols['bus2']], attr='v'), } out['Line'].append(param) _add_devices_from_dict(out, system) return out def _parse_transf_v33(raw, system, max_bus): out = defaultdict(list) xf_3_count = 1 for data in raw['transf']: if len(data) == 4: # """ # I,J,K,CKT,CW,CZ,CM,MAG1,MAG2,NMETR,'NAME',STAT,O1,F1,...,O4,F4 # R1-2,X1-2,SBASE1-2 # WINDV1,NOMV1,ANG1,RATA1,RATB1,RATC1,COD1,CONT1,RMA1,RMI1,VMA1,VMI1,NTP1,TAB1,CR1,CX1 # WINDV2,NOMV2 # # """ bus_Vn1 = system.Bus.get(src='Vn', idx=data[0][0], attr='v') bus_Vn2 = system.Bus.get(src='Vn', idx=data[0][1], attr='v') Vn1 = data[2][1] if data[2][1] != 0.0 else bus_Vn1 Vn2 = data[3][1] if data[3][1] != 0.0 else bus_Vn2 transf = True tap = data[2][0] # pu or in kV phi = data[2][2] * deg2rad # `ANG1` is entered in degree; convert to rad rate_a = data[2][3] rate_b = data[2][4] rate_c = data[2][5] # CW - Winding I/O code, 1-turn ratio on pu bus base kV, 2: winding V, 3: turn ratio pu on norn wind V if data[0][4] == 2: tap = (data[2][0] / bus_Vn1) / (data[3][0] / bus_Vn2) elif data[0][4] == 3: tap = tap * (Vn1 / bus_Vn1) / (Vn2 / bus_Vn2) # PSS/E convention: WINDV=0 means use nominal (1.0 per-unit tap ratio) if tap == 0.0: tap = 1.0 # CZ - Z code, 1-system base, 2-winding base, 3-load loss and |z| if data[0][5] == 1: Sn = system.config.mva elif data[0][5] == 2: Sn = data[1][2] elif data[0][5] == 3: # CZ=3: Load loss & |Z| # Convert power loss and impedance magnitude to R and X on winding base Sn = data[1][2] # Use winding base MVA # Convert load loss (W) to R (pu on winding base) # R = W / (SBASE_winding * 1e6) r_pu_wb = data[1][0] / (Sn * 1e6) # Calculate X from |Z| and R: X = sqrt(|Z|^2 - R^2) # Handle numeric issues - ensure we don't get imaginary numbers if data[1][1]**2 > r_pu_wb**2: x_pu_wb = (data[1][1]**2 - r_pu_wb**2)**0.5 else: # If |Z| is too small compared to R, assume X is very small logger.warning(f"Branch {data[0][0]}-{data[0][1]}:") logger.warning(" CZ=3 conversion issue: |Z|^2 < R^2. Setting X to small value.") x_pu_wb = 1e-6 # Replace the data[1][0] and data[1][1] with calculated R and X data[1][0] = r_pu_wb data[1][1] = x_pu_wb # Now it's in CZ=2 format and will be processed accordingly else: logger.warning('Unknown impedance code %s', data[0][5]) # CM - Y code, 1-system base, 2-No load loss and exc. loss if data[0][6] == 2: # CM=2: No load loss & exc. loss mag1 = data[0][7] # No-load loss in watts mag2 = data[0][8] # Excitation current in pu # Vbase for winding 1 (kV) Vn1 = data[2][1] if data[2][1] != 0.0 else bus_Vn1 # Step 1: Convert power loss to conductance (G) in Siemens # G [S] = MAG1 / VNOM1_kV^2 / 1e6 g_s = mag1 / (Vn1 ** 2) / 1e6 # Step 2: Calculate B in Siemens from excitation current # First convert excitation current to |Y| in Siemens y_mag_s = abs(mag2) * system.config.mva / (Vn1**2) # B [S] = sqrt(|Y|^2 - G^2) # Handle numeric issues if y_mag_s**2 > g_s**2: b_s = (y_mag_s**2 - g_s**2)**0.5 logger.info(f"CM=2 Conversion ok: G={g_s}, B={b_s}") else: # If |Y| is too small compared to G, assume B is very small logger.warning("CM=2 conversion issue: |Y|^2 < G^2. Setting B to small value.") b_s = 1e-6 # Convert back to per unit on system base g_pu = g_s * (Vn1**2) / system.config.mva b_pu = b_s * (Vn1**2) / system.config.mva # Replace MAG1 and MAG2 with calculated G and B in pu data[0][7] = g_pu data[0][8] = b_pu elif data[0][6] != 1: logger.warning('Unknown magnetizing admittance code %s', data[0][6]) param = {'bus1': data[0][0], 'bus2': data[0][1], 'u': data[0][11], 'b': data[0][8], 'r': data[1][0], 'x': data[1][1], 'trans': transf, 'tap': tap, 'phi': phi, 'Sn': Sn, 'Vn1': Vn1, 'Vn2': Vn2, 'rate_a': rate_a, 'rate_b': rate_b, 'rate_c': rate_c, } out['Line'].append(param) else: # I, J, K, CKT, CW, CZ, CM, MAG1, MAG2, NMETR, 'NAME', STAT, Ol, Fl,...,o4, F4 # R1-2, X1-2, SBASE1-2, R2-3, X2-3, SBASE2-3, R3-1, X3-1, SBASE3-1, VMSTAR, ANSTAR # WINDV1, NOMV1, ANG1, RATA1, BATB1, RATC1, COD1, CONT1, RMA1, RMI1, VMA1, VMI1, NTP1, TAB1, CR1, CX1 # WINDV2, NOMV2, ANG2, RATA2, BATB2, RATC2, COD2, CONT2, RMA2, RMI2, VMA2, VMI2, NTP2, TAB2, CR2, CX2 # WINDV3, NOMV3, ANG3, RATA3, BATB3, RATC3, COD3, CONT3, RMA3, RMI3, VMA3, VMI3, NTP3, TAB3, CR3, CX3 # Process CZ=3 and CM=2 for 3-winding transformers if data[0][5] == 3: # CZ=3 data = _process_3wt_cz3(data, system) # After processing, treat this as CZ=2 data[0][5] = 2 if data[0][6] == 2: # CM=2 data = _process_3wt_cm2(data, system) # After processing, treat this as CM=1 data[0][6] = 1 new_bus = data[0][2] + 1 if new_bus in system.Bus.idx.v: new_bus = max_bus + xf_3_count logger.debug('Added bus <%s> for 3-winding transformer <%s-%s-%s>', new_bus, data[0][0], data[0][1], data[0][2]) # Assign `area`, `owner`, and `zone` using the high-voltage side bus values high_voltage_bus = data[0][0] area = system.Bus.get(src='area', attr='v', idx=high_voltage_bus) zone = system.Bus.get(src='zone', attr='v', idx=high_voltage_bus) owner = system.Bus.get(src='owner', attr='v', idx=high_voltage_bus) param = {'idx': new_bus, 'name': '_'.join([str(i) for i in data[0][:3]]), 'Vn': 1.0, 'v0': data[1][-2], 'a0': data[1][-1] * deg2rad, 'area': area, 'owner': owner, 'zone': zone, } out['Bus'].append(param) # Get the original impedance values r_12 = data[1][0] x_12 = data[1][1] r_23 = data[1][3] x_23 = data[1][4] r_31 = data[1][6] x_31 = data[1][7] # Convert to system base if CZ=2 if data[0][5] == 2: # Convert from winding base to system base sbase = system.config.mva r_12 = r_12 * sbase / data[1][2] # SBASE1-2 x_12 = x_12 * sbase / data[1][2] r_23 = r_23 * sbase / data[1][5] # SBASE2-3 x_23 = x_23 * sbase / data[1][5] r_31 = r_31 * sbase / data[1][8] # SBASE3-1 x_31 = x_31 * sbase / data[1][8] # Calculate star-point resistances and reactances # These values are on system base after the conversion above r = [] x = [] r.append((r_12 + r_31 - r_23)/2) r.append((r_23 + r_12 - r_31)/2) r.append((r_31 + r_23 - r_12)/2) x.append((x_12 + x_31 - x_23)/2) x.append((x_23 + x_12 - x_31)/2) x.append((x_31 + x_23 - x_12)/2) for i in range(0, 3): # Always use system base after conversion Sn = system.config.mva # Set magnetizing conductance and susceptance for winding 1 only (first branch) if i == 0: g1_value = data[0][7] # Magnetizing conductance (G) b1_value = data[0][8] # Magnetizing susceptance (B) else: g1_value = 0.0 # No magnetization for other windings b1_value = 0.0 # No magnetization for other windings # PSS/E convention: WINDV=0 means use nominal (1.0 per-unit tap ratio) tap_value = data[2+i][0] if data[2+i][0] != 0.0 else 1.0 param = {'trans': True, 'bus1': data[0][i], 'bus2': new_bus, 'g1': g1_value, 'b1': b1_value, 'r': r[i], 'x': x[i], 'tap': tap_value, 'phi': data[2+i][2] * deg2rad, 'Vn1': system.Bus.get(src='Vn', idx=data[0][i], attr='v'), 'Vn2': 1.0, 'Sn': Sn, } out['Line'].append(param) xf_3_count += 1 _add_devices_from_dict(out, system) return out, xf_3_count def _parse_swshunt_v33(raw, system): # I, MODSW, ADJM, STAT, VSWHI, VSWLO, SWREM, RMPCT, RMIDNT, # BINIT, N1, B1, N2, B2, ... N8, B8 out = defaultdict(list) mva = system.config.mva for data in raw['swshunt']: bus = data[0] vn = system.Bus.get(src='Vn', idx=bus, attr='v') param = {'bus': bus, 'Vn': vn, 'Sn': mva, 'u': data[3], 'b': data[9] / mva} out['Shunt'].append(param) _add_devices_from_dict(out, system) return out def _parse_area_v33(raw, system): out = defaultdict(list) for data in raw['area']: # ID, ISW, PDES, PTOL, ARNAME param = {'idx': data[0], 'name': data[4], # 'isw': data[1], # 'pdes': data[2], # 'ptol': data[3], } out['Area'].append(param) for data in raw['zone']: # """ID, NAME""" param = {'idx': data[0], 'name': data[1]} # TODO: add back # system.add('Zone', param) _add_devices_from_dict(out, system) return out def _add_devices_from_dict(params, system): """ Add devices from a dict, where the key is the model name, and the value is a list of parameters. """ for name, plist in params.items(): for p in plist: system.add(name, p)
[docs]def sort_psse_models(dyr_yaml, system): """ Sort supported models so that model names are ordered by dependency. Dependency is determined by checking the ``find`` key in ``psse-dyr.yaml`` for each model. Returns ------- list The sequence of model names for loading parameters. """ _, andes_models = map(list, zip(*file_classes)) andes_models = list_flatten(andes_models) graph = defaultdict(list) for psse_model in dyr_yaml: # build model dependency graph if 'find' in dyr_yaml[psse_model]: # below is a list of `dict_keys` dep_models = [item.keys() for item in dyr_yaml[psse_model]['find'].values()] # extract strings from single-element `dict_keys` dep_models = [list(item)[0] for item in dep_models] graph[psse_model].extend(dep_models) # resolve PSS/E model dependency based on `find` # `find` requires the model indices to have existed for key in graph: value = list(graph[key]) for item in graph[key]: if item in system.groups: value.remove(item) value.extend(list(system.groups[item].models.keys())) graph[key] = value sequence = resolve_deps(graph) # does not support ciruclar dependencies for item in sequence: if not isinstance(item, str): raise NotImplementedError("Circular depencency exists for models %s. %s". item, "Please report this bug.") # prepend no-dependency models to the front nodep_models = list() for item in andes_models: if item not in sequence: nodep_models.append(item) return nodep_models + sequence
def _process_3wt_cz3(data, system): """ Process 3-winding transformer data with CZ=3 (Load loss & |Z|). Converts power loss (W) and impedance magnitude to R and X on winding base (CZ=2). Parameters ---------- data : list Multi-line transformer data system : System The ANDES system object Returns ------- list Updated data with converted R and X values """ # For each winding pair, convert load loss and |Z| to R and X # Winding pairs are 1-2, 2-3, and 3-1 sbase = [data[1][2], data[1][5], data[1][8]] # SBASE1-2, SBASE2-3, SBASE3-1 # Data indices for each winding pair indices = [ (0, 1), # R1-2, X1-2 indices (3, 4), # R2-3, X2-3 indices (6, 7), # R3-1, X3-1 indices ] for i, (loss_idx, z_idx) in enumerate(indices): # Load loss in watts loss = data[1][loss_idx] # |Z| in pu on winding base z_mag = data[1][z_idx] # Convert load loss to R in pu on winding base r_pu = loss / (sbase[i] * 1e6) # Calculate X from |Z| and R: X = sqrt(|Z|^2 - R^2) # Handle numeric issues - ensure we don't get imaginary numbers if z_mag**2 > r_pu**2: x_pu = (z_mag**2 - r_pu**2)**0.5 else: # If |Z| is too small compared to R, assume X is very small logger.warning(f"CZ=3 conversion issue: |Z|^2 < R^2 for winding pair {i+1}. Setting X to small value.") x_pu = 1e-6 # Replace the original values with calculated R and X data[1][loss_idx] = r_pu data[1][z_idx] = x_pu return data def _process_3wt_cm2(data, system): """ Process 3-winding transformer data with CM=2 (No load loss & exc. loss). Converts no-load loss and excitation current to G and B on system base (CM=1). Parameters ---------- data : list Multi-line transformer data system : System The ANDES system object Returns ------- list Updated data with converted G and B values """ # Get no-load loss in watts and excitation current in pu mag1 = data[0][7] # No-load loss in watts mag2 = data[0][8] # Excitation current in pu # Get the rated voltage of winding 1 (kV) Vn1 = data[2][1] if data[2][1] != 0.0 else system.Bus.get(src='Vn', idx=data[0][0], attr='v') # Convert power loss to conductance (G) in Siemens # G [S] = MAG1 / VNOM1_V^2 / 1e6 g_s = mag1 / (Vn1 ** 2) / 1e6 # Convert |Y| pu to actual in Siemens # |Y| [S] = |MAG2| * SBASE_MVA / NOMV1_kV^2 y_mag_s = abs(mag2) * system.config.mva / (Vn1**2) # Calculate B in Siemens # B [S] = sqrt(|Y|^2 - G^2) # Handle numeric issues if y_mag_s**2 > g_s**2: b_s = (y_mag_s**2 - g_s**2)**0.5 logger.info(f"CM=2 Conversion ok: G={g_s}, B={b_s}") else: # If |Y| is too small compared to G, assume B is very small logger.warning(f"Branch {data[0][0]}-{data[0][1]}-{data[0][2]}:") logger.warning(" CM=2 conversion issue: |Y|^2 < G^2. Setting B to small value.") b_s = 1e-6 # Convert G and B to per unit on system base g_pu = g_s * (Vn1**2) / system.config.mva b_pu = b_s * (Vn1**2) / system.config.mva # Replace MAG1 and MAG2 with calculated G and B in pu data[0][7] = g_pu data[0][8] = b_pu return data