Source code for line_solver.api.qsys.retrial

"""
Retrial and reneging queueing system analysis framework.

This module provides a foundation for analyzing bufferless retrial queues
(BMAP/PH/N/N) and reneging queues (MAP/M/s+G), with extension points for
future full solver implementation.

The framework includes:
- BMAP (Batch Markovian Arrival Process) matrix utilities
- PH (Phase-Type) distribution utilities
- QBD (Quasi-Birth-Death) state space construction
- Retrial queue topology detection and analysis
- Reneging queue topology detection and analysis

Reference:
    Dudin, A., Klimenok, V., & Vishnevsky, V. (2020).
    "The Theory of Quasi-Birth-Death Processes and Its Applications".
    Springer.

Port from: matlab/src/solvers/MAM/solver_mam_retrial.m
           matlab/src/api/qsys/qsys_bmapphnn_retrial.m
"""

import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum


[docs] class QueueType(Enum): """Queueing system topology types.""" STANDARD = "standard" RETRIAL = "retrial" RENEGING = "reneging" RETRIAL_RENEGING = "retrial_reneging"
[docs] @dataclass class BmapMatrix: """ Batch Markovian Arrival Process matrix representation. D₀ = drift matrix (no arrivals) D₁, D₂, ..., Dₖ = batch arrival matrices for batch sizes 1, 2, ..., K Properties: order: Dimension of the BMAP num_batches: Maximum batch size arrival_rate: Overall arrival rate """ D0: np.ndarray D_batch: List[np.ndarray] # D_batch[k] = D_{k+1}
[docs] def __post_init__(self): """Validate BMAP structure.""" if self.D0.shape[0] != self.D0.shape[1]: raise ValueError("D0 must be square matrix") self.order = self.D0.shape[0] self.num_batches = len(self.D_batch) # Validate batch matrices have same dimension for i, D in enumerate(self.D_batch): if D.shape != self.D0.shape: raise ValueError(f"D_batch[{i}] dimension mismatch with D0")
@property def arrival_rate(self) -> float: """ Compute overall arrival rate of the BMAP. lambda = theta * (-D0) * e where theta is stationary distribution of BMAP Markov chain and e is unit vector. """ # Simple approximation: trace-based estimate # For exact computation, requires solving theta (-D0) * e D_sum = self.D0.copy() for D in self.D_batch: D_sum = D_sum + D # Eigenvalue-based approximation try: eigvals = np.linalg.eigvals(-D_sum) return np.max(np.real(eigvals)) if len(eigvals) > 0 else 0.0 except: return 0.0 @property def fundamental_arrival_rate(self) -> float: """Arrival rate from fundamental matrix.""" try: # Compute -D0 inverse (if invertible) inv_D0 = np.linalg.inv(-self.D0) e = np.ones(self.order) # Sum of all batch matrices D_sum = np.zeros_like(self.D0) for D in self.D_batch: D_sum = D_sum + D # Arrival rate approximation return np.sum(inv_D0 @ D_sum) except: return self.arrival_rate
[docs] @dataclass class PhDistribution: """ Phase-Type (PH) distribution representation. Beta = initial probability vector (shape: m,) S = transient generator matrix (shape: m x m) where m is the number of phases. Properties: mean: Mean of PH distribution (1/mu in queue notation) scv: Squared coefficient of variation num_phases: Number of phases """ beta: np.ndarray S: np.ndarray
[docs] def __post_init__(self): """Validate PH distribution structure.""" if self.beta.ndim != 1: raise ValueError("beta must be 1-D vector") if self.S.ndim != 2 or self.S.shape[0] != self.S.shape[1]: raise ValueError("S must be square matrix") if self.S.shape[0] != len(self.beta): raise ValueError("S dimension must match beta length") self.num_phases = len(self.beta)
@property def mean(self) -> float: """ Compute mean of PH distribution. E[X] = -beta * S^{-1} * e """ try: S_inv = np.linalg.inv(self.S) e = np.ones(self.num_phases) return -np.dot(self.beta, S_inv) @ e except: return 1.0 # Fallback @property def mean_squared(self) -> float: """ Compute second moment of PH distribution. E[X²] = 2 * beta * S^{-2} * e """ try: S_inv = np.linalg.inv(self.S) e = np.ones(self.num_phases) return 2 * np.dot(self.beta, S_inv) @ (S_inv @ e) except: mean = self.mean return mean ** 2 # Fallback: assume deterministic @property def scv(self) -> float: """ Squared coefficient of variation of PH distribution. SCV = (E[X²] / E[X]²) - 1 """ mean = self.mean mean_sq = self.mean_squared if mean > 0: return (mean_sq / (mean ** 2)) - 1 return 0.0
[docs] @dataclass class QbdStatespace: """ Quasi-Birth-Death Markov chain state space representation. For a QBD process, states are of the form (n, i) where: - n = level (number of retrying customers in orbit) - i = phase (service phase or phase-type stage) Generator matrix has block structure: Q = | B_0 A_0 0 0 ... | | A_2 A_1 A_0 0 ... | | 0 A_2 A_1 A_0 ... | | ... | Properties: max_level: Maximum retrial orbit size (truncation level) phase_dim: Number of phases at each level total_states: Total state space dimension """ A0: np.ndarray # Arrival matrix (birth) A1: np.ndarray # Service matrix (drift) A2: np.ndarray # Retrial matrix (death) B0: np.ndarray # Boundary matrix at level 0 max_level: int
[docs] def __post_init__(self): """Validate QBD state space.""" phase_dim = self.A0.shape[0] if not all(M.shape == (phase_dim, phase_dim) for M in [self.A1, self.A2, self.B0]): raise ValueError("All matrices must have same dimension") self.phase_dim = phase_dim # Total states = phases * (max_level + 1) levels self.total_states = phase_dim * (self.max_level + 1)
[docs] def get_state_index(self, level: int, phase: int) -> int: """ Map (level, phase) to linear state index. Args: level: Retrial orbit size (0 ≤ level ≤ max_level) phase: Phase within level (0 ≤ phase < phase_dim) Returns: Linear state index """ if not (0 <= level <= self.max_level and 0 <= phase < self.phase_dim): raise ValueError(f"Invalid state ({level}, {phase})") return level * self.phase_dim + phase
[docs] def get_level_phase(self, idx: int) -> Tuple[int, int]: """ Map linear state index to (level, phase). Args: idx: Linear state index Returns: Tuple (level, phase) """ if not (0 <= idx < self.total_states): raise ValueError(f"Invalid state index {idx}") level = idx // self.phase_dim phase = idx % self.phase_dim return level, phase
@property def binomial_state_dimension(self) -> int: """ Compute state space dimension using binomial coefficient formula. For a retrial queue with N total customers and m servers, dimension = C(N + m - 1, m - 1) This is a rough upper bound for QBD truncation. """ # Placeholder - requires N and m parameters return self.phase_dim * (self.max_level + 1)
[docs] @dataclass class RetrialQueueResult: """ Analysis results for BMAP/PH/N/N retrial queue. Attributes: queue_type: Type of queue (retrial, reneging, etc.) L_orbit: Expected number of customers in orbit N_server: Expected number of customers being served utilization: Server utilization throughput: System throughput P_idle: Probability that server is idle P_empty_orbit: Probability that orbit is empty stationary_dist: Stationary distribution vector truncation_level: QBD truncation level used converged: Whether numerical solution converged iterations: Number of iterations to convergence error: Final error estimate """ queue_type: QueueType L_orbit: float N_server: float utilization: float throughput: float P_idle: float P_empty_orbit: float stationary_dist: Optional[np.ndarray] = None truncation_level: int = 0 converged: bool = False iterations: int = 0 error: float = np.inf
[docs] class RetrialQueueAnalyzer: """ Framework for analyzing BMAP/PH/N/N retrial queues. This class provides the foundation for future full solver implementation, including topology detection, parameter extraction, and QBD setup. Example: analyzer = RetrialQueueAnalyzer(model) queue_type = analyzer.detect_queue_type() if queue_type == QueueType.RETRIAL: result = analyzer.analyze() """
[docs] def __init__(self, sn: Any, options: Optional[Dict] = None): """ Initialize retrial queue analyzer. Args: sn: NetworkStruct with queue configuration options: Analysis options (tolerance, max iterations, etc.) """ self.sn = sn self.options = options or {} self.tolerance = self.options.get('tolerance', 1e-8) self.max_iterations = self.options.get('max_iterations', 1000) self.max_retrial_orbit = self.options.get('max_retrial_orbit', 100)
[docs] def detect_queue_type(self) -> QueueType: """ Detect the type of queueing system from topology. Returns: QueueType enum indicating retrial, reneging, or combination """ # Check for retrial nodes (orbit with retrial rate) has_retrial = False has_reneging = False # Placeholder logic - would examine sn structure for: # - Retrial probability/rate (orbit impatience) # - Reneging probability/patience times # - Queue abandonment if has_retrial and has_reneging: return QueueType.RETRIAL_RENEGING elif has_retrial: return QueueType.RETRIAL elif has_reneging: return QueueType.RENEGING else: return QueueType.STANDARD
[docs] def extract_bmap(self) -> Optional[BmapMatrix]: """ Extract BMAP parameters from arrival process. LINE stores arrival processes in MAP format: {D0, D1, D2, ...} where D0 is the "hidden" generator and D1, D2, ... are arrival matrices. Returns: BmapMatrix instance or None if arrival is not BMAP/MAP """ sn = self.sn # Find source station source_idx = None for i in range(sn.nstations): node_idx = int(sn.stationToNode[i]) # Check if this is a source node if hasattr(sn, 'nodetype') and sn.nodetype[node_idx] == 0: # Source type source_idx = i break if source_idx is None: return None # Extract process from sn.proc # Assume single class for now (classIdx = 0) class_idx = 0 if not hasattr(sn, 'proc') or sn.proc is None: return None try: proc = sn.proc[source_idx][class_idx] except (IndexError, KeyError, TypeError): return None if proc is None or not isinstance(proc, (list, tuple)) or len(proc) < 2: return None D0 = np.atleast_2d(proc[0]) D_batch = [] # Check if D0 looks like a subgenerator (negative diagonal) if D0.shape[0] == D0.shape[1]: diag_D0 = np.diag(D0) if np.all(diag_D0 <= 0): # This is in MAP format {D0, D1, ...} for i in range(1, len(proc)): D_batch.append(np.atleast_2d(proc[i])) return BmapMatrix(D0=D0, D_batch=D_batch) # Try to interpret as PH format {alpha, T} and convert to MAP alpha = np.atleast_1d(proc[0]).flatten() T = np.atleast_2d(proc[1]) if len(alpha) == T.shape[0] == T.shape[1]: n = len(alpha) e = np.ones(n) # Convert PH to MAP: D0 = T, D1 = (-T*e)*alpha D0_new = T D1_new = np.outer(-T @ e, alpha) return BmapMatrix(D0=D0_new, D_batch=[D1_new]) return None
[docs] def extract_ph_service(self) -> Optional[PhDistribution]: """ Extract Phase-Type service parameters. LINE stores PH in MAP format: {D0, D1} D0 = T (subgenerator matrix) D1 = S0 * alpha (exit rate times initial prob) Returns: PhDistribution instance or None if service is not PH """ sn = self.sn # Find queue station queue_idx = None for i in range(sn.nstations): node_idx = int(sn.stationToNode[i]) # Check if this is a queue node if hasattr(sn, 'nodetype') and sn.nodetype[node_idx] == 1: # Queue type queue_idx = i break if queue_idx is None: return None # Extract process from sn.proc class_idx = 0 if not hasattr(sn, 'proc') or sn.proc is None: return None try: proc = sn.proc[queue_idx][class_idx] except (IndexError, KeyError, TypeError): return None if proc is None or not isinstance(proc, (list, tuple)) or len(proc) < 2: return None D0 = np.atleast_2d(proc[0]) D1 = np.atleast_2d(proc[1]) # Validate dimensions n = D0.shape[0] if D0.shape[1] != n or D1.shape[0] != n or D1.shape[1] != n: return None # S = D0 (subgenerator) S = D0 # S0 = -S * ones (exit rates) e = np.ones(n) S0 = -S @ e # Extract alpha from D1 = S0 * alpha # Find a row with non-zero exit rate idx = np.where(S0 > 1e-10)[0] if len(idx) == 0: # All rows have zero exit rate - use uniform beta = np.ones(n) / n else: # alpha = D1(idx,:) / S0(idx) beta = D1[idx[0], :] / S0[idx[0]] # Ensure beta is properly normalized beta = beta.flatten() if abs(np.sum(beta) - 1) > 1e-6: if np.sum(beta) > 0: beta = beta / np.sum(beta) else: beta = np.ones(n) / n return PhDistribution(beta=beta, S=S)
[docs] def extract_retrial_parameters(self) -> Optional[Dict[str, float]]: """ Extract retrial-specific parameters. Returns: Dict with keys: - alpha: Retrial rate (rate at which customers retry) - gamma: Orbit impatience rate (reneging rate) - p: Batch rejection probability - R: Threshold for admission control - N: Number of servers """ sn = self.sn params = { 'alpha': 0.1, # Default retrial rate 'gamma': 0.0, # Default orbit impatience (no abandonment) 'p': 0.0, # Default batch rejection probability 'R': 0, # Default admission threshold 'N': 1 # Default number of servers } # Find queue station queue_idx = None for i in range(sn.nstations): node_idx = int(sn.stationToNode[i]) if hasattr(sn, 'nodetype') and sn.nodetype[node_idx] == 1: queue_idx = i break if queue_idx is None: return None class_idx = 0 # Extract number of servers if hasattr(sn, 'nservers'): try: params['N'] = int(sn.nservers[queue_idx]) except (IndexError, TypeError): pass # Extract retrial rate from retrialDelays if available if hasattr(sn, 'retrialDelays') and sn.retrialDelays is not None: try: retrial_dist = sn.retrialDelays[queue_idx][class_idx] if retrial_dist is not None and isinstance(retrial_dist, (list, tuple)): # For Exp(alpha), the rate matrix is -alpha params['alpha'] = -float(retrial_dist[1][0, 0]) except (IndexError, TypeError, KeyError): pass # Extract orbit impatience rate if hasattr(sn, 'orbitImpatience') and sn.orbitImpatience is not None: try: impatience_dist = sn.orbitImpatience[queue_idx][class_idx] if impatience_dist is not None and isinstance(impatience_dist, (list, tuple)): params['gamma'] = -float(impatience_dist[1][0, 0]) except (IndexError, TypeError, KeyError): pass # Extract batch rejection probability if hasattr(sn, 'batchRejectProb') and sn.batchRejectProb is not None: try: params['p'] = float(sn.batchRejectProb[queue_idx, class_idx]) except (IndexError, TypeError): pass # Set admission threshold R = N - 1 by default params['R'] = params['N'] - 1 return params
[docs] def build_qbd_statespace(self, bmap: BmapMatrix, ph_service: PhDistribution, retrial_params: Dict[str, float] ) -> Optional[QbdStatespace]: """ Build QBD state space for the retrial queue. This constructs the generator matrix blocks for the QBD process following the BMAP/PH/N/N retrial queue formulation. Args: bmap: BMAP arrival process ph_service: PH service distribution retrial_params: Retrial parameters (alpha, gamma, p, R, N) Returns: QbdStatespace instance or None if construction fails """ if bmap is None or ph_service is None or retrial_params is None: return None # Extract parameters D0 = bmap.D0 D1 = bmap.D_batch[0] if bmap.D_batch else np.zeros_like(D0) beta = ph_service.beta S = ph_service.S alpha = retrial_params.get('alpha', 0.1) gamma = retrial_params.get('gamma', 0.0) N = retrial_params.get('N', 1) # Dimensions m_a = D0.shape[0] # BMAP phases m_s = S.shape[0] # Service phases phase_dim = m_a * m_s * N # Combined phase dimension # Exit rate vector for service e_s = np.ones(m_s) S0 = -S @ e_s # Identity matrices I_a = np.eye(m_a) I_s = np.eye(m_s) # Build QBD generator blocks # A0: transitions that increase level (arrivals to orbit) # A1: transitions at same level # A2: transitions that decrease level (retrials from orbit) # Simplified construction for single-server case if N == 1: # Phase = (arrival phase, service phase, server state) # Server state: 0 = idle, 1 = busy # Build A0 (arrivals when server busy - go to orbit) A0 = np.kron(D1, I_s) # Build A2 (retrials from orbit when server becomes free) A2 = alpha * np.kron(I_a, np.outer(S0, beta)) # Build A1 (internal transitions + arrivals when idle) A1 = np.kron(D0, I_s) + np.kron(I_a, S) # Add impatience (customers leaving orbit) if gamma > 0: A1 = A1 + gamma * np.eye(phase_dim) # Boundary block B0 B0 = np.kron(D0 + D1, I_s) + np.kron(I_a, S) else: # Multi-server case - more complex construction # Placeholder: return None for now return None return QbdStatespace( A0=A0, A1=A1, A2=A2, B0=B0, max_level=self.max_retrial_orbit, phase_dim=phase_dim )
[docs] def analyze(self) -> RetrialQueueResult: """ Analyze the retrial queue. This is the main entry point for analysis: 1. Detect queue type 2. Extract arrival and service parameters 3. Build QBD state space 4. Solve for stationary distribution using matrix-analytic methods 5. Compute performance metrics Returns: RetrialQueueResult with performance metrics """ queue_type = self.detect_queue_type() if queue_type == QueueType.STANDARD: raise ValueError("Standard queue - use standard analyzer") # Extract parameters bmap = self.extract_bmap() ph_service = self.extract_ph_service() retrial_params = self.extract_retrial_parameters() if bmap is None or ph_service is None or retrial_params is None: return RetrialQueueResult( queue_type=queue_type, L_orbit=0.0, N_server=0.0, utilization=0.0, throughput=0.0, P_idle=1.0, P_empty_orbit=1.0, converged=False, error=np.inf ) # Build QBD state space qbd = self.build_qbd_statespace(bmap, ph_service, retrial_params) if qbd is None: return RetrialQueueResult( queue_type=queue_type, L_orbit=0.0, N_server=0.0, utilization=0.0, throughput=0.0, P_idle=1.0, P_empty_orbit=1.0, converged=False, error=np.inf ) # Solve QBD using matrix-geometric method try: pi, R, converged, iterations, error = self._solve_qbd_matrix_geometric(qbd) except Exception: return RetrialQueueResult( queue_type=queue_type, L_orbit=0.0, N_server=0.0, utilization=0.0, throughput=0.0, P_idle=1.0, P_empty_orbit=1.0, converged=False, error=np.inf ) # Compute performance metrics N = retrial_params.get('N', 1) arrival_rate = bmap.arrival_rate service_rate = ph_service.rate if hasattr(ph_service, 'rate') else 1.0 / ph_service.mean # Expected number in orbit (sum over levels weighted by level) L_orbit = 0.0 for level in range(len(pi)): L_orbit += level * np.sum(pi[level]) # Server utilization P_idle = np.sum(pi[0]) if len(pi) > 0 else 1.0 utilization = 1.0 - P_idle # Throughput via departure rate throughput = utilization * N * service_rate # Expected number being served N_server = utilization * N # Probability orbit is empty P_empty_orbit = np.sum(pi[0]) if len(pi) > 0 else 1.0 return RetrialQueueResult( queue_type=queue_type, L_orbit=L_orbit, N_server=N_server, utilization=utilization, throughput=throughput, P_idle=P_idle, P_empty_orbit=P_empty_orbit, stationary_dist=pi if converged else None, truncation_level=len(pi) - 1 if pi is not None else 0, converged=converged, iterations=iterations, error=error )
def _solve_qbd_matrix_geometric(self, qbd: QbdStatespace) -> Tuple[np.ndarray, np.ndarray, bool, int, float]: """ Solve QBD process using matrix-geometric method. The rate matrix R satisfies: A0 + R*A1 + R^2*A2 = 0 Args: qbd: QBD state space with generator blocks Returns: Tuple of (stationary distribution, R matrix, converged, iterations, error) """ A0 = qbd.A0 A1 = qbd.A1 A2 = qbd.A2 n = A0.shape[0] # Initialize R matrix R = np.zeros((n, n)) # Iteration for R: R = -A0 * (A1 + R*A2)^{-1} converged = False iterations = 0 error = np.inf for it in range(self.max_iterations): iterations = it + 1 try: # Compute (A1 + R*A2)^{-1} inv_term = np.linalg.inv(A1 + R @ A2) R_new = -A0 @ inv_term except np.linalg.LinAlgError: break # Check convergence error = np.max(np.abs(R_new - R)) R = R_new if error < self.tolerance: converged = True break if not converged: return None, R, False, iterations, error # Compute boundary probabilities # pi_0 satisfies: pi_0 * (B0 + R*A2) = 0, sum(pi_0) * (I - R)^{-1} * e = 1 try: B = qbd.B0 + R @ A2 # Find null space eigvals, eigvecs = np.linalg.eig(B.T) null_idx = np.argmin(np.abs(eigvals)) pi_0 = np.real(eigvecs[:, null_idx]) pi_0 = np.abs(pi_0) # Normalize I_minus_R_inv = np.linalg.inv(np.eye(n) - R) e = np.ones(n) norm_factor = pi_0 @ I_minus_R_inv @ e if norm_factor > 0: pi_0 = pi_0 / norm_factor except np.linalg.LinAlgError: return None, R, False, iterations, error # Build stationary distribution up to truncation level pi = [pi_0] current = pi_0 for level in range(1, qbd.max_level + 1): current = current @ R if np.sum(current) < self.tolerance: break pi.append(current.copy()) return np.array(pi), R, converged, iterations, error
[docs] def qsys_bmapphnn_retrial( arrival_matrix: Dict[str, np.ndarray], service_params: Dict[str, float], N: int, retrial_params: Optional[Dict[str, float]] = None, options: Optional[Dict] = None ) -> RetrialQueueResult: """ Analyze BMAP/PH/N/N bufferless retrial queue. Parameters: arrival_matrix: Dict with 'D0', 'D1', ... for BMAP service_params: Dict with 'beta' (initial prob) and 'S' (gen matrix) for PH N: Number of customers (for closed queue) retrial_params: Dict with 'alpha' (retrial rate), 'gamma' (impatience), 'p' (rejection prob), 'R' (threshold) options: Solver options Returns: RetrialQueueResult with performance metrics Raises: NotImplementedError: Full solver not yet implemented References: Dudin, A., Klimenok, V., & Vishnevsky, V. (2020). "The Theory of Quasi-Birth-Death Processes and Its Applications". Springer. """ if options is None: options = {} # Extract parameters try: D0 = arrival_matrix.get('D0') D_batch = [arrival_matrix.get(f'D{i+1}') for i in range(10)] D_batch = [D for D in D_batch if D is not None] bmap = BmapMatrix(D0=D0, D_batch=D_batch) beta = service_params.get('beta', np.array([1.0])) S = service_params.get('S', np.array([[-1.0]])) ph_service = PhDistribution(beta=beta, S=S) except Exception as e: raise ValueError(f"Invalid BMAP/PH parameters: {e}") # Set retrial parameters if retrial_params is None: retrial_params = { 'alpha': 1.0, # Default retrial rate 'gamma': 0.0, # No impatience 'p': 0.0, # No rejection 'R': N # No threshold } # Create placeholder result result = RetrialQueueResult( queue_type=QueueType.RETRIAL, L_orbit=0.0, N_server=0.0, utilization=0.0, throughput=0.0, P_idle=1.0, P_empty_orbit=1.0, truncation_level=options.get('max_retrial_orbit', 100), converged=False, error=np.inf ) return result
[docs] @dataclass class RetrialInfo: """Information about a valid retrial queue topology.""" is_retrial: bool station_idx: Optional[int] = None node_idx: Optional[int] = None source_idx: Optional[int] = None class_idx: Optional[int] = None error_msg: str = '' N: Optional[int] = None # Number of servers alpha: float = 0.1 # Retrial rate (default) gamma: float = 0.0 # Orbit impatience (default 0) p: float = 0.0 # Batch rejection prob (default 0) R: Optional[int] = None # Admission threshold
[docs] def qsys_is_retrial(sn: Any) -> Tuple[bool, RetrialInfo]: """ Check if network is a valid BMAP/PH/N/N bufferless retrial queue. Validates that the network structure matches the requirements for the BMAP/PH/N/N retrial queue solver: - Single bufferless queue (capacity == number of servers) - Retrial drop strategy configured - BMAP/MAP arrival process at source - PH/Exp service at queue - Open class model Based on: Dudin et al., "Analysis of BMAP/PH/N-Type Queueing System with Flexible Retrials Admission Control", Mathematics 2025, 13(9), 1434. Args: sn: NetworkStruct object Returns: Tuple of (is_retrial, retrial_info) where: is_retrial: True if network is valid BMAP/PH/N/N retrial topology retrial_info: RetrialInfo with parameters for the retrial solver References: Original MATLAB: matlab/src/api/qsys/qsys_is_retrial.m """ from ..sn import sn_is_open_model, NodeType, DropStrategy ret_info = RetrialInfo(is_retrial=False) # Check if model is open try: if not sn_is_open_model(sn): ret_info.error_msg = 'BMAP/PH/N/N retrial solver requires open queueing model.' return False, ret_info except: ret_info.error_msg = 'Could not determine if model is open.' return False, ret_info # Check for single class (current limitation) if sn.nclasses > 1: ret_info.error_msg = 'BMAP/PH/N/N retrial solver currently supports single class only.' return False, ret_info ret_info.class_idx = 0 # Find bufferless queue stations (capacity == nservers, finite capacity) bufferless_stations = [] for ist in range(sn.nstations): node_idx = sn.stationToNode[ist] if hasattr(sn, 'stationToNode') else ist if node_idx < len(sn.nodetype) and sn.nodetype[node_idx] == NodeType.QUEUE: cap = sn.cap[ist] if hasattr(sn, 'cap') and sn.cap is not None else np.inf nservers = sn.nservers[ist] if hasattr(sn, 'nservers') and sn.nservers is not None else 1 if np.isfinite(cap) and cap == nservers: bufferless_stations.append(ist) if not bufferless_stations: ret_info.error_msg = 'No bufferless queue found (capacity must equal number of servers).' return False, ret_info # Check retrial drop strategy retrial_station = None for ist in bufferless_stations: if hasattr(sn, 'droprule') and sn.droprule is not None: droprule = sn.droprule[ist, :] if sn.droprule.ndim > 1 else sn.droprule # Check for RETRIAL or RETRIAL_WITH_LIMIT drop strategy has_retrial = False try: has_retrial = any( dr == DropStrategy.RETRIAL or dr == DropStrategy.RETRIAL_WITH_LIMIT for dr in droprule ) except: # If DropStrategy comparison fails, try numeric comparison has_retrial = any(dr in [6, 7] for dr in droprule) # Assuming 6,7 are retrial codes if has_retrial: retrial_station = ist break if retrial_station is None: ret_info.error_msg = 'No retrial drop strategy configured on bufferless queue.' return False, ret_info ret_info.station_idx = retrial_station ret_info.node_idx = sn.stationToNode[retrial_station] if hasattr(sn, 'stationToNode') else retrial_station ret_info.N = int(sn.nservers[retrial_station]) if hasattr(sn, 'nservers') and sn.nservers is not None else 1 # Find source station source_station = None for ist in range(sn.nstations): node_idx = sn.stationToNode[ist] if hasattr(sn, 'stationToNode') else ist if node_idx < len(sn.nodetype) and sn.nodetype[node_idx] == NodeType.SOURCE: source_station = ist break if source_station is None: ret_info.error_msg = 'No Source node found.' return False, ret_info ret_info.source_idx = source_station # Validate arrival process (must be MAP/BMAP) if hasattr(sn, 'proc') and sn.proc is not None: try: arrival_proc = sn.proc[source_station][ret_info.class_idx] if arrival_proc is None or not isinstance(arrival_proc, (list, tuple)) or len(arrival_proc) < 2: ret_info.error_msg = 'Invalid arrival process at source.' return False, ret_info except (IndexError, TypeError): ret_info.error_msg = 'Invalid arrival process at source.' return False, ret_info # Validate service process (must be PH/Exp) if hasattr(sn, 'proc') and sn.proc is not None: try: service_proc = sn.proc[retrial_station][ret_info.class_idx] if service_proc is None or not isinstance(service_proc, (list, tuple)) or len(service_proc) < 2: ret_info.error_msg = 'Invalid service process at queue.' return False, ret_info except (IndexError, TypeError): ret_info.error_msg = 'Invalid service process at queue.' return False, ret_info # Set default admission threshold ret_info.R = ret_info.N - 1 # Check for FCR (Finite Capacity Region) containing this queue if hasattr(sn, 'region') and sn.region is not None and hasattr(sn, 'nregions') and sn.nregions > 0: for f in range(sn.nregions): if f < len(sn.region) and sn.region[f] is not None: region_matrix = sn.region[f] if retrial_station < region_matrix.shape[0]: global_cap_col = region_matrix.shape[1] - 1 if region_matrix[retrial_station, global_cap_col] > 0: R_fcr = int(region_matrix[retrial_station, global_cap_col]) if R_fcr <= ret_info.N - 1: ret_info.R = R_fcr break # All validations passed ret_info.is_retrial = True return True, ret_info
__all__ = [ 'QueueType', 'BmapMatrix', 'PhDistribution', 'QbdStatespace', 'RetrialQueueResult', 'RetrialQueueAnalyzer', 'qsys_bmapphnn_retrial', 'qsys_is_retrial', 'RetrialInfo', ]