Source code for line_solver.api.sn.network_struct

"""
NetworkStruct - Native Python implementation.

This dataclass summarizes the characteristics of a Network object,
providing all parameters needed for queueing network analysis.

Ported from MATLAB implementation.
"""

import numpy as np
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple, Union
from enum import IntEnum


[docs] class MatrixArray(np.ndarray): """ Numpy array subclass with .get() and .set() methods for API compatibility. This class provides compatibility with the wrapper mode that uses JLine's Matrix class which has get(i, j) and set(i, j, value) methods. """
[docs] def __new__(cls, input_array): """Create MatrixArray from existing array.""" obj = np.asarray(input_array).view(cls) return obj
[docs] def __array_finalize__(self, obj): """Handle view casting and new-from-template.""" pass
[docs] def __getitem__(self, key): """ Override indexing to handle 2D indexing on 1D arrays. This provides compatibility with MATLAB-style row/column vectors where a 1D array can be indexed as (0, j) or (i, 0). """ # Handle 2D tuple indexing on 1D arrays if isinstance(key, tuple) and len(key) == 2 and self.ndim == 1: i, j = key # For row vector style (0, j) -> return element j if i == 0: return super().__getitem__(j) # For column vector style (i, 0) -> return element i elif j == 0: return super().__getitem__(i) return super().__getitem__(key)
[docs] def __setitem__(self, key, value): """ Override item setting to handle 2D indexing on 1D arrays. """ # Handle 2D tuple indexing on 1D arrays if isinstance(key, tuple) and len(key) == 2 and self.ndim == 1: i, j = key # For row vector style (0, j) -> set element j if i == 0: super().__setitem__(j, value) return # For column vector style (i, 0) -> set element i elif j == 0: super().__setitem__(i, value) return super().__setitem__(key, value)
[docs] def get(self, i, j=None): """ Get element at index (i, j) or just i if 1D. Args: i: Row index (or element index for 1D) j: Column index (optional, for 2D arrays) Returns: Element value at the specified index """ if j is None: return self[i] # For 1D arrays, handle MATLAB-style row/column vector indexing if self.ndim == 1: # Row vector style: (0, j) -> element j if i == 0: return self[j] # Column vector style: (i, 0) -> element i elif j == 0: return self[i] # Otherwise just return element at first index return self[i] return self[i, j]
[docs] def set(self, i, j, value=None): """ Set element at index (i, j) or just i if 1D. Args: i: Row index (or element index for 1D) j: Column index or value (for 1D arrays) value: Value to set (optional, for 2D arrays) """ if value is None: # Called as set(i, value) for 1D self[i] = j else: # Called as set(i, j, value) for 2D self[i, j] = value
[docs] class NodeType(IntEnum): """Node types in a queueing network. NOTE: Values must match lang/base.py NodeType enum. """ SOURCE = 0 SINK = 1 QUEUE = 2 DELAY = 3 FORK = 4 JOIN = 5 CACHE = 6 ROUTER = 7 CLASSSWITCH = 8 PLACE = 9 TRANSITION = 10 LOGGER = 11 FINITE_CAPACITY_REGION = 12
[docs] @staticmethod def toText(node_type: 'NodeType') -> str: """Convert node type to text representation.""" names = { NodeType.SOURCE: 'Source', NodeType.SINK: 'Sink', NodeType.QUEUE: 'Queue', NodeType.DELAY: 'Delay', NodeType.FORK: 'Fork', NodeType.JOIN: 'Join', NodeType.CACHE: 'Cache', NodeType.ROUTER: 'Router', NodeType.CLASSSWITCH: 'ClassSwitch', NodeType.PLACE: 'Place', NodeType.TRANSITION: 'Transition', NodeType.LOGGER: 'Logger', NodeType.FINITE_CAPACITY_REGION: 'Region', } return names.get(node_type, f'Unknown({node_type})')
[docs] class SchedStrategy(IntEnum): """Scheduling strategies.""" FCFS = 0 # First-Come First-Served LCFS = 1 # Last-Come First-Served LCFSPR = 2 # LCFS Preemptive Resume LCFSPI = 3 # LCFS Preemptive Identical PS = 4 # Processor Sharing DPS = 5 # Discriminatory PS GPS = 6 # Generalized PS INF = 7 # Infinite Server (Delay) RAND = 8 # Random HOL = 9 # Head of Line SEPT = 10 # Shortest Expected Processing Time LEPT = 11 # Longest Expected Processing Time SIRO = 12 # Service in Random Order SJF = 13 # Shortest Job First LJF = 14 # Longest Job First POLLING = 15 EXT = 16 # External LPS = 17 # Least Progress Scheduling SETF = 18 # Shortest Elapsed Time First DPSPRIO = 19 # DPS with Priority GPSPRIO = 20 # GPS with Priority PSPRIO = 21 # PS with Priority
[docs] class RoutingStrategy(IntEnum): """Routing strategies.""" DISABLED = 0 RAND = 1 # Random PROB = 2 # Probabilistic RROBIN = 3 # Round Robin WRROBIN = 4 # Weighted Round Robin JSQ = 5 # Join Shortest Queue RL = 6 # Reinforcement Learning KCHOICES = 7 # K-Choices CLASS_SWITCH = 8
[docs] class DropStrategy(IntEnum): """Drop strategies for finite capacity.""" WAITQ = 0 # Wait in queue DROP = 1 # Drop on full BAS = 2 # Blocking after service
[docs] @dataclass class NetworkStruct: """ Data structure summarizing network characteristics. This class is the Python equivalent in native Python. It contains all parameters needed by solvers to analyze a queueing network. Attributes: nstations: Number of stations (queues, delays, sources, joins, places) nstateful: Number of stateful nodes nnodes: Total number of nodes nclasses: Number of job classes nchains: Number of chains (routing chains) nclosedjobs: Total number of jobs in closed classes njobs: (1, K) Population per class (inf for open classes) nservers: (M, 1) Number of servers per station rates: (M, K) Service rates scv: (M, K) Squared coefficient of variation visits: Dict[int, ndarray] - Chain ID -> (M, K) visit ratios inchain: Dict[int, ndarray] - Chain ID -> class indices in chain chains: (K, 1) Chain membership per class refstat: (K, 1) Reference station per class refclass: (1, C) Reference class per chain sched: Dict[int, SchedStrategy] - Station ID -> scheduling strategy routing: (N, K) Routing strategy matrix rt: Routing probability matrix nodetype: List[NodeType] - Node types isstation: (N, 1) Boolean mask for stations isstateful: (N, 1) Boolean mask for stateful nodes nodeToStation: (N, 1) Node index -> station index mapping nodeToStateful: (N, 1) Node index -> stateful index mapping stationToNode: (M, 1) Station index -> node index mapping stationToStateful: (M, 1) Station index -> stateful index mapping statefulToNode: (S, 1) Stateful index -> node index mapping statefulToStation: (S, 1) Stateful index -> station index mapping state: Dict State per stateful node lldscaling: (M, Nmax) Load-dependent scaling matrix cdscaling: Class-dependent scaling functions cap: (M, 1) Station capacities classcap: (M, K) Per-class capacities connmatrix: (N, N) Connection matrix nodenames: List[str] - Node names classnames: List[str] - Class names """ # Dimensions nstations: int = 0 nstateful: int = 0 nnodes: int = 0 nclasses: int = 0 nchains: int = 0 nclosedjobs: int = 0 # Population and capacity njobs: np.ndarray = field(default_factory=lambda: np.array([])) nservers: np.ndarray = field(default_factory=lambda: np.array([])) cap: Optional[np.ndarray] = None classcap: Optional[np.ndarray] = None # Service parameters rates: np.ndarray = field(default_factory=lambda: np.array([])) scv: np.ndarray = field(default_factory=lambda: np.array([])) phases: Optional[np.ndarray] = None phasessz: Optional[np.ndarray] = None phaseshift: Optional[np.ndarray] = None # Network structure - chains visits: Dict[int, np.ndarray] = field(default_factory=dict) nodevisits: Dict[int, np.ndarray] = field(default_factory=dict) inchain: Dict[int, np.ndarray] = field(default_factory=dict) chains: np.ndarray = field(default_factory=lambda: np.array([])) refstat: np.ndarray = field(default_factory=lambda: np.array([])) refclass: np.ndarray = field(default_factory=lambda: np.array([])) # Scheduling and routing sched: Dict[int, int] = field(default_factory=dict) # station_id -> SchedStrategy schedparam: Optional[np.ndarray] = None routing: np.ndarray = field(default_factory=lambda: np.array([])) rt: Optional[np.ndarray] = None rtnodes: Optional[np.ndarray] = None # Node classification nodetype: List[int] = field(default_factory=list) isstation: np.ndarray = field(default_factory=lambda: np.array([])) isstateful: np.ndarray = field(default_factory=lambda: np.array([])) isstatedep: Optional[np.ndarray] = None # (N, 3) - buffer, srv, routing # Node mappings (0-indexed) nodeToStation: np.ndarray = field(default_factory=lambda: np.array([])) nodeToStateful: np.ndarray = field(default_factory=lambda: np.array([])) stationToNode: np.ndarray = field(default_factory=lambda: np.array([])) stationToStateful: np.ndarray = field(default_factory=lambda: np.array([])) statefulToNode: np.ndarray = field(default_factory=lambda: np.array([])) statefulToStation: np.ndarray = field(default_factory=lambda: np.array([])) # State information state: Dict[int, np.ndarray] = field(default_factory=dict) stateprior: Dict[int, np.ndarray] = field(default_factory=dict) space: Dict[int, np.ndarray] = field(default_factory=dict) # Load-dependent scaling lldscaling: Optional[np.ndarray] = None # (M, Nmax) - limited load dependence cdscaling: Optional[Dict] = None # class-dependent scaling functions # Class properties classprio: Optional[np.ndarray] = None # (1, K) - class priorities classdeadline: Optional[np.ndarray] = None # (1, K) - class deadlines isslc: Optional[np.ndarray] = None # (K, 1) - is self-looping class issignal: Optional[np.ndarray] = None # (K, 1) - is signal class signaltype: Optional[List] = None # signal types syncreply: Optional[np.ndarray] = None # (K, 1) - sync reply class mapping # Connectivity connmatrix: Optional[np.ndarray] = None # (N, N) connection matrix # Naming nodenames: List[str] = field(default_factory=list) classnames: List[str] = field(default_factory=list) # Process information mu: Optional[Dict] = None # station -> class -> service rate matrix phi: Optional[Dict] = None # station -> class -> service phase probs proc: Optional[Dict] = None # station -> class -> process cell pie: Optional[Dict] = None # station -> class -> initial phase probs procid: Optional[Dict] = None # station -> class -> process type lst: Optional[Dict] = None # station -> class -> Laplace-Stieltjes transform # Fork-join fj: Optional[np.ndarray] = None # fork-join topology matrix # Drop rules droprule: Optional[Dict] = None # station -> class -> drop strategy # Finite capacity regions nregions: int = 0 region: Optional[List] = None regionrule: Optional[np.ndarray] = None regionweight: Optional[np.ndarray] = None # Matrix(F, K) - class weights per region regionsz: Optional[np.ndarray] = None # Matrix(F, K) - class sizes per region # Synchronization sync: Optional[Dict] = None gsync: Optional[Dict] = None # Node parameters nodeparam: Optional[Dict] = None # Reward functions reward: Optional[Dict] = None # Original routing (for caching) rtorig: Optional[Dict] = None # Additional matrices csmask: Optional[np.ndarray] = None # class-switching mask nvars: Optional[np.ndarray] = None # number of variables # Fields that should be wrapped with MatrixArray for API compatibility _MATRIX_ARRAY_FIELDS = frozenset([ 'njobs', 'nservers', 'rates', 'scv', 'chains', 'refstat', 'refclass', 'routing', 'isstation', 'isstateful', 'nodeToStation', 'nodeToStateful', 'stationToNode', 'stationToStateful', 'statefulToNode', 'statefulToStation', 'classprio', 'rt', 'rtnodes' ])
[docs] def __setattr__(self, name: str, value): """Override to convert numpy arrays to MatrixArray for API compatibility.""" # Convert numpy arrays to MatrixArray for specified fields if name in NetworkStruct._MATRIX_ARRAY_FIELDS: if value is not None and not isinstance(value, MatrixArray): if isinstance(value, np.ndarray) or isinstance(value, (list, tuple)): value = MatrixArray(value) object.__setattr__(self, name, value)
[docs] def __post_init__(self): """Ensure arrays are MatrixArray (numpy arrays with .get()/.set() methods).""" for fname in NetworkStruct._MATRIX_ARRAY_FIELDS: val = getattr(self, fname) if val is not None: # Convert to MatrixArray for API compatibility (.get()/.set() methods) if not isinstance(val, MatrixArray): object.__setattr__(self, fname, MatrixArray(val))
[docs] def validate(self) -> None: """ Validate structural consistency. Raises: ValueError: If structural consistency is violated """ # Check basic counts if self.nstations < 0 or self.nstateful < 0 or self.nnodes < 0: raise ValueError("Node counts must be non-negative") if self.nstations > self.nstateful: raise ValueError("Number of stations cannot exceed number of stateful nodes") if self.nstateful > self.nnodes: raise ValueError("Number of stateful nodes cannot exceed total number of nodes") # Validate matrix dimensions if self.isstation is not None and len(self.isstation) > 0: if len(self.isstation) != self.nnodes: raise ValueError(f"isstation length {len(self.isstation)} != nnodes {self.nnodes}") if np.sum(self.isstation) != self.nstations: raise ValueError("nstations must equal sum of isstation") if self.isstateful is not None and len(self.isstateful) > 0: if len(self.isstateful) != self.nnodes: raise ValueError(f"isstateful length {len(self.isstateful)} != nnodes {self.nnodes}") if np.sum(self.isstateful) != self.nstateful: raise ValueError("nstateful must equal sum of isstateful") # Validate hierarchy: all stations must be stateful if (self.isstation is not None and self.isstateful is not None and len(self.isstation) > 0 and len(self.isstateful) > 0): for i in range(self.nnodes): if self.isstation[i] > 0 and self.isstateful[i] == 0: raise ValueError(f"All stations must be stateful nodes (violation at node {i})") # Validate rates matrix dimensions if self.rates is not None and len(self.rates) > 0: if self.rates.ndim == 2: if self.rates.shape[0] != self.nstations: raise ValueError(f"rates rows {self.rates.shape[0]} != nstations {self.nstations}") if self.rates.shape[1] != self.nclasses: raise ValueError(f"rates cols {self.rates.shape[1]} != nclasses {self.nclasses}")
[docs] def is_valid(self) -> bool: """ Check if structure is valid. Returns: True if structure passes validation, False otherwise """ try: self.validate() return True except ValueError: return False
[docs] def get_chain_population(self, chain_id: int) -> float: """ Get total population in a chain. Args: chain_id: Chain index (0-based) Returns: Total number of jobs in the chain """ if chain_id not in self.inchain: return 0.0 class_indices = self.inchain[chain_id] return np.sum(self.njobs.flat[class_indices.astype(int)])
[docs] def is_closed_chain(self, chain_id: int) -> bool: """ Check if a chain is closed (finite population). Args: chain_id: Chain index (0-based) Returns: True if chain is closed, False if open """ if chain_id not in self.inchain: return False class_indices = self.inchain[chain_id] # Chain is closed if all classes have finite population return np.all(np.isfinite(self.njobs.flat[class_indices.astype(int)]))
[docs] def is_open_chain(self, chain_id: int) -> bool: """ Check if a chain is open (infinite population). Args: chain_id: Chain index (0-based) Returns: True if chain is open, False if closed """ if chain_id not in self.inchain: return False class_indices = self.inchain[chain_id] # Chain is open if any class has infinite population return np.any(np.isinf(self.njobs.flat[class_indices.astype(int)]))
[docs] def get_station_indices(self) -> np.ndarray: """ Get indices of station nodes. Returns: Array of node indices that are stations """ return np.where(self.isstation > 0)[0]
[docs] def get_stateful_indices(self) -> np.ndarray: """ Get indices of stateful nodes. Returns: Array of node indices that are stateful """ return np.where(self.isstateful > 0)[0]
[docs] def get_scheduling_at_station(self, station_id: int) -> int: """ Get scheduling strategy at a station. Args: station_id: Station index (0-based) Returns: SchedStrategy value """ return self.sched.get(station_id, SchedStrategy.FCFS)
[docs] def has_multi_server(self) -> bool: """Check if any station has multiple servers.""" if self.nservers is None or len(self.nservers) == 0: return False return np.any(self.nservers > 1)
[docs] def has_load_dependence(self) -> bool: """Check if model has load-dependent service rates.""" return self.lldscaling is not None and np.any(self.lldscaling != 1.0)
[docs] def has_class_dependence(self) -> bool: """Check if model has class-dependent scaling.""" return self.cdscaling is not None and len(self.cdscaling) > 0
[docs] def has_open_classes(self) -> bool: """Check if model has open (infinite population) classes.""" return np.any(np.isinf(self.njobs))
[docs] def has_closed_classes(self) -> bool: """Check if model has closed (finite population) classes.""" return np.any(np.isfinite(self.njobs) & (self.njobs > 0))
[docs] def get_total_population(self) -> float: """Get total population across all closed classes.""" return np.sum(self.njobs[np.isfinite(self.njobs)])
[docs] def get_open_class_indices(self) -> np.ndarray: """Get indices of open classes.""" return np.where(np.isinf(self.njobs.flatten()))[0]
[docs] def get_closed_class_indices(self) -> np.ndarray: """Get indices of closed classes.""" return np.where(np.isfinite(self.njobs.flatten()))[0]
[docs] def copy(self) -> 'NetworkStruct': """Create a deep copy of this NetworkStruct.""" import copy return copy.deepcopy(self)
[docs] def __repr__(self) -> str: """String representation.""" return (f"NetworkStruct(nstations={self.nstations}, nstateful={self.nstateful}, " f"nnodes={self.nnodes}, nclasses={self.nclasses}, nchains={self.nchains}, " f"nclosedjobs={self.nclosedjobs})")
@property def obj(self): """Return self for compatibility with wrapper code that accesses .obj""" return self