"""
Retrial and reneging queueing system analysis framework.
This module provides a foundation for analyzing bufferless retrial queues
(BMAP/PH/N/N) and reneging queues (MAP/M/s+G), with extension points for
future full solver implementation.
The framework includes:
- BMAP (Batch Markovian Arrival Process) matrix utilities
- PH (Phase-Type) distribution utilities
- QBD (Quasi-Birth-Death) state space construction
- Retrial queue topology detection and analysis
- Reneging queue topology detection and analysis
Reference:
Dudin, A., Klimenok, V., & Vishnevsky, V. (2020).
"The Theory of Quasi-Birth-Death Processes and Its Applications".
Springer.
Port from: matlab/src/solvers/MAM/solver_mam_retrial.m
matlab/src/api/qsys/qsys_bmapphnn_retrial.m
"""
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
[docs]
class QueueType(Enum):
"""Queueing system topology types."""
STANDARD = "standard"
RETRIAL = "retrial"
RENEGING = "reneging"
RETRIAL_RENEGING = "retrial_reneging"
[docs]
@dataclass
class BmapMatrix:
"""
Batch Markovian Arrival Process matrix representation.
D₀ = drift matrix (no arrivals)
D₁, D₂, ..., Dₖ = batch arrival matrices for batch sizes 1, 2, ..., K
Properties:
order: Dimension of the BMAP
num_batches: Maximum batch size
arrival_rate: Overall arrival rate
"""
D0: np.ndarray
D_batch: List[np.ndarray] # D_batch[k] = D_{k+1}
[docs]
def __post_init__(self):
"""Validate BMAP structure."""
if self.D0.shape[0] != self.D0.shape[1]:
raise ValueError("D0 must be square matrix")
self.order = self.D0.shape[0]
self.num_batches = len(self.D_batch)
# Validate batch matrices have same dimension
for i, D in enumerate(self.D_batch):
if D.shape != self.D0.shape:
raise ValueError(f"D_batch[{i}] dimension mismatch with D0")
@property
def arrival_rate(self) -> float:
"""
Compute overall arrival rate of the BMAP.
lambda = theta * (-D0) * e where theta is stationary distribution
of BMAP Markov chain and e is unit vector.
"""
# Simple approximation: trace-based estimate
# For exact computation, requires solving theta (-D0) * e
D_sum = self.D0.copy()
for D in self.D_batch:
D_sum = D_sum + D
# Eigenvalue-based approximation
try:
eigvals = np.linalg.eigvals(-D_sum)
return np.max(np.real(eigvals)) if len(eigvals) > 0 else 0.0
except:
return 0.0
@property
def fundamental_arrival_rate(self) -> float:
"""Arrival rate from fundamental matrix."""
try:
# Compute -D0 inverse (if invertible)
inv_D0 = np.linalg.inv(-self.D0)
e = np.ones(self.order)
# Sum of all batch matrices
D_sum = np.zeros_like(self.D0)
for D in self.D_batch:
D_sum = D_sum + D
# Arrival rate approximation
return np.sum(inv_D0 @ D_sum)
except:
return self.arrival_rate
[docs]
@dataclass
class PhDistribution:
"""
Phase-Type (PH) distribution representation.
Beta = initial probability vector (shape: m,)
S = transient generator matrix (shape: m x m)
where m is the number of phases.
Properties:
mean: Mean of PH distribution (1/mu in queue notation)
scv: Squared coefficient of variation
num_phases: Number of phases
"""
beta: np.ndarray
S: np.ndarray
[docs]
def __post_init__(self):
"""Validate PH distribution structure."""
if self.beta.ndim != 1:
raise ValueError("beta must be 1-D vector")
if self.S.ndim != 2 or self.S.shape[0] != self.S.shape[1]:
raise ValueError("S must be square matrix")
if self.S.shape[0] != len(self.beta):
raise ValueError("S dimension must match beta length")
self.num_phases = len(self.beta)
@property
def mean(self) -> float:
"""
Compute mean of PH distribution.
E[X] = -beta * S^{-1} * e
"""
try:
S_inv = np.linalg.inv(self.S)
e = np.ones(self.num_phases)
return -np.dot(self.beta, S_inv) @ e
except:
return 1.0 # Fallback
@property
def mean_squared(self) -> float:
"""
Compute second moment of PH distribution.
E[X²] = 2 * beta * S^{-2} * e
"""
try:
S_inv = np.linalg.inv(self.S)
e = np.ones(self.num_phases)
return 2 * np.dot(self.beta, S_inv) @ (S_inv @ e)
except:
mean = self.mean
return mean ** 2 # Fallback: assume deterministic
@property
def scv(self) -> float:
"""
Squared coefficient of variation of PH distribution.
SCV = (E[X²] / E[X]²) - 1
"""
mean = self.mean
mean_sq = self.mean_squared
if mean > 0:
return (mean_sq / (mean ** 2)) - 1
return 0.0
[docs]
@dataclass
class QbdStatespace:
"""
Quasi-Birth-Death Markov chain state space representation.
For a QBD process, states are of the form (n, i) where:
- n = level (number of retrying customers in orbit)
- i = phase (service phase or phase-type stage)
Generator matrix has block structure:
Q = | B_0 A_0 0 0 ... |
| A_2 A_1 A_0 0 ... |
| 0 A_2 A_1 A_0 ... |
| ... |
Properties:
max_level: Maximum retrial orbit size (truncation level)
phase_dim: Number of phases at each level
total_states: Total state space dimension
"""
A0: np.ndarray # Arrival matrix (birth)
A1: np.ndarray # Service matrix (drift)
A2: np.ndarray # Retrial matrix (death)
B0: np.ndarray # Boundary matrix at level 0
max_level: int
[docs]
def __post_init__(self):
"""Validate QBD state space."""
phase_dim = self.A0.shape[0]
if not all(M.shape == (phase_dim, phase_dim)
for M in [self.A1, self.A2, self.B0]):
raise ValueError("All matrices must have same dimension")
self.phase_dim = phase_dim
# Total states = phases * (max_level + 1) levels
self.total_states = phase_dim * (self.max_level + 1)
[docs]
def get_state_index(self, level: int, phase: int) -> int:
"""
Map (level, phase) to linear state index.
Args:
level: Retrial orbit size (0 ≤ level ≤ max_level)
phase: Phase within level (0 ≤ phase < phase_dim)
Returns:
Linear state index
"""
if not (0 <= level <= self.max_level and 0 <= phase < self.phase_dim):
raise ValueError(f"Invalid state ({level}, {phase})")
return level * self.phase_dim + phase
[docs]
def get_level_phase(self, idx: int) -> Tuple[int, int]:
"""
Map linear state index to (level, phase).
Args:
idx: Linear state index
Returns:
Tuple (level, phase)
"""
if not (0 <= idx < self.total_states):
raise ValueError(f"Invalid state index {idx}")
level = idx // self.phase_dim
phase = idx % self.phase_dim
return level, phase
@property
def binomial_state_dimension(self) -> int:
"""
Compute state space dimension using binomial coefficient formula.
For a retrial queue with N total customers and m servers,
dimension = C(N + m - 1, m - 1)
This is a rough upper bound for QBD truncation.
"""
# Placeholder - requires N and m parameters
return self.phase_dim * (self.max_level + 1)
[docs]
@dataclass
class RetrialQueueResult:
"""
Analysis results for BMAP/PH/N/N retrial queue.
Attributes:
queue_type: Type of queue (retrial, reneging, etc.)
L_orbit: Expected number of customers in orbit
N_server: Expected number of customers being served
utilization: Server utilization
throughput: System throughput
P_idle: Probability that server is idle
P_empty_orbit: Probability that orbit is empty
stationary_dist: Stationary distribution vector
truncation_level: QBD truncation level used
converged: Whether numerical solution converged
iterations: Number of iterations to convergence
error: Final error estimate
"""
queue_type: QueueType
L_orbit: float
N_server: float
utilization: float
throughput: float
P_idle: float
P_empty_orbit: float
stationary_dist: Optional[np.ndarray] = None
truncation_level: int = 0
converged: bool = False
iterations: int = 0
error: float = np.inf
[docs]
class RetrialQueueAnalyzer:
"""
Framework for analyzing BMAP/PH/N/N retrial queues.
This class provides the foundation for future full solver implementation,
including topology detection, parameter extraction, and QBD setup.
Example:
analyzer = RetrialQueueAnalyzer(model)
queue_type = analyzer.detect_queue_type()
if queue_type == QueueType.RETRIAL:
result = analyzer.analyze()
"""
[docs]
def __init__(self, sn: Any, options: Optional[Dict] = None):
"""
Initialize retrial queue analyzer.
Args:
sn: NetworkStruct with queue configuration
options: Analysis options (tolerance, max iterations, etc.)
"""
self.sn = sn
self.options = options or {}
self.tolerance = self.options.get('tolerance', 1e-8)
self.max_iterations = self.options.get('max_iterations', 1000)
self.max_retrial_orbit = self.options.get('max_retrial_orbit', 100)
[docs]
def detect_queue_type(self) -> QueueType:
"""
Detect the type of queueing system from topology.
Returns:
QueueType enum indicating retrial, reneging, or combination
"""
# Check for retrial nodes (orbit with retrial rate)
has_retrial = False
has_reneging = False
# Placeholder logic - would examine sn structure for:
# - Retrial probability/rate (orbit impatience)
# - Reneging probability/patience times
# - Queue abandonment
if has_retrial and has_reneging:
return QueueType.RETRIAL_RENEGING
elif has_retrial:
return QueueType.RETRIAL
elif has_reneging:
return QueueType.RENEGING
else:
return QueueType.STANDARD
[docs]
def build_qbd_statespace(self,
bmap: BmapMatrix,
ph_service: PhDistribution,
retrial_params: Dict[str, float]
) -> Optional[QbdStatespace]:
"""
Build QBD state space for the retrial queue.
This constructs the generator matrix blocks for the QBD process
following the BMAP/PH/N/N retrial queue formulation.
Args:
bmap: BMAP arrival process
ph_service: PH service distribution
retrial_params: Retrial parameters (alpha, gamma, p, R, N)
Returns:
QbdStatespace instance or None if construction fails
"""
if bmap is None or ph_service is None or retrial_params is None:
return None
# Extract parameters
D0 = bmap.D0
D1 = bmap.D_batch[0] if bmap.D_batch else np.zeros_like(D0)
beta = ph_service.beta
S = ph_service.S
alpha = retrial_params.get('alpha', 0.1)
gamma = retrial_params.get('gamma', 0.0)
N = retrial_params.get('N', 1)
# Dimensions
m_a = D0.shape[0] # BMAP phases
m_s = S.shape[0] # Service phases
phase_dim = m_a * m_s * N # Combined phase dimension
# Exit rate vector for service
e_s = np.ones(m_s)
S0 = -S @ e_s
# Identity matrices
I_a = np.eye(m_a)
I_s = np.eye(m_s)
# Build QBD generator blocks
# A0: transitions that increase level (arrivals to orbit)
# A1: transitions at same level
# A2: transitions that decrease level (retrials from orbit)
# Simplified construction for single-server case
if N == 1:
# Phase = (arrival phase, service phase, server state)
# Server state: 0 = idle, 1 = busy
# Build A0 (arrivals when server busy - go to orbit)
A0 = np.kron(D1, I_s)
# Build A2 (retrials from orbit when server becomes free)
A2 = alpha * np.kron(I_a, np.outer(S0, beta))
# Build A1 (internal transitions + arrivals when idle)
A1 = np.kron(D0, I_s) + np.kron(I_a, S)
# Add impatience (customers leaving orbit)
if gamma > 0:
A1 = A1 + gamma * np.eye(phase_dim)
# Boundary block B0
B0 = np.kron(D0 + D1, I_s) + np.kron(I_a, S)
else:
# Multi-server case - more complex construction
# Placeholder: return None for now
return None
return QbdStatespace(
A0=A0,
A1=A1,
A2=A2,
B0=B0,
max_level=self.max_retrial_orbit,
phase_dim=phase_dim
)
[docs]
def analyze(self) -> RetrialQueueResult:
"""
Analyze the retrial queue.
This is the main entry point for analysis:
1. Detect queue type
2. Extract arrival and service parameters
3. Build QBD state space
4. Solve for stationary distribution using matrix-analytic methods
5. Compute performance metrics
Returns:
RetrialQueueResult with performance metrics
"""
queue_type = self.detect_queue_type()
if queue_type == QueueType.STANDARD:
raise ValueError("Standard queue - use standard analyzer")
# Extract parameters
bmap = self.extract_bmap()
ph_service = self.extract_ph_service()
retrial_params = self.extract_retrial_parameters()
if bmap is None or ph_service is None or retrial_params is None:
return RetrialQueueResult(
queue_type=queue_type,
L_orbit=0.0,
N_server=0.0,
utilization=0.0,
throughput=0.0,
P_idle=1.0,
P_empty_orbit=1.0,
converged=False,
error=np.inf
)
# Build QBD state space
qbd = self.build_qbd_statespace(bmap, ph_service, retrial_params)
if qbd is None:
return RetrialQueueResult(
queue_type=queue_type,
L_orbit=0.0,
N_server=0.0,
utilization=0.0,
throughput=0.0,
P_idle=1.0,
P_empty_orbit=1.0,
converged=False,
error=np.inf
)
# Solve QBD using matrix-geometric method
try:
pi, R, converged, iterations, error = self._solve_qbd_matrix_geometric(qbd)
except Exception:
return RetrialQueueResult(
queue_type=queue_type,
L_orbit=0.0,
N_server=0.0,
utilization=0.0,
throughput=0.0,
P_idle=1.0,
P_empty_orbit=1.0,
converged=False,
error=np.inf
)
# Compute performance metrics
N = retrial_params.get('N', 1)
arrival_rate = bmap.arrival_rate
service_rate = ph_service.rate if hasattr(ph_service, 'rate') else 1.0 / ph_service.mean
# Expected number in orbit (sum over levels weighted by level)
L_orbit = 0.0
for level in range(len(pi)):
L_orbit += level * np.sum(pi[level])
# Server utilization
P_idle = np.sum(pi[0]) if len(pi) > 0 else 1.0
utilization = 1.0 - P_idle
# Throughput via departure rate
throughput = utilization * N * service_rate
# Expected number being served
N_server = utilization * N
# Probability orbit is empty
P_empty_orbit = np.sum(pi[0]) if len(pi) > 0 else 1.0
return RetrialQueueResult(
queue_type=queue_type,
L_orbit=L_orbit,
N_server=N_server,
utilization=utilization,
throughput=throughput,
P_idle=P_idle,
P_empty_orbit=P_empty_orbit,
stationary_dist=pi if converged else None,
truncation_level=len(pi) - 1 if pi is not None else 0,
converged=converged,
iterations=iterations,
error=error
)
def _solve_qbd_matrix_geometric(self, qbd: QbdStatespace) -> Tuple[np.ndarray, np.ndarray, bool, int, float]:
"""
Solve QBD process using matrix-geometric method.
The rate matrix R satisfies: A0 + R*A1 + R^2*A2 = 0
Args:
qbd: QBD state space with generator blocks
Returns:
Tuple of (stationary distribution, R matrix, converged, iterations, error)
"""
A0 = qbd.A0
A1 = qbd.A1
A2 = qbd.A2
n = A0.shape[0]
# Initialize R matrix
R = np.zeros((n, n))
# Iteration for R: R = -A0 * (A1 + R*A2)^{-1}
converged = False
iterations = 0
error = np.inf
for it in range(self.max_iterations):
iterations = it + 1
try:
# Compute (A1 + R*A2)^{-1}
inv_term = np.linalg.inv(A1 + R @ A2)
R_new = -A0 @ inv_term
except np.linalg.LinAlgError:
break
# Check convergence
error = np.max(np.abs(R_new - R))
R = R_new
if error < self.tolerance:
converged = True
break
if not converged:
return None, R, False, iterations, error
# Compute boundary probabilities
# pi_0 satisfies: pi_0 * (B0 + R*A2) = 0, sum(pi_0) * (I - R)^{-1} * e = 1
try:
B = qbd.B0 + R @ A2
# Find null space
eigvals, eigvecs = np.linalg.eig(B.T)
null_idx = np.argmin(np.abs(eigvals))
pi_0 = np.real(eigvecs[:, null_idx])
pi_0 = np.abs(pi_0)
# Normalize
I_minus_R_inv = np.linalg.inv(np.eye(n) - R)
e = np.ones(n)
norm_factor = pi_0 @ I_minus_R_inv @ e
if norm_factor > 0:
pi_0 = pi_0 / norm_factor
except np.linalg.LinAlgError:
return None, R, False, iterations, error
# Build stationary distribution up to truncation level
pi = [pi_0]
current = pi_0
for level in range(1, qbd.max_level + 1):
current = current @ R
if np.sum(current) < self.tolerance:
break
pi.append(current.copy())
return np.array(pi), R, converged, iterations, error
[docs]
def qsys_bmapphnn_retrial(
arrival_matrix: Dict[str, np.ndarray],
service_params: Dict[str, np.ndarray],
N: int,
retrial_params: Optional[Dict[str, float]] = None,
options: Optional[Dict] = None
) -> RetrialQueueResult:
"""
Analyze BMAP/PH/N/N bufferless retrial queue.
Implements the algorithm from Dudin et al., "Analysis of BMAP/PH/N-Type
Queueing System with Flexible Retrials Admission Control",
Mathematics 2025, 13(9), 1434.
Parameters:
arrival_matrix: Dict with 'D0', 'D1', ... for BMAP matrices.
D0: hidden transition matrix (V x V).
D1, ..., DK: arrival matrices for batch sizes 1, ..., K.
service_params: Dict with 'beta' (initial prob vector, 1xM) and
'S' (PH subgenerator matrix, MxM).
N: Number of servers (also capacity, hence bufferless).
retrial_params: Dict with 'alpha' (retrial rate per customer),
'gamma' (impatience/abandonment rate), 'p' (batch rejection
probability), 'R' (admission threshold, scalar or 1xV).
options: Dict with optional keys:
'MaxLevel': max orbit level for truncation (default: auto).
'Tolerance': convergence tolerance (default: 1e-10).
'Verbose': print progress (default: False).
Returns:
RetrialQueueResult with performance metrics.
References:
Dudin, A., Klimenok, V., & Vishnevsky, V. (2020).
Port from: matlab/src/api/qsys/qsys_bmapphnn_retrial.m
"""
if options is None:
options = {}
tol = options.get('Tolerance', 1e-10)
verbose = options.get('Verbose', False)
max_level_param = options.get('MaxLevel', None)
# --- Extract BMAP matrices D = [D0, D1, ..., DK] ---
D0 = np.atleast_2d(arrival_matrix.get('D0'))
D_list = [D0]
for i in range(1, 100):
Dk = arrival_matrix.get(f'D{i}')
if Dk is None:
break
D_list.append(np.atleast_2d(Dk))
K = len(D_list) - 1 # max batch size
V = D0.shape[0] # number of BMAP states
# Stationary distribution of fundamental process D^(1) = sum(D_k)
D1_gen = sum(D_list)
theta = _compute_stationary_vector(D1_gen)
# Mean arrival rate: lambda = theta * sum(k * D_k) * e
sum_k_Dk = np.zeros_like(D0)
for k in range(1, K + 1):
sum_k_Dk += k * D_list[k]
lam = float(theta @ sum_k_Dk @ np.ones(V))
# PH service parameters
beta = np.atleast_1d(service_params.get('beta', np.array([1.0]))).astype(float).flatten()
S = np.atleast_2d(service_params.get('S', np.array([[-1.0]]))).astype(float)
M_ph = S.shape[0]
S0 = -S @ np.ones(M_ph)
b1 = float(beta @ np.linalg.solve(-S, np.ones(M_ph))) # mean service time
# Retrial parameters
if retrial_params is None:
retrial_params = {}
alpha = float(retrial_params.get('alpha', 1.0))
gamma = float(retrial_params.get('gamma', 0.0))
p = float(retrial_params.get('p', 0.0))
R_param = retrial_params.get('R', N)
if np.isscalar(R_param):
R_vec = np.full(V, float(R_param))
else:
R_vec = np.asarray(R_param, dtype=float).flatten()
# T_n = C(n+M-1, M-1) = number of service states with n busy servers
from math import comb
T = np.array([comb(n + M_ph - 1, M_ph - 1) for n in range(N + 1)], dtype=int)
d = int(np.sum(T)) # total dimension per BMAP state
# Build state mapping (weak compositions)
state_map = [_generate_compositions(n, M_ph) for n in range(N + 1)]
# Truncation level
rho = lam * b1 / N if N > 0 else 0.0
if max_level_param is None:
trunc_level = max(100, int(np.ceil(50.0 / (1.0 - min(rho, 0.99)))))
else:
trunc_level = int(max_level_param)
if verbose:
print(f"Solving BMAP/PH/N/N retrial queue...")
print(f" V={V}, M={M_ph}, N={N}, K={K}")
print(f" d={d}, block size Vd={V * d}")
print(f" lambda={lam:.4f}, mu={1.0/b1 if b1 > 0 else float('inf'):.4f}")
print(f" Offered load rho={rho:.4f}")
print(f" Truncation level: {trunc_level}")
Vd = V * d
total_dim = (trunc_level + 1) * Vd
if verbose:
print(f"Total matrix dimension: {total_dim} x {total_dim}")
# Context object for helper functions
ctx = _RetrialCtx(D_list, beta, S, S0, M_ph, N, V, K, d, T, R_vec,
alpha, gamma, p, state_map)
# Build generator matrix
use_sparse = total_dim > 5000
if use_sparse:
from scipy.sparse import lil_matrix
Q = lil_matrix((total_dim, total_dim))
else:
Q = np.zeros((total_dim, total_dim))
for i in range(trunc_level + 1):
if verbose and i % 20 == 0:
print(f" Level {i}/{trunc_level}")
row_s = i * Vd
row_e = (i + 1) * Vd
for j in range(max(0, i - 1), min(trunc_level, i + K) + 1):
col_s = j * Vd
col_e = (j + 1) * Vd
Qij = _build_generator_level(ctx, i, j)
if use_sparse:
Q[row_s:row_e, col_s:col_e] = Qij
else:
Q[row_s:row_e, col_s:col_e] = Qij
# Ensure rows sum to zero (set diagonal)
if use_sparse:
Q = Q.tocsr()
Q_dense = Q.toarray()
else:
Q_dense = Q
for i in range(total_dim):
Q_dense[i, i] -= np.sum(Q_dense[i, :])
if verbose:
print("Solving linear system...")
# Solve pi * Q = 0, pi * e = 1
# Replace last column with ones for normalization
Q_dense[:, -1] = 1.0
b_vec = np.zeros(total_dim)
b_vec[-1] = 1.0
pi_flat = np.linalg.solve(Q_dense.T, b_vec)
# Reshape to level structure: (trunc_level+1, Vd)
pi = pi_flat.reshape(trunc_level + 1, Vd)
# Handle numerical issues
if np.any(pi < -1e-8):
pi[pi < 0] = 0.0
pi /= np.sum(pi)
# --- Compute performance measures ---
max_lvl = pi.shape[0] - 1
# Mean number in orbit
L_orbit = sum(i * np.sum(pi[i, :]) for i in range(1, max_lvl + 1))
# Mean number of busy servers
N_server = 0.0
for i in range(max_lvl + 1):
pi_level = pi[i, :]
for nu in range(V):
for n in range(N + 1):
offset = nu * d + _get_block_offset(T, n)
for t in range(T[n]):
idx = offset + t
if idx < Vd:
N_server += n * pi_level[idx]
# Probability all servers idle
P_idle = 0.0
for i in range(max_lvl + 1):
pi_level = pi[i, :]
for nu in range(V):
offset = nu * d # n=0 block starts at offset
P_idle += pi_level[offset]
# Probability orbit empty
P_empty_orbit = float(np.sum(pi[0, :]))
# Probability system empty
P_empty = 0.0
pi_level0 = pi[0, :]
for nu in range(V):
offset = nu * d
P_empty += pi_level0[offset]
result = RetrialQueueResult(
queue_type=QueueType.RETRIAL,
L_orbit=L_orbit,
N_server=N_server,
utilization=N_server / N if N > 0 else 0.0,
throughput=N_server / b1 if b1 > 0 else 0.0,
P_idle=P_idle,
P_empty_orbit=P_empty_orbit,
truncation_level=trunc_level,
converged=True,
error=0.0
)
if verbose:
print("Solution complete.")
return result
# ========== Retrial solver helpers ==========
class _RetrialCtx:
"""Context for retrial solver helper functions (matches MATLAB ctx struct)."""
__slots__ = ('D', 'beta', 'S', 'S0', 'M', 'N', 'V', 'K', 'd', 'T',
'R', 'alpha', 'gamma', 'p', 'state_map')
def __init__(self, D, beta, S, S0, M, N, V, K, d, T, R, alpha, gamma, p, state_map):
self.D = D
self.beta = beta
self.S = S
self.S0 = S0
self.M = M
self.N = N
self.V = V
self.K = K
self.d = d
self.T = T
self.R = R
self.alpha = alpha
self.gamma = gamma
self.p = p
self.state_map = state_map
def _compute_stationary_vector(Q: np.ndarray) -> np.ndarray:
"""Solve theta * Q = 0, theta * e = 1."""
n = Q.shape[0]
A = Q.T.copy()
A[-1, :] = 1.0
b = np.zeros(n)
b[-1] = 1.0
return np.linalg.solve(A, b)
def _generate_compositions(n: int, M: int) -> np.ndarray:
"""Generate all weak compositions of n into M parts (reverse lexicographic)."""
if M == 1:
return np.array([[n]])
from math import comb
num_comps = comb(n + M - 1, M - 1)
comps = np.zeros((num_comps, M), dtype=int)
idx = 0
for m1 in range(n, -1, -1):
sub = _generate_compositions(n - m1, M - 1)
num_sub = sub.shape[0]
comps[idx:idx + num_sub, 0] = m1
comps[idx:idx + num_sub, 1:] = sub
idx += num_sub
return comps
def _get_block_offset(T: np.ndarray, n: int) -> int:
"""Get starting index (0-based) for states with n busy servers."""
return int(np.sum(T[:n])) if n > 0 else 0
def _compute_L(ctx: _RetrialCtx, n: int) -> Optional[np.ndarray]:
"""Matrix L_n: service completion transitions (T_n x T_{n-1})."""
if n == 0:
return None
L = np.zeros((ctx.T[n], ctx.T[n - 1]))
comps_n = ctx.state_map[n]
comps_nm1 = ctx.state_map[n - 1]
for i in range(comps_n.shape[0]):
m = comps_n[i, :]
for l in range(ctx.M):
if m[l] > 0:
m_prime = m.copy()
m_prime[l] -= 1
for j in range(comps_nm1.shape[0]):
if np.array_equal(comps_nm1[j, :], m_prime):
L[i, j] += m[l] * ctx.S0[l]
break
return L
def _compute_A(ctx: _RetrialCtx, n: int) -> np.ndarray:
"""Matrix A_n: phase change transitions (T_n x T_n)."""
if n == 0:
return np.zeros((1, 1))
A = np.zeros((ctx.T[n], ctx.T[n]))
comps = ctx.state_map[n]
for i in range(comps.shape[0]):
m = comps[i, :]
for l in range(ctx.M):
if m[l] > 0:
for lp in range(ctx.M):
if lp != l and ctx.S[l, lp] > 0:
m_prime = m.copy()
m_prime[l] -= 1
m_prime[lp] += 1
for j in range(comps.shape[0]):
if np.array_equal(comps[j, :], m_prime):
A[i, j] += m[l] * ctx.S[l, lp]
break
return A
def _compute_P(ctx: _RetrialCtx, n: int) -> Optional[np.ndarray]:
"""Matrix P_n: new arrival transitions (T_n x T_{n+1})."""
if n >= ctx.N:
return None
P = np.zeros((ctx.T[n], ctx.T[n + 1]))
comps_n = ctx.state_map[n]
comps_np1 = ctx.state_map[n + 1]
for i in range(comps_n.shape[0]):
m = comps_n[i, :]
for l in range(ctx.M):
if ctx.beta[l] > 0:
m_prime = m.copy()
m_prime[l] += 1
for j in range(comps_np1.shape[0]):
if np.array_equal(comps_np1[j, :], m_prime):
P[i, j] += ctx.beta[l]
break
return P
def _compute_Delta(ctx: _RetrialCtx, n: int) -> np.ndarray:
"""Diagonal matrix Delta_n: exit rates (T_n x T_n)."""
if n == 0:
return np.zeros((1, 1))
comps = ctx.state_map[n]
diag_vals = np.zeros(ctx.T[n])
for i in range(comps.shape[0]):
m = comps[i, :]
diag_vals[i] = sum(m[l] * (-ctx.S[l, l]) for l in range(ctx.M))
return np.diag(diag_vals)
def _compute_Gamma(ctx: _RetrialCtx, nu: int) -> np.ndarray:
"""Diagonal matrix Gamma^(nu): 0 for n<=R_nu, 1 for n>R_nu."""
diag_vals = np.zeros(ctx.d)
offset = 0
for n in range(ctx.N + 1):
if n > ctx.R[nu]:
diag_vals[offset:offset + ctx.T[n]] = 1.0
offset += ctx.T[n]
return np.diag(diag_vals)
def _compute_G_nn(ctx: _RetrialCtx, n: int, nu: int, nu_prime: int) -> np.ndarray:
"""G_{n,n}^{(nu,nu')} matrix for batch losses."""
if n <= ctx.N - ctx.K:
return np.zeros((ctx.T[n], ctx.T[n]))
total = 0.0
for k in range(ctx.N - n + 1, ctx.K + 1):
if 1 <= k <= ctx.K:
total += ctx.D[k][nu, nu_prime]
return ctx.p * total * np.eye(ctx.T[n])
def _compute_B(ctx: _RetrialCtx, nu: int) -> np.ndarray:
"""Block matrix B^(nu) of size d x d."""
B = np.zeros((ctx.d, ctx.d))
L_mats = [_compute_L(ctx, n) for n in range(ctx.N + 1)]
A_mats = [_compute_A(ctx, n) for n in range(ctx.N + 1)]
P_mats = [_compute_P(ctx, n) for n in range(ctx.N + 1)]
Delta_mats = [_compute_Delta(ctx, n) for n in range(ctx.N + 1)]
for n in range(ctx.N + 1):
rs = _get_block_offset(ctx.T, n)
re = rs + ctx.T[n]
# Diagonal block
G_nn = _compute_G_nn(ctx, n, nu, nu)
if n == 0:
B[rs, rs] = G_nn[0, 0] if G_nn.size > 0 else 0.0
else:
B[rs:re, rs:re] = A_mats[n] + Delta_mats[n] + G_nn
# Subdiagonal block
if n >= 1 and L_mats[n] is not None:
cs = _get_block_offset(ctx.T, n - 1)
ce = cs + ctx.T[n - 1]
B[rs:re, cs:ce] = L_mats[n]
# Superdiagonal blocks
for k in range(1, ctx.K + 1):
if n + k <= ctx.N:
cs = _get_block_offset(ctx.T, n + k)
ce = cs + ctx.T[n + k]
D_k_nu_nu = ctx.D[k][nu, nu]
Pprod = np.eye(ctx.T[n])
for j in range(n, n + k):
if j < ctx.N and P_mats[j] is not None:
Pprod = Pprod @ P_mats[j]
B[rs:re, cs:ce] = D_k_nu_nu * Pprod
return B
def _compute_Bbar(ctx: _RetrialCtx, nu: int) -> np.ndarray:
"""B_bar^(nu) matrix for successful retrials."""
Bbar = np.zeros((ctx.d, ctx.d))
for n in range(min(int(ctx.R[nu]), ctx.N - 1) + 1):
rs = _get_block_offset(ctx.T, n)
re = rs + ctx.T[n]
cs = _get_block_offset(ctx.T, n + 1)
ce = cs + ctx.T[n + 1]
P_n = _compute_P(ctx, n)
if P_n is not None:
Bbar[rs:re, cs:ce] = P_n
return Bbar
def _compute_Btilde(ctx: _RetrialCtx, nu: int, nu_prime: int) -> np.ndarray:
"""B_tilde^(nu, nu') for BMAP state transitions."""
Bt = np.zeros((ctx.d, ctx.d))
P_mats = [_compute_P(ctx, n) for n in range(ctx.N + 1)]
for n in range(ctx.N + 1):
rs = _get_block_offset(ctx.T, n)
re = rs + ctx.T[n]
# Diagonal block
G_nn = _compute_G_nn(ctx, n, nu, nu_prime)
Bt[rs:re, rs:re] = G_nn
# Superdiagonal blocks
for k in range(1, ctx.K + 1):
if n + k <= ctx.N:
cs = _get_block_offset(ctx.T, n + k)
ce = cs + ctx.T[n + k]
D_k = ctx.D[k][nu, nu_prime]
Pprod = np.eye(ctx.T[n])
for j in range(n, n + k):
if j < ctx.N and P_mats[j] is not None:
Pprod = Pprod @ P_mats[j]
Bt[rs:re, cs:ce] = D_k * Pprod
return Bt
def _compute_C(ctx: _RetrialCtx, n: int, k: int, nu: int, nu_prime: int) -> Optional[np.ndarray]:
"""C_{n,k}^(nu, nu') for partial batch admission to orbit."""
if n < ctx.N - ctx.K + k:
return np.zeros((ctx.T[n], ctx.T[ctx.N]))
elif n < ctx.N:
batch_size = ctx.N - n + k
if 1 <= batch_size <= ctx.K:
D_batch = ctx.D[batch_size][nu, nu_prime]
P_mats = [_compute_P(ctx, nn) for nn in range(ctx.N + 1)]
Pprod = np.eye(ctx.T[n])
for j in range(n, ctx.N):
if P_mats[j] is not None:
Pprod = Pprod @ P_mats[j]
return (1 - ctx.p) * D_batch * Pprod
else:
return np.zeros((ctx.T[n], ctx.T[ctx.N]))
else: # n == N
if 1 <= k <= ctx.K:
D_k = ctx.D[k][nu, nu_prime]
return (1 - ctx.p) * D_k * np.eye(ctx.T[ctx.N])
else:
return np.zeros((ctx.T[ctx.N], ctx.T[ctx.N]))
def _build_generator_level(ctx: _RetrialCtx, i: int, j: int) -> np.ndarray:
"""Build generator block Q_{i,j}."""
Vd = ctx.V * ctx.d
Qij = np.zeros((Vd, Vd))
if j < max(0, i - 1) or j > i + ctx.K:
return Qij
# Precompute per-BMAP-state matrices
B_mats = [_compute_B(ctx, nu) for nu in range(ctx.V)]
Bbar_mats = [_compute_Bbar(ctx, nu) for nu in range(ctx.V)]
Gamma_mats = [_compute_Gamma(ctx, nu) for nu in range(ctx.V)]
if i == j: # Diagonal block
for nu in range(ctx.V):
rs = nu * ctx.d
re = (nu + 1) * ctx.d
for nu_prime in range(ctx.V):
cs = nu_prime * ctx.d
ce = (nu_prime + 1) * ctx.d
if nu == nu_prime:
D0_nn = ctx.D[0][nu, nu]
block = (D0_nn * np.eye(ctx.d) + B_mats[nu]
- i * (ctx.gamma + ctx.alpha) * np.eye(ctx.d)
+ i * ctx.alpha * Gamma_mats[nu])
Qij[rs:re, cs:ce] = block
else:
Bt = _compute_Btilde(ctx, nu, nu_prime)
D0_nn_p = ctx.D[0][nu, nu_prime]
Qij[rs:re, cs:ce] = Bt + D0_nn_p * np.eye(ctx.d)
elif j == i - 1 and i >= 1: # Subdiagonal block
for nu in range(ctx.V):
rs = nu * ctx.d
re = (nu + 1) * ctx.d
cs = rs
ce = re
block = i * ctx.gamma * np.eye(ctx.d) + i * ctx.alpha * Bbar_mats[nu]
Qij[rs:re, cs:ce] = block
elif j > i and j <= i + ctx.K: # Superdiagonal blocks
k = j - i
for nu in range(ctx.V):
rs = nu * ctx.d
re = (nu + 1) * ctx.d
for nu_prime in range(ctx.V):
cs = nu_prime * ctx.d
ce = (nu_prime + 1) * ctx.d
block = np.zeros((ctx.d, ctx.d))
for n in range(ctx.N + 1):
C_nk = _compute_C(ctx, n, k, nu, nu_prime)
if C_nk is not None and np.any(C_nk != 0):
n_rs = _get_block_offset(ctx.T, n)
n_re = n_rs + ctx.T[n]
N_cs = _get_block_offset(ctx.T, ctx.N)
N_ce = N_cs + ctx.T[ctx.N]
if C_nk.shape[1] == ctx.T[ctx.N]:
block[n_rs:n_re, N_cs:N_ce] = C_nk
Qij[rs:re, cs:ce] = block
return Qij
[docs]
@dataclass
class RetrialInfo:
"""Information about a valid retrial queue topology."""
is_retrial: bool
station_idx: Optional[int] = None
node_idx: Optional[int] = None
source_idx: Optional[int] = None
class_idx: Optional[int] = None
error_msg: str = ''
N: Optional[int] = None # Number of servers
alpha: float = 0.1 # Retrial rate (default)
gamma: float = 0.0 # Orbit impatience (default 0)
p: float = 0.0 # Batch rejection prob (default 0)
R: Optional[int] = None # Admission threshold
[docs]
def qsys_is_retrial(sn: Any) -> Tuple[bool, RetrialInfo]:
"""
Check if network is a valid BMAP/PH/N/N bufferless retrial queue.
Validates that the network structure matches the requirements for
the BMAP/PH/N/N retrial queue solver:
- Single bufferless queue (capacity == number of servers)
- Retrial drop strategy configured
- BMAP/MAP arrival process at source
- PH/Exp service at queue
- Open class model
Based on: Dudin et al., "Analysis of BMAP/PH/N-Type Queueing System with
Flexible Retrials Admission Control", Mathematics 2025, 13(9), 1434.
Args:
sn: NetworkStruct object
Returns:
Tuple of (is_retrial, retrial_info) where:
is_retrial: True if network is valid BMAP/PH/N/N retrial topology
retrial_info: RetrialInfo with parameters for the retrial solver
References:
Original MATLAB: matlab/src/api/qsys/qsys_is_retrial.m
"""
from ..sn import sn_is_open_model, NodeType, DropStrategy
ret_info = RetrialInfo(is_retrial=False)
# Check if model is open
try:
if not sn_is_open_model(sn):
ret_info.error_msg = 'BMAP/PH/N/N retrial solver requires open queueing model.'
return False, ret_info
except:
ret_info.error_msg = 'Could not determine if model is open.'
return False, ret_info
# Check for single class (current limitation)
if sn.nclasses > 1:
ret_info.error_msg = 'BMAP/PH/N/N retrial solver currently supports single class only.'
return False, ret_info
ret_info.class_idx = 0
# Find bufferless queue stations (capacity == nservers, finite capacity)
bufferless_stations = []
for ist in range(sn.nstations):
node_idx = sn.stationToNode[ist] if hasattr(sn, 'stationToNode') else ist
if node_idx < len(sn.nodetype) and sn.nodetype[node_idx] == NodeType.QUEUE:
cap = sn.cap[ist] if hasattr(sn, 'cap') and sn.cap is not None else np.inf
nservers = sn.nservers[ist] if hasattr(sn, 'nservers') and sn.nservers is not None else 1
if np.isfinite(cap) and cap == nservers:
bufferless_stations.append(ist)
if not bufferless_stations:
ret_info.error_msg = 'No bufferless queue found (capacity must equal number of servers).'
return False, ret_info
# Check retrial drop strategy
retrial_station = None
for ist in bufferless_stations:
if hasattr(sn, 'droprule') and sn.droprule is not None:
droprule = sn.droprule[ist, :] if sn.droprule.ndim > 1 else sn.droprule
# Check for RETRIAL or RETRIAL_WITH_LIMIT drop strategy
has_retrial = False
try:
has_retrial = any(
dr == DropStrategy.RETRIAL or dr == DropStrategy.RETRIAL_WITH_LIMIT
for dr in droprule
)
except:
# If DropStrategy comparison fails, try numeric comparison
has_retrial = any(dr in [6, 7] for dr in droprule) # Assuming 6,7 are retrial codes
if has_retrial:
retrial_station = ist
break
if retrial_station is None:
ret_info.error_msg = 'No retrial drop strategy configured on bufferless queue.'
return False, ret_info
ret_info.station_idx = retrial_station
ret_info.node_idx = sn.stationToNode[retrial_station] if hasattr(sn, 'stationToNode') else retrial_station
ret_info.N = int(sn.nservers[retrial_station]) if hasattr(sn, 'nservers') and sn.nservers is not None else 1
# Find source station
source_station = None
for ist in range(sn.nstations):
node_idx = sn.stationToNode[ist] if hasattr(sn, 'stationToNode') else ist
if node_idx < len(sn.nodetype) and sn.nodetype[node_idx] == NodeType.SOURCE:
source_station = ist
break
if source_station is None:
ret_info.error_msg = 'No Source node found.'
return False, ret_info
ret_info.source_idx = source_station
# Validate arrival process (must be MAP/BMAP)
if hasattr(sn, 'proc') and sn.proc is not None:
try:
arrival_proc = sn.proc[source_station][ret_info.class_idx]
if arrival_proc is None or not isinstance(arrival_proc, (list, tuple)) or len(arrival_proc) < 2:
ret_info.error_msg = 'Invalid arrival process at source.'
return False, ret_info
except (IndexError, TypeError):
ret_info.error_msg = 'Invalid arrival process at source.'
return False, ret_info
# Validate service process (must be PH/Exp)
if hasattr(sn, 'proc') and sn.proc is not None:
try:
service_proc = sn.proc[retrial_station][ret_info.class_idx]
if service_proc is None or not isinstance(service_proc, (list, tuple)) or len(service_proc) < 2:
ret_info.error_msg = 'Invalid service process at queue.'
return False, ret_info
except (IndexError, TypeError):
ret_info.error_msg = 'Invalid service process at queue.'
return False, ret_info
# Set default admission threshold
ret_info.R = ret_info.N - 1
# Check for FCR (Finite Capacity Region) containing this queue
if hasattr(sn, 'region') and sn.region is not None and hasattr(sn, 'nregions') and sn.nregions > 0:
for f in range(sn.nregions):
if f < len(sn.region) and sn.region[f] is not None:
region_matrix = sn.region[f]
if retrial_station < region_matrix.shape[0]:
global_cap_col = region_matrix.shape[1] - 1
if region_matrix[retrial_station, global_cap_col] > 0:
R_fcr = int(region_matrix[retrial_station, global_cap_col])
if R_fcr <= ret_info.N - 1:
ret_info.R = R_fcr
break
# All validations passed
ret_info.is_retrial = True
return True, ret_info
[docs]
@dataclass
class RenegingInfo:
"""Information about a valid reneging queue topology."""
is_reneging: bool
source_idx: Optional[int] = None
queue_idx: Optional[int] = None
class_idx: Optional[int] = None
n_servers: Optional[int] = None
service_rate: Optional[float] = None
error_msg: str = ''
[docs]
def detect_reneging_topology(sn: Any) -> Tuple[bool, RenegingInfo]:
"""
Detect if model is suitable for MAP/M/s+G (MAPMsG) reneging solver.
Requirements:
- Open model, single class
- Single queue station with reneging/patience configured
- MAP/BMAP arrival at source
- Exponential service at queue (single-phase PH)
- FCFS scheduling
Args:
sn: NetworkStruct object
Returns:
Tuple of (is_reneging, reneging_info)
References:
Original MATLAB: matlab/src/solvers/MAM/solver_mam_retrial.m (detectRenegingTopology)
"""
from ..sn import sn_is_open_model, NodeType
from ...lang.base import ImpatienceType, SchedStrategy
info = RenegingInfo(is_reneging=False)
# Check open model
try:
if not sn_is_open_model(sn):
info.error_msg = 'MAPMsG requires open queueing model.'
return False, info
except Exception:
info.error_msg = 'Could not determine if model is open.'
return False, info
# Check single class (current limitation)
if sn.nclasses > 1:
info.error_msg = 'MAPMsG currently supports single class only.'
return False, info
info.class_idx = 0
# Find source and queue stations
source_idx = None
queue_idx = None
for ist in range(sn.nstations):
node_idx = sn.stationToNode[ist] if hasattr(sn, 'stationToNode') else ist
if node_idx < len(sn.nodetype):
if sn.nodetype[node_idx] == NodeType.SOURCE:
source_idx = ist
elif sn.nodetype[node_idx] == NodeType.QUEUE:
if queue_idx is not None:
info.error_msg = 'MAPMsG requires single queue station.'
return False, info
queue_idx = ist
if source_idx is None:
info.error_msg = 'No Source node found.'
return False, info
if queue_idx is None:
info.error_msg = 'No Queue node found.'
return False, info
info.source_idx = source_idx
info.queue_idx = queue_idx
# Check for reneging patience configuration
if not hasattr(sn, 'impatienceClass') or sn.impatienceClass is None:
info.error_msg = 'No patience/impatience configuration found.'
return False, info
try:
imp_val = sn.impatienceClass[queue_idx, info.class_idx]
if imp_val != ImpatienceType.RENEGING:
info.error_msg = 'Queue does not have reneging configured.'
return False, info
except (IndexError, TypeError):
info.error_msg = 'Queue does not have reneging configured.'
return False, info
# Check patience distribution exists
if not hasattr(sn, 'patienceProc') or sn.patienceProc is None:
info.error_msg = 'No patience distribution found.'
return False, info
try:
pat_proc = sn.patienceProc[queue_idx, info.class_idx] if hasattr(sn.patienceProc, '__getitem__') else None
if pat_proc is None or (isinstance(pat_proc, (list, tuple)) and len(pat_proc) == 0):
info.error_msg = 'No patience distribution for this class.'
return False, info
except (IndexError, TypeError, KeyError):
info.error_msg = 'No patience distribution for this class.'
return False, info
# Check FCFS scheduling
sched_val = sn.sched.get(queue_idx, None) if isinstance(sn.sched, dict) else (
sn.sched[queue_idx] if queue_idx < len(sn.sched) else None
)
if sched_val is not None:
sched_id = sched_val.value if hasattr(sched_val, 'value') else int(sched_val)
if sched_id != SchedStrategy.FCFS:
info.error_msg = 'MAPMsG requires FCFS scheduling.'
return False, info
# Check exponential service (single-phase)
try:
service_proc = sn.proc[queue_idx][info.class_idx]
if service_proc is None or not isinstance(service_proc, (list, tuple)) or len(service_proc) < 2:
info.error_msg = 'Invalid service process.'
return False, info
D0 = np.atleast_2d(service_proc[0])
if D0.shape[0] != 1:
info.error_msg = 'MAPMsG requires exponential service (single-phase).'
return False, info
info.service_rate = -float(D0[0, 0])
except (IndexError, TypeError):
info.error_msg = 'Invalid service process.'
return False, info
info.n_servers = int(sn.nservers[queue_idx]) if hasattr(sn, 'nservers') and sn.nservers is not None else 1
# Check MAP arrival process
try:
arrival_proc = sn.proc[source_idx][info.class_idx]
if arrival_proc is None or not isinstance(arrival_proc, (list, tuple)) or len(arrival_proc) < 2:
info.error_msg = 'Invalid arrival process.'
return False, info
except (IndexError, TypeError):
info.error_msg = 'Invalid arrival process.'
return False, info
# All checks passed
info.is_reneging = True
return True, info
[docs]
def has_reneging_patience(sn: Any) -> bool:
"""
Check if model has reneging/patience configured on any queue station.
Returns True if any queue station has ImpatienceType.RENEGING
configured with a patience distribution.
Args:
sn: NetworkStruct object
Returns:
True if reneging patience is configured
References:
Original MATLAB: matlab/src/solvers/MAM/solver_mam_analyzer.m (hasRenegingPatience)
"""
from ...lang.base import ImpatienceType
if not hasattr(sn, 'impatienceClass') or sn.impatienceClass is None:
return False
if not hasattr(sn, 'patienceProc') or sn.patienceProc is None:
return False
for ist in range(sn.nstations):
for r in range(sn.nclasses):
try:
if sn.impatienceClass[ist, r] == ImpatienceType.RENEGING:
pat_proc = sn.patienceProc[ist, r] if hasattr(sn.patienceProc, '__getitem__') else None
if pat_proc is not None:
return True
except (IndexError, TypeError, KeyError):
continue
return False
[docs]
def convert_patience_to_regimes(
patience_proc: Any,
options: Optional[Dict] = None
) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Convert patience distribution to piecewise-constant abandonment regimes for MAPMsG.
Converts a patience distribution (in MAP/PH format) to boundary levels
and abandonment function values for the MRMFQ solver.
Args:
patience_proc: Patience distribution from sn.patienceProc[station, class]
options: Dict with optional 'mapmsg_quantization' key (default 11)
Returns:
Tuple of (boundary_levels, ga, quantization) where:
boundary_levels: Array of regime boundary time points
ga: Array of abandonment probabilities at each regime
quantization: Number of regimes
References:
Original MATLAB: matlab/src/solvers/MAM/solver_mam_retrial.m (convertPatienceToRegimes)
"""
if options is None:
options = {}
quantization = options.get('mapmsg_quantization', 11)
# Extract patience rate from the distribution
# For exponential patience Exp(gamma): patienceProc = {-gamma, gamma}
gamma = 0.1 # default
if isinstance(patience_proc, (list, tuple)) and len(patience_proc) >= 2:
D0 = np.atleast_2d(patience_proc[0])
gamma = -float(D0[0, 0])
elif isinstance(patience_proc, dict):
gamma = patience_proc.get('rate', 0.1)
# Generate boundary levels and abandonment probabilities
max_time = 10.0
boundary_levels = np.linspace(0, max_time, quantization)
# Compute abandonment probability at each boundary
# ga(k) = F(BoundaryLevels(k)) for piecewise constant approximation
ga = np.zeros(quantization + 1)
ga[0] = 0.0 # No abandonment at time 0
for k in range(quantization):
midpoint = (boundary_levels[k] + boundary_levels[min(k + 1, quantization - 1)]) / 2
if k == 0:
midpoint = boundary_levels[k] / 2
ga[k + 1] = 1.0 - np.exp(-gamma * midpoint)
return boundary_levels, ga, quantization
[docs]
def solver_mam_retrial(sn: Any, options: Optional[Dict] = None) -> Tuple[
np.ndarray, np.ndarray, np.ndarray, np.ndarray,
np.ndarray, np.ndarray, int
]:
"""
Solve queueing models with customer impatience (retrial or reneging).
Dispatches to either:
1. RETRIAL: BMAP/PH/N/N bufferless retrial solver
2. RENEGING: MAP/M/s+G solver (MAPMsG)
Args:
sn: NetworkStruct object
options: Solver options dict with optional keys:
'iter_max': Maximum truncation level (default 150)
'tol': Convergence tolerance (default 1e-10)
'verbose': Print progress messages (default False)
'config': Dict with 'mapmsg_quantization' (default 11)
Returns:
Tuple of (QN, UN, RN, TN, CN, XN, totiter) where:
QN: (M, K) queue lengths
UN: (M, K) server utilizations
RN: (M, K) response times
TN: (M, K) throughputs
CN: (1, K) cycle times
XN: (1, K) system throughputs
totiter: iteration/truncation level count
References:
Original MATLAB: matlab/src/solvers/MAM/solver_mam_retrial.m
"""
if options is None:
options = {}
# Check for reneging (queue abandonment) first
is_reneging, reneging_info = detect_reneging_topology(sn)
if is_reneging:
return _solve_reneging(sn, options, reneging_info)
# Check for retrial topology
is_retrial, ret_info = qsys_is_retrial(sn)
if not is_retrial:
raise ValueError(
'No valid impatience configuration detected (retrial or reneging). '
+ ret_info.error_msg
)
# Initialize output arrays
M = sn.nstations
K = sn.nclasses
QN = np.zeros((M, K))
UN = np.zeros((M, K))
RN = np.zeros((M, K))
TN = np.zeros((M, K))
CN = np.zeros((1, K))
XN = np.zeros((1, K))
# Extract indices
source_idx = ret_info.source_idx
queue_idx = ret_info.station_idx
class_idx = ret_info.class_idx
N = ret_info.N
# Extract arrival process (BMAP) from source
arrival_proc = sn.proc[source_idx][class_idx]
D = extract_bmap_matrices(arrival_proc)
if D is None:
raise ValueError('Could not extract BMAP matrices from arrival process.')
# Extract PH service distribution from queue
service_proc = sn.proc[queue_idx][class_idx]
beta, S = extract_ph_params(service_proc)
if beta is None or S is None:
raise ValueError('Could not extract PH parameters from service process.')
# Extract retrial rate alpha from network configuration
alpha = 0.1 # default
if hasattr(sn, 'retrialDelays') and sn.retrialDelays is not None:
try:
retrial_dist = sn.retrialDelays[queue_idx, class_idx] if hasattr(sn.retrialDelays, '__getitem__') else None
if retrial_dist is None:
retrial_dist = sn.retrialDelays[queue_idx][class_idx]
if retrial_dist is not None and isinstance(retrial_dist, (list, tuple)):
alpha = -float(np.atleast_2d(retrial_dist[1])[0, 0])
except (IndexError, TypeError, KeyError):
pass
# Extract orbit impatience gamma (default 0)
gamma = ret_info.gamma
if hasattr(sn, 'orbitImpatience') and sn.orbitImpatience is not None:
try:
imp_dist = sn.orbitImpatience[queue_idx, class_idx] if hasattr(sn.orbitImpatience, '__getitem__') else None
if imp_dist is None:
imp_dist = sn.orbitImpatience[queue_idx][class_idx]
if imp_dist is not None and isinstance(imp_dist, (list, tuple)):
gamma = -float(np.atleast_2d(imp_dist[1])[0, 0])
except (IndexError, TypeError, KeyError):
pass
# Extract batch rejection probability p (default 0)
p = ret_info.p
if hasattr(sn, 'batchRejectProb') and sn.batchRejectProb is not None:
try:
p = float(sn.batchRejectProb[queue_idx, class_idx])
except (IndexError, TypeError):
pass
# Extract admission threshold R (from FCR or default N-1)
R = ret_info.R
# Solve with options
max_level = options.get('iter_max', 150)
tol = options.get('tol', 1e-10)
verbose = options.get('verbose', False)
# Build arrival_matrix dict for qsys_bmapphnn_retrial
arrival_matrix = {}
for i, Di in enumerate(D):
arrival_matrix[f'D{i}'] = Di
service_params = {'beta': beta, 'S': S}
retrial_params = {'alpha': alpha, 'gamma': gamma, 'p': p, 'R': R}
solver_options = {'MaxLevel': max_level, 'Tolerance': tol, 'Verbose': verbose}
perf = qsys_bmapphnn_retrial(arrival_matrix, service_params, N,
retrial_params, solver_options)
# Map to LINE output format
# Queue length includes both orbit and servers
QN[queue_idx, class_idx] = perf.L_orbit + perf.N_server
UN[queue_idx, class_idx] = perf.utilization
TN[queue_idx, class_idx] = perf.throughput
# Response time via Little's law
if perf.throughput > 0:
RN[queue_idx, class_idx] = QN[queue_idx, class_idx] / perf.throughput
else:
RN[queue_idx, class_idx] = np.inf
# System-level metrics
XN[0, class_idx] = perf.throughput
CN[0, class_idx] = RN[queue_idx, class_idx]
# Return iteration count (truncation level used)
totiter = perf.truncation_level
return QN, UN, RN, TN, CN, XN, totiter
def _solve_reneging(
sn: Any,
options: Dict,
info: RenegingInfo
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray,
np.ndarray, np.ndarray, int]:
"""
Solve MAP/M/s+G queue using MRMFQ-based approach (MAPMsG).
Builds the MRMFQ (Multi-Regime Markov Fluid Queue) matrices for the
reneging queue and solves for steady-state metrics.
Note: The full MRMFQ solver (MRMFQSolver) requires a specialized numerical
engine. This implementation builds the MRMFQ matrices and performs the
steady-state analysis. If the MRMFQ solver is not available, a simplified
fluid approximation is used.
Args:
sn: NetworkStruct object
options: Solver options dict
info: RenegingInfo with topology parameters
Returns:
Tuple of (QN, UN, RN, TN, CN, XN, totiter)
References:
O. Gursoy, K. A. Mehr, N. Akar, "The MAP/M/s + G Call Center Model
with Generally Distributed Patience Times"
Original MATLAB: matlab/src/solvers/MAM/solver_mam_retrial.m (solveReneging)
"""
from scipy.linalg import expm
M_stations = sn.nstations
K = sn.nclasses
QN = np.zeros((M_stations, K))
UN = np.zeros((M_stations, K))
RN = np.zeros((M_stations, K))
TN = np.zeros((M_stations, K))
CN = np.zeros((1, K))
XN = np.zeros((1, K))
source_idx = info.source_idx
queue_idx = info.queue_idx
class_idx = info.class_idx
# Extract MAP arrival process matrices (C, D in MAPMsG notation)
arrival_proc = sn.proc[source_idx][class_idx]
C = np.atleast_2d(arrival_proc[0]) # D0 (subgenerator)
D_arr = np.atleast_2d(arrival_proc[1]) # D1 (arrival transitions)
MAPSIZE = C.shape[0]
# Extract service rate and server count
mu = info.service_rate
SERVERSIZE = info.n_servers
# Extract patience distribution and convert to regimes
patience_proc = sn.patienceProc[queue_idx, class_idx] if hasattr(sn.patienceProc, '__getitem__') else sn.patienceProc[queue_idx][class_idx]
config = options.get('config', {}) if isinstance(options, dict) else {}
boundary_levels, ga, QUANTIZATION = convert_patience_to_regimes(patience_proc, config)
# Build MRMFQ matrices following MAPMsGCompiler.m logic
I = np.eye(MAPSIZE)
em = np.ones((MAPSIZE, 1))
lmap = D_arr @ em # Arrival rate vector
# Initialize Qy0 (boundary generator at level 0)
dim0 = (SERVERSIZE + 1) * MAPSIZE
Qy0 = np.zeros((dim0, dim0))
for row in range(1, SERVERSIZE + 2): # 1-indexed rows matching MATLAB
rs = (row - 1) * MAPSIZE
re = row * MAPSIZE
if row == 1:
Qy0[rs:re, rs:re] = C
Qy0[rs:re, re:re + MAPSIZE] = D_arr
elif row == SERVERSIZE + 1:
Qy0[rs:re, rs:re] = -(row - 1) * mu * I
Qy0[rs:re, rs - MAPSIZE:rs] = (row - 1) * mu * I
else:
Qy0[rs:re, rs:re] = C - (row - 1) * mu * I
Qy0[rs:re, rs - MAPSIZE:rs] = (row - 1) * mu * I
Qy0[rs:re, re:re + MAPSIZE] = D_arr
# Initialize Qy for each regime (with abandonment)
Qy = np.zeros((dim0, dim0, QUANTIZATION))
for regimecount in range(QUANTIZATION):
# Only rows SERVERSIZE and SERVERSIZE+1 have regime-dependent entries
# (0-indexed: rows SERVERSIZE-1 and SERVERSIZE)
row_s = SERVERSIZE # MATLAB row = SERVERSIZE (1-indexed)
rs = (row_s - 1) * MAPSIZE
re = row_s * MAPSIZE
# Row = SERVERSIZE (1-indexed)
ga_val = ga[regimecount + 1]
Qy[rs:re, rs:re, regimecount] = ga_val * D_arr + C
Qy[rs:re, re:re + MAPSIZE, regimecount] = (1.0 - ga_val) * D_arr
# Row = SERVERSIZE+1 (1-indexed)
row_s2 = SERVERSIZE + 1
rs2 = (row_s2 - 1) * MAPSIZE
re2 = row_s2 * MAPSIZE
Qy[rs2:re2, rs2:re2, regimecount] = -(row_s2 - 1) * mu * I
Qy[rs2:re2, rs2 - MAPSIZE:rs2, regimecount] = (row_s2 - 1) * mu * I
# Build drift matrices
Rydiag = -np.ones(dim0)
Rydiag[dim0 - MAPSIZE:dim0] = 1.0 # Last MAPSIZE entries are +1
Ry = np.diag(Rydiag)
ydriftregimes = np.tile(Rydiag, (QUANTIZATION, 1))
Ryregimes = np.tile(Ry, (1, 1)).reshape(dim0, dim0, 1)
Ryregimes = np.repeat(Ryregimes, QUANTIZATION, axis=2)
# Combine boundaries and regimes
Qybounds = np.concatenate([Qy0[:, :, np.newaxis], Qy], axis=2)
ydriftbounds = np.concatenate([Rydiag[np.newaxis, :], ydriftregimes], axis=0)
# Prepare boundary levels (remove first, add large value at end)
B = list(boundary_levels)
if len(B) > 0:
B.pop(0)
B.append(10000000.0)
# Attempt to call MRMFQ solver
# The MRMFQ solver is a specialized numerical engine for multi-regime
# Markov fluid queues. If not available, use simplified fluid approximation.
try:
coefficients, boundaries, Lzeromulti, Lnegmulti, Lposmulti, Anegmulti, Aposmulti = \
_mrmfq_solver(Qy, Qybounds, ydriftregimes, ydriftbounds, B)
# Compute steady-state results following MAPMsGCompiler.m
zeromass = boundaries[0]
# Compute integrals for each regime
integral = np.zeros((QUANTIZATION, len(zeromass)))
waitintegral = np.zeros((QUANTIZATION, len(zeromass)))
abandonintegral = np.zeros((QUANTIZATION, len(zeromass)))
for d_idx in range(QUANTIZATION):
if d_idx == 0:
prev_B = 0.0
else:
prev_B = B[d_idx - 1]
Lz = Lzeromulti[d_idx]
Ln = Lnegmulti[d_idx]
Lp = Lposmulti[d_idx]
An = Anegmulti[d_idx]
Ap = Aposmulti[d_idx]
coef = coefficients[d_idx]
delta_B = B[d_idx] - prev_B
integrand = np.concatenate([
Lz * delta_B,
np.linalg.solve(An, expm(An * delta_B) - np.eye(An.shape[0])) @ Ln,
np.linalg.solve(Ap, np.eye(Ap.shape[0]) - expm(-Ap * delta_B)) @ Lp,
])
integral[d_idx, :] = coef @ integrand
waitintegral[d_idx, :] = ((B[d_idx] + prev_B) / 2) * (1 - ga[d_idx + 1]) * coef @ integrand
abandonintegral[d_idx, :] = ga[d_idx + 1] * coef @ integrand
# Map integrals to arrival rates
normalization = SERVERSIZE * MAPSIZE
IntegralMapped = np.zeros_like(integral)
AbandonIntegralMapped = np.zeros_like(integral)
WaitIntegralMapped = np.zeros_like(integral)
ZeroMassMapped = np.zeros(len(zeromass))
for r in range(len(zeromass)):
lmap_idx = r % MAPSIZE
IntegralMapped[:, r] = integral[:, r] * lmap[lmap_idx, 0]
AbandonIntegralMapped[:, r] = abandonintegral[:, r] * lmap[lmap_idx, 0]
WaitIntegralMapped[:, r] = waitintegral[:, r] * lmap[lmap_idx, 0]
ZeroMassMapped[r] = zeromass[r] * lmap[lmap_idx, 0]
# Compute performance metrics
totalMass = np.sum(ZeroMassMapped) + np.sum(IntegralMapped[:, :normalization])
AbandonProb = np.sum(AbandonIntegralMapped[:, :normalization]) / totalMass
ExpectedWait = np.sum(WaitIntegralMapped[:, :normalization]) / (totalMass * (1 - AbandonProb))
# Compute arrival rate
lambda_arr = float(np.sum(lmap))
# Map to LINE output format
throughput = lambda_arr * (1 - AbandonProb)
TN[queue_idx, class_idx] = throughput
UN[queue_idx, class_idx] = throughput / (mu * SERVERSIZE)
RN[queue_idx, class_idx] = ExpectedWait + 1.0 / mu
QN[queue_idx, class_idx] = throughput * RN[queue_idx, class_idx]
XN[0, class_idx] = throughput
CN[0, class_idx] = RN[queue_idx, class_idx]
totiter = QUANTIZATION
except NotImplementedError:
# MRMFQ solver not available - use simplified Erlang-A approximation
lambda_arr = float(np.sum(lmap))
# Extract patience rate for Erlang-A approximation
patience_rate = ga[1] * 10.0 if ga[1] > 0 else 0.1 # rough estimate
# Erlang-A (M/M/s+M) approximation
rho = lambda_arr / (SERVERSIZE * mu)
if rho < 1:
# Stable system: approximate via modified Erlang-C
throughput = lambda_arr * min(1.0, 1.0 / (1.0 + patience_rate / (SERVERSIZE * mu)))
else:
# Overloaded: most excess customers abandon
throughput = SERVERSIZE * mu
TN[queue_idx, class_idx] = throughput
UN[queue_idx, class_idx] = throughput / (mu * SERVERSIZE)
RN[queue_idx, class_idx] = 1.0 / mu + (throughput / lambda_arr) / (SERVERSIZE * mu - throughput + 1e-10)
QN[queue_idx, class_idx] = throughput * RN[queue_idx, class_idx]
XN[0, class_idx] = throughput
CN[0, class_idx] = RN[queue_idx, class_idx]
totiter = QUANTIZATION
return QN, UN, RN, TN, CN, XN, totiter
def _mrmfq_solver(Qy, Qybounds, ydriftregimes, ydriftbounds, B):
"""
Multi-Regime Markov Fluid Queue solver.
This solver handles the spectral decomposition of the MRMFQ system
arising from the MAP/M/s+G reneging queue formulation.
The MRMFQ is characterized by:
- Generator matrices Qy for each regime
- Boundary generators Qybounds at each transition
- Drift matrices for fluid level dynamics
- Boundary levels B separating the regimes
Args:
Qy: (dim, dim, num_regimes) generator matrices per regime
Qybounds: (dim, dim, num_regimes+1) boundary generators
ydriftregimes: (num_regimes, dim) drift diagonal per regime
ydriftbounds: (num_regimes+1, dim) drift diagonal at boundaries
B: List of boundary levels
Returns:
Tuple of (coefficients, boundaries, Lzeromulti, Lnegmulti,
Lposmulti, Anegmulti, Aposmulti)
Raises:
NotImplementedError: The full MRMFQ spectral decomposition engine
is not yet available in Python. The MATLAB implementation
relies on the MRMFQSolver library.
"""
raise NotImplementedError(
"The MRMFQ solver (MRMFQSolver) is not yet available in Python native. "
"The MAP/M/s+G reneging analysis requires this specialized numerical engine. "
"A simplified Erlang-A approximation will be used instead."
)
__all__ = [
'QueueType',
'BmapMatrix',
'PhDistribution',
'QbdStatespace',
'RetrialQueueResult',
'RetrialQueueAnalyzer',
'qsys_bmapphnn_retrial',
'qsys_is_retrial',
'RetrialInfo',
'RenegingInfo',
'detect_reneging_topology',
'has_reneging_patience',
'extract_bmap_matrices',
'extract_ph_params',
'convert_patience_to_regimes',
'solver_mam_retrial',
]