Source code for line_solver.api.qsys.loss

"""
Loss System Queue Analysis.

Native Python implementations for analyzing loss systems (finite buffer
queues where customers are rejected when buffer is full).

Key functions:
    qsys_mm1k_loss: M/M/1/K loss probability
    qsys_mg1k_loss: M/G/1/K loss probability (Niu-Cooper formula)
    qsys_mg1k_loss_mgs: M/G/1/K loss with MacGregor Smith method

References:
    Original MATLAB: matlab/src/api/qsys/qsys_*k_loss.m
    Niu-Cooper, "Transform-Free Analysis of M/G/1/K and Related Queues", 1993
"""

import numpy as np
from typing import Tuple, Callable, Optional
from scipy.integrate import quad


[docs] def qsys_mm1k_loss(lambda_val: float, mu: float, K: int) -> Tuple[float, float]: """ Compute loss probability for M/M/1/K queue. Uses the closed-form formula for the M/M/1/K loss system where customers are rejected when the buffer (capacity K) is full. Args: lambda_val: Arrival rate mu: Service rate K: Buffer capacity (including customer in service) Returns: Tuple of (lossprob, rho): lossprob: Probability that an arriving customer is rejected rho: Offered load (lambda/mu) References: Original MATLAB: matlab/src/api/qsys/qsys_mm1k_loss.m """ rho = lambda_val / mu if abs(rho - 1.0) < 1e-10: # Special case: rho = 1 lossprob = 1.0 / (K + 1) else: lossprob = (1 - rho) / (1 - rho ** (K + 1)) * rho ** K return lossprob, rho
[docs] def qsys_mg1k_loss(lambda_val: float, service_pdf: Callable[[float], float], K: int, max_t: float = 100.0) -> Tuple[float, float, float]: """ Compute loss probability for M/G/1/K queue. Uses the Niu-Cooper transform-free analysis method for computing the stationary distribution and loss probability of M/G/1/K queues. Args: lambda_val: Arrival rate service_pdf: Probability density function of service time f(t) K: Buffer capacity max_t: Maximum integration time (default: 100) Returns: Tuple of (sigma, rho, lossprob): sigma: Normalizing constant term rho: Offered load lossprob: Probability of loss References: Original MATLAB: matlab/src/api/qsys/qsys_mg1k_loss.m Niu-Cooper, "Transform-Free Analysis of M/G/1/K", 1993 """ # Compute mean service time mean_service, _ = quad(lambda t: t * service_pdf(t), 0, max_t) rho = lambda_val * mean_service # Compute the stationary probabilities using Niu-Cooper method # This is a simplified implementation # Probability of n customers at departure epochs pi = np.zeros(K + 1) pi[0] = 1.0 # Iterate to find stationary distribution for _ in range(1000): pi_new = np.zeros(K + 1) # pi_0 contribution for n in range(K + 1): # Probability of n arrivals during service a_n = _poisson_arrivals(lambda_val, n, service_pdf, max_t) pi_new[min(n, K)] += pi[0] * a_n # pi_j contributions for j > 0 for j in range(1, K + 1): for n in range(K - j + 2): a_n = _poisson_arrivals(lambda_val, n, service_pdf, max_t) new_state = min(j + n - 1, K) if new_state >= 0: pi_new[new_state] += pi[j] * a_n # Normalize total = np.sum(pi_new) if total > 0: pi_new /= total if np.linalg.norm(pi_new - pi) < 1e-10: break pi = pi_new # Loss probability is related to probability of being in state K # at arrival epochs (PASTA) sigma = pi[K] lossprob = sigma * rho / (1 + sigma * (rho - 1)) if rho > 0 else 0.0 return sigma, rho, lossprob
def _poisson_arrivals(lambda_val: float, n: int, service_pdf: Callable[[float], float], max_t: float) -> float: """Compute probability of n Poisson arrivals during service.""" from scipy.special import factorial def integrand(t): return (lambda_val * t) ** n * np.exp(-lambda_val * t) / factorial(n) * service_pdf(t) result, _ = quad(integrand, 0, max_t) return result
[docs] def qsys_mg1k_loss_mgs(lambda_val: float, mu: float, mu_scv: float, K: int) -> Tuple[float, float]: """ Compute loss probability for M/G/1/K using MacGregor Smith approximation. Matches MATLAB qsys_mg1k_loss_mgs.m exactly. Args: lambda_val: Arrival rate mu: Service rate mu_scv: Squared coefficient of variation of service time K: Buffer capacity Returns: Tuple of (lossprob, rho): lossprob: Probability of loss rho: Offered load References: Original MATLAB: matlab/src/api/qsys/qsys_mg1k_loss_mgs.m J. MacGregor Smith, "Optimal Design and Performance Modelling of M/G/1/K Queueing Systems" """ rho = lambda_val / mu s = np.sqrt(mu_scv) sqrt_rho = np.sqrt(rho) lossprob_num = rho**((sqrt_rho * s**2 - sqrt_rho + 2 * K) / (2 + sqrt_rho * s**2 - sqrt_rho)) * (rho - 1) lossprob_den = rho**(2 * (1 + sqrt_rho * s**2 - sqrt_rho + K) / (2 + sqrt_rho * s**2 - sqrt_rho)) - 1 lossprob = lossprob_num / lossprob_den return lossprob, rho
[docs] def qsys_mxm1(lambda_batch: float, mu: float, E_X_or_batch_sizes, E_X2_or_pmf, mode: Optional[str] = None) -> Tuple[float, float, float, float]: """ Analyze MX/M/1 queue with batch arrivals. Matches MATLAB qsys_mxm1.m exactly. Three input formats: 1. Moment-based: qsys_mxm1(lambda_batch, mu, E_X, E_X2) 2. PMF-based: qsys_mxm1(lambda_batch, mu, batch_sizes, pmf) 3. Variance: qsys_mxm1(lambda_batch, mu, E_X, Var_X, 'variance') Args: lambda_batch: Batch arrival rate mu: Service rate E_X_or_batch_sizes: Mean batch size (scalar) or array of batch sizes E_X2_or_pmf: Second moment of batch size, PMF, or variance mode: Optional 'variance' flag for variance-based input Returns: Tuple of (W, Wq, U, Q): W: Mean time in system Wq: Mean waiting time in queue U: Server utilization Q: Mean queue length (including service) References: Original MATLAB: matlab/src/api/qsys/qsys_mxm1.m """ if mode is not None and mode.lower() == 'variance': # Format 3: Variance-based E_X = float(E_X_or_batch_sizes) Var_X = float(E_X2_or_pmf) E_X2 = Var_X + E_X**2 elif hasattr(E_X_or_batch_sizes, '__len__') and len(np.asarray(E_X_or_batch_sizes).flatten()) > 1: # Format 2: PMF-based batch_sizes = np.asarray(E_X_or_batch_sizes, dtype=float).flatten() pmf = np.asarray(E_X2_or_pmf, dtype=float).flatten() if len(batch_sizes) != len(pmf): raise ValueError("Batch sizes and PMF must have the same length") pmf = pmf / np.sum(pmf) E_X = np.sum(batch_sizes * pmf) E_X2 = np.sum(batch_sizes**2 * pmf) else: # Format 1: Moment-based (default) E_X = float(E_X_or_batch_sizes) E_X2 = float(E_X2_or_pmf) # Compute effective job arrival rate lambda_eff = lambda_batch * E_X # Compute utilization rho = lambda_eff / mu if rho >= 1: raise ValueError(f"System is unstable: rho = {rho:.6f} >= 1") # Mean waiting time in queue (MATLAB formula): # Wq = rho/(mu*(1-rho)) + (E[X^2] - E[X])/(2*mu*E[X]*(1-rho)) Wq = rho / (mu * (1 - rho)) + (E_X2 - E_X) / (2 * mu * E_X * (1 - rho)) # Mean time in system W = Wq + 1 / mu # Server utilization U = rho # Mean queue length (Little's Law) Q = lambda_eff * W return W, Wq, U, Q
__all__ = [ 'qsys_mm1k_loss', 'qsys_mg1k_loss', 'qsys_mg1k_loss_mgs', 'qsys_mxm1', ]