Source code for line_solver.api.pfqn.mva

"""
Mean Value Analysis (MVA) algorithms for Product-Form Queueing Networks.

Native Python implementations of MVA and related algorithms including:
- Standard MVA for closed networks
- Multi-class MVA with population recursion
- Approximate MVA methods

References:
    Reiser, M., and Lavenberg, S.S. "Mean-Value Analysis of Closed Multichain
    Queueing Networks." Journal of the ACM 27.2 (1980): 313-322.
"""

import numpy as np
from numpy.linalg import LinAlgError
from typing import Tuple, Optional, Dict, Any, Union
from math import log, ceil

from .replicas import pfqn_unique, pfqn_expand, pfqn_combine_mi

# Import JIT-compiled kernels
try:
    from .mva_jit import (
        HAS_NUMBA as MVA_HAS_NUMBA,
        mva_single_class_jit,
        mva_population_recursion_jit,
        schweitzer_iteration_jit,
    )
except ImportError:
    # Fallback if JIT import fails
    MVA_HAS_NUMBA = False
    mva_single_class_jit = None
    mva_population_recursion_jit = None
    schweitzer_iteration_jit = None

# Threshold for using JIT (population space size)
JIT_THRESHOLD = 100


def _population_lattice_pprod(n: np.ndarray, N: np.ndarray = None) -> np.ndarray:
    """
    Generate next population vector in lexicographic order.

    Args:
        n: Current population vector
        N: Maximum population (optional, for wrapping)

    Returns:
        Next population vector, or (-1,...,-1) when exhausted
    """
    R = len(n)
    n_next = n.copy()

    # If N is None, just increment by 1 in the last position and carry
    if N is None:
        n_next[-1] += 1
        return n_next

    # Find rightmost position that can be incremented
    for i in range(R - 1, -1, -1):
        if n_next[i] < N[i]:
            n_next[i] += 1
            # Reset all positions to the right to 0
            for j in range(i + 1, R):
                n_next[j] = 0
            return n_next

    # All exhausted
    return -np.ones(R)


def _population_lattice_hashpop(n: np.ndarray, N: np.ndarray) -> int:
    """
    Compute hash index for population vector n given max N.

    Args:
        n: Population vector
        N: Maximum population vector

    Returns:
        Linear index in flattened population lattice
    """
    R = len(n)
    idx = 0
    mult = 1
    for i in range(R - 1, -1, -1):
        idx += int(n[i]) * mult
        mult *= int(N[i]) + 1
    return idx


[docs] def pfqn_mva_single_class(N: int, L: np.ndarray, Z: float = 0.0, mi: Optional[np.ndarray] = None) -> Dict[str, Any]: """ Mean Value Analysis for single-class closed network. Simplified MVA for single customer class, with optional multi-server stations specified via mi (multiplicity). Args: N: Number of customers L: Service demands at each station (1D array of length M) Z: Think time (default 0) mi: Number of servers at each station (default all 1) Returns: dict with keys: - 'X': Throughput - 'Q': Queue lengths (array of length M) - 'R': Residence times (array of length M) - 'U': Utilizations (array of length M) - 'lG': Log of normalizing constant """ L = np.asarray(L, dtype=np.float64).flatten() M = len(L) if mi is None: mi = np.ones(M) else: mi = np.asarray(mi, dtype=np.float64).flatten() if N <= 0: return { 'X': 0.0, 'Q': np.zeros(M), 'R': L.copy(), 'U': np.zeros(M), 'lG': 0.0 } # Use JIT version for larger populations if MVA_HAS_NUMBA and mva_single_class_jit is not None and N > JIT_THRESHOLD: X, Q, R, U, lG = mva_single_class_jit(N, L, Z, mi) return {'X': X, 'Q': Q, 'R': R, 'U': U, 'lG': lG} # Pure Python MVA recursion Q = np.zeros(M) lG = 0.0 for n in range(1, N + 1): # Residence times: R_i = L_i * (m_i + Q_i(n-1)) R = L * (mi + Q) # Throughput: X = n / (Z + sum(R)) total_R = R.sum() X = n / (Z + total_R) # Update queue lengths Q = X * R # Update log normalizing constant if n > 0: lG -= log(X) # Utilizations U = X * L return { 'X': X, 'Q': Q, 'R': R, 'U': U, 'lG': lG }
[docs] def pfqn_mva(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, mi: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Mean Value Analysis for multi-class closed product-form network. Implements the exact MVA algorithm using population recursion. Computes exact performance measures for closed product-form networks with load-independent stations. Args: L: Service demand matrix (M x R) where M is stations, R is classes N: Population vector (1 x R or R,) - number of jobs per class Z: Think time vector (1 x R or R,) - think time per class (default 0) mi: Multiplicity vector (1 x M or M,) - servers per station (default 1) Returns: Tuple of (XN, CN, QN, UN, RN, TN, AN) where: - XN: Throughputs per class (1 x R) - CN: Response times per class (1 x R) - total cycle time - QN: Queue lengths (M x R) - UN: Utilizations (M x R) - RN: Residence times (M x R) - TN: Node throughputs (M x R) - AN: Arrival rates (M x R) """ L = np.asarray(L, dtype=np.float64) N = np.asarray(N, dtype=np.float64).flatten() N = np.ceil(N).astype(int) R = len(N) # Number of classes if L.ndim == 1: L = L.reshape(-1, 1) if R == 1 else L.reshape(1, -1) M_original = L.shape[0] # Original number of stations if L.shape[1] != R: raise ValueError(f"Demand matrix columns ({L.shape[1]}) must match population size ({R})") # Handle Z if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=np.float64).flatten() if len(Z) != R: raise ValueError(f"Think time vector length ({len(Z)}) must match number of classes ({R})") # Handle mi if mi is None: mi = np.ones(M_original) else: mi = np.asarray(mi, dtype=np.float64).flatten() if len(mi) != M_original: raise ValueError(f"Multiplicity vector length ({len(mi)}) must match number of stations ({M_original})") # Detect and consolidate replicated stations unique_result = pfqn_unique(L) L_reduced = unique_result.L_unique mapping = unique_result.mapping M = L_reduced.shape[0] # Combine user-provided mi with detected multiplicity mi = pfqn_combine_mi(mi, mapping, M).flatten() # Empty population check if not np.any(N > 0): return (np.zeros((1, R)), np.zeros((1, R)), np.zeros((M_original, R)), np.zeros((M_original, R)), np.zeros((M_original, R)), np.zeros((M_original, R)), np.zeros((M_original, R))) # For single class, use simpler algorithm if R == 1: result = pfqn_mva_single_class(int(N[0]), L_reduced[:, 0], Z[0], mi) XN = np.array([[result['X']]]) QN = result['Q'].reshape(-1, 1) RN = result['R'].reshape(-1, 1) UN = result['U'].reshape(-1, 1) # Expand results back to original dimensions if stations were consolidated if M < M_original: QN, UN, RN = pfqn_expand(QN, UN, RN, mapping) TN = XN * np.ones((M_original, 1)) # Node throughputs = system throughput AN = TN.copy() # Arrival rates = throughputs CN = np.array([[RN.sum() + Z[0]]]) return XN, CN, QN, UN, RN, TN, AN # Multi-class MVA using population recursion # Total population combinations for JIT threshold check totpop = int(np.prod(N + 1)) # Use JIT version for larger population spaces if MVA_HAS_NUMBA and mva_population_recursion_jit is not None and totpop > JIT_THRESHOLD: N_int = N.astype(np.int64) XN_jit, QN_jit, CN_jit, lGN = mva_population_recursion_jit( L_reduced, N_int, Z, mi ) XN = XN_jit.reshape(1, -1) QN = QN_jit CN = CN_jit # Compute utilizations UN = np.zeros((M, R)) for m in range(M): for r in range(R): UN[m, r] = XN[0, r] * L_reduced[m, r] # Compute residence times RN = np.zeros((M, R)) for m in range(M): for r in range(R): if XN[0, r] > 0: RN[m, r] = QN[m, r] / XN[0, r] else: RN[m, r] = L_reduced[m, r] # Expand results back to original dimensions if stations were consolidated if M < M_original: QN, UN, RN = pfqn_expand(QN, UN, RN, mapping) CN, _, _ = pfqn_expand(CN, CN, CN, mapping) # Node throughputs and arrival rates TN = np.zeros((M_original, R)) AN = np.zeros((M_original, R)) for m in range(M_original): for r in range(R): TN[m, r] = XN[0, r] AN[m, r] = XN[0, r] # Response time per class CN_total = np.zeros((1, R)) for r in range(R): CN_total[0, r] = RN[:, r].sum() + Z[r] return XN, CN_total, QN, UN, RN, TN, AN # Pure Python: Compute product of (N[i]+1) for indexing prods = np.zeros(R - 1) for w in range(R - 1): prods[w] = np.prod(np.ones(R - w - 1) + N[w + 1:]) # Find first non-empty class (from the end) first_non_empty = R - 1 while first_non_empty >= 0 and N[first_non_empty] == 0: first_non_empty -= 1 if first_non_empty < 0: return (np.zeros((1, R)), np.zeros((1, R)), np.zeros((M_original, R)), np.zeros((M_original, R)), np.zeros((M_original, R)), np.zeros((M_original, R)), np.zeros((M_original, R))) # Q[pop_idx, station] stores cumulative queue length at population index Q = np.zeros((totpop, M)) # Output arrays XN = np.zeros((1, R)) QN = np.zeros((M, R)) CN = np.zeros((M, R)) # Log normalizing constant lGN = 0.0 # Initialize population vector n = np.zeros(R, dtype=int) n[first_non_empty] = 1 currentpop = 1 ctr = totpop # Process all populations including empty state indexing while ctr > 0: for s in range(R): if n[s] > 0: # Compute index for n - e_s (one less job in class s) n[s] -= 1 pos_n_1s = int(n[R - 1]) for w in range(R - 1): pos_n_1s += int(n[w] * prods[w]) n[s] += 1 else: pos_n_1s = 0 # Compute residence times: CN[i,s] = L_reduced[i,s] * (mi[i] + Q[pos_n_1s, i]) CNtot = 0.0 for i in range(M): CN[i, s] = L_reduced[i, s] * (mi[i] + Q[pos_n_1s, i]) CNtot += CN[i, s] # Compute throughput for class s XN[0, s] = n[s] / (Z[s] + CNtot) if (Z[s] + CNtot) > 0 else 0.0 # Compute queue lengths and accumulate for i in range(M): QN[i, s] = XN[0, s] * CN[i, s] Q[currentpop, i] += QN[i, s] # Update log normalizing constant # Find last non-zero class position nonzero_idx = np.where(n > 0)[0] if len(nonzero_idx) > 0: last_nnz = nonzero_idx[-1] sumn = np.sum(n[:last_nnz]) sumN = np.sum(N[:last_nnz]) sumnprime = np.sum(n[last_nnz + 1:]) if sumn == sumN and sumnprime == 0 and XN[0, last_nnz] > 0: lGN -= log(XN[0, last_nnz]) # Find next population vector s = R - 1 while s >= 0 and (n[s] == N[s] or s > first_non_empty): s -= 1 if s < 0: break n[s] += 1 for i in range(s + 1, R): n[i] = 0 ctr -= 1 currentpop += 1 # Compute utilizations UN = np.zeros((M, R)) for m in range(M): for r in range(R): UN[m, r] = XN[0, r] * L_reduced[m, r] # Compute residence times (waiting times) RN = np.zeros((M, R)) for m in range(M): for r in range(R): if XN[0, r] > 0: RN[m, r] = QN[m, r] / XN[0, r] else: RN[m, r] = L_reduced[m, r] # Expand results back to original dimensions if stations were consolidated if M < M_original: QN, UN, RN = pfqn_expand(QN, UN, RN, mapping) CN, _, _ = pfqn_expand(CN, CN, CN, mapping) # Node throughputs and arrival rates TN = np.zeros((M_original, R)) AN = np.zeros((M_original, R)) for m in range(M_original): for r in range(R): TN[m, r] = XN[0, r] # Closed network: all throughputs equal system throughput AN[m, r] = XN[0, r] # Arrival rate = departure rate = throughput # Response time per class (sum of residence times + think time) CN_total = np.zeros((1, R)) for r in range(R): CN_total[0, r] = RN[:, r].sum() + Z[r] return XN, CN_total, QN, UN, RN, TN, AN
[docs] def pfqn_bs(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Balanced System (Asymptotic) analysis for product-form networks. Provides asymptotic approximations based on bottleneck analysis. Fast but less accurate than exact MVA for small populations. Args: L: Service demand matrix (M x R) N: Population vector Z: Think time vector (default 0) Returns: Same format as pfqn_mva """ L = np.asarray(L, dtype=np.float64) N = np.asarray(N, dtype=np.float64).flatten() R = len(N) if L.ndim == 1: L = L.reshape(-1, 1) if R == 1 else L.reshape(1, -1) M = L.shape[0] if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=np.float64).flatten() # Compute asymptotic bounds XN = np.zeros((1, R)) QN = np.zeros((M, R)) UN = np.zeros((M, R)) RN = np.zeros((M, R)) TN = np.zeros((M, R)) AN = np.zeros((M, R)) for r in range(R): if N[r] <= 0: continue # Find bottleneck demand D_max = L[:, r].max() D_total = L[:, r].sum() # Asymptotic throughput if Z[r] > 0 or D_total > 0: X_lower = N[r] / (N[r] * D_max + Z[r]) # Heavy traffic X_upper = N[r] / (D_total + Z[r]) # Light traffic XN[0, r] = min(X_upper, 1.0 / D_max) if D_max > 0 else X_upper else: XN[0, r] = 0.0 # Utilizations and queue lengths for m in range(M): UN[m, r] = XN[0, r] * L[m, r] RN[m, r] = L[m, r] * (1 + UN[m, r] * N[r] / max(N[r], 1)) # Simple approximation QN[m, r] = XN[0, r] * RN[m, r] TN[m, r] = XN[0, r] AN[m, r] = XN[0, r] # Response times CN = np.zeros((1, R)) for r in range(R): CN[0, r] = RN[:, r].sum() + Z[r] return XN, CN, QN, UN, RN, TN, AN
[docs] def pfqn_aql(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, max_iter: int = 1000, tol: float = 1e-6 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Approximate Queue Length (AQL) algorithm. Uses iterative approximation to compute queue lengths for large populations where exact MVA would be computationally expensive. Args: L: Service demand matrix (M x R) N: Population vector Z: Think time vector (default 0) max_iter: Maximum iterations (default 1000) tol: Convergence tolerance (default 1e-6) Returns: Same format as pfqn_mva """ L = np.asarray(L, dtype=np.float64) N = np.asarray(N, dtype=np.float64).flatten() N_total = N.sum() R = len(N) if L.ndim == 1: L = L.reshape(-1, 1) if R == 1 else L.reshape(1, -1) M = L.shape[0] if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=np.float64).flatten() # Initialize with balanced system solution XN, CN, QN, UN, RN, TN, AN = pfqn_bs(L, N, Z) # Use JIT version for iterative refinement if MVA_HAS_NUMBA and schweitzer_iteration_jit is not None and M * R > 10: XN_jit, QN, UN, RN, iterations = schweitzer_iteration_jit( L, N.astype(np.float64), Z, QN, max_iter, tol ) XN = XN_jit.reshape(1, -1) # Node throughputs and arrival rates for m in range(M): for r in range(R): TN[m, r] = XN[0, r] AN[m, r] = XN[0, r] # Response times CN = np.zeros((1, R)) for r in range(R): CN[0, r] = RN[:, r].sum() + Z[r] return XN, CN, QN, UN, RN, TN, AN # Pure Python iterative refinement (Schweitzer approximation) for iteration in range(max_iter): Q_old = QN.copy() for r in range(R): if N[r] <= 0: continue for m in range(M): # Schweitzer approximation: E[Q_i | arrival of class r job] # ≈ (N_r - 1) / N_r * Q_i if N[r] > 1: Q_others = (N[r] - 1) / N[r] * Q_old[m, r] else: Q_others = 0 # Add other class contributions for s in range(R): if s != r: Q_others += Q_old[m, s] # Update residence time RN[m, r] = L[m, r] * (1 + Q_others) # Throughput R_total = RN[:, r].sum() if Z[r] + R_total > 0: XN[0, r] = N[r] / (Z[r] + R_total) else: XN[0, r] = 0 # Queue lengths for m in range(M): QN[m, r] = XN[0, r] * RN[m, r] UN[m, r] = XN[0, r] * L[m, r] TN[m, r] = XN[0, r] AN[m, r] = XN[0, r] # Check convergence diff = np.abs(QN - Q_old).max() if diff < tol: break # Response times CN = np.zeros((1, R)) for r in range(R): CN[0, r] = RN[:, r].sum() + Z[r] return XN, CN, QN, UN, RN, TN, AN
[docs] def pfqn_sqni(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Single Queue Network Interpolation (SQNI) approximate MVA. Implements a fast approximation for multi-class closed queueing networks that reduces the system to single-queue representations with interpolation-based corrections. This method is particularly efficient for networks where one station dominates (bottleneck analysis), providing a good trade-off between accuracy and computational speed. Args: L: Service demand vector (1 x R or R,) - demands at the bottleneck queue N: Population vector (1 x R or R,) - number of jobs per class Z: Think time vector (1 x R or R,) - think time per class (default 0) Returns: Tuple of (Q, U, X) where: - Q: Queue lengths (2 x R) - first row for queue, second placeholder - U: Utilizations (2 x R) - first row for queue, second placeholder - X: Throughputs (1 x R) Reference: Based on the SQNI method for approximate MVA analysis. """ L = np.asarray(L, dtype=np.float64).flatten() N = np.asarray(N, dtype=np.float64).flatten() R = len(N) if len(L) != R: raise ValueError(f"L length ({len(L)}) must match N length ({R})") if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=np.float64).flatten() if len(Z) != R: raise ValueError(f"Z length ({len(Z)}) must match N length ({R})") queue_idx = 0 Nt = N.sum() Q = np.zeros((2, R)) U = np.zeros((2, R)) X = np.zeros((1, R)) if Nt <= 0: return Q, U, X if Nt == 1.0: for r in range(R): if Z[r] + L[r] > 0: Xr = N[r] / (Z[r] + L[r]) else: Xr = 0.0 X[0, r] = Xr U[queue_idx, r] = Xr * L[r] Q[queue_idx, r] = Xr * L[r] else: for r in range(R): Nr = N[r] Lr = L[r] Zr = Z[r] # Compute Nvec_1r: N with one less job in class r Nvec_1r = N.copy() Nvec_1r[r] = max(0, Nvec_1r[r] - 1) sumN = N.sum() # Compute sumBrPart sumBrPart = 0.0 for i in range(R): if i != r: Zi = Z[i] Li = L[i] Ni = Nvec_1r[i] denom = Zi + Li + Li * (sumN - 2) if denom > 0: sumBrPart += Zi * Ni / denom # Compute BrVec BrVec = np.zeros(R) for i in range(R): Zi = Z[i] Li = L[i] Ni = N[i] denom = Zi + Li + Li * (sumN - 1 - sumBrPart) if denom > 0: BrVec[i] = Ni / denom * Zi # Compute BrSum (sum of BrVec except class r) BrSum = 0.0 for i in range(R): if i != r: BrSum += BrVec[i] Br = Lr * BrSum # Compute throughput if Lr == 0.0: if Zr > 0: Xr = Nr / Zr else: Xr = 0.0 else: # Quadratic formula solution discriminant = (Br * Br - 2 * Br * Lr * Nt - 2 * Br * Zr + Lr * Lr * Nt * Nt + 2 * Lr * Nt * Zr - 4 * Nr * Lr * Zr + Zr * Zr) if discriminant < 0: discriminant = 0 sqrt_term = np.sqrt(discriminant) denom = 2 * Lr * Zr if denom > 0: Xr = (Zr - sqrt_term - Br + Lr * Nt) / denom else: Xr = Nr / (Lr * Nt) if Lr * Nt > 0 else 0.0 X[0, r] = max(0, Xr) U[queue_idx, r] = X[0, r] * Lr Q[queue_idx, r] = Nr - X[0, r] * Zr # Handle Z=0 case (adjust for infinite server at think station) for r in range(R): if Z[r] == 0.0 and L[r] > 0: Q_sum = Q[queue_idx, :].sum() denom = L[r] * (1 + Q_sum) if denom > 0: Xr = N[r] / denom else: Xr = 0.0 X[0, r] = Xr U[queue_idx, r] = Xr * L[r] Q[queue_idx, r] = N[r] - Xr * Z[r] return Q, U, X
[docs] def pfqn_qd( L: np.ndarray, N: np.ndarray, ga: callable = None, be: callable = None, Q0: np.ndarray = None, tol: float = 1e-6, max_iter: int = 1000 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, int]: """ Queue-Dependent (QD) Approximate MVA. Implements the QD-AMVA algorithm that uses queue-dependent correction factors to improve accuracy of approximate MVA for closed networks. The algorithm iteratively computes queue lengths using: - A correction factor delta = (N_tot - 1) / N_tot - Per-class correction factor delta_r = (N_r - 1) / N_r - Optional scaling functions ga(A) and be(A) for advanced corrections Args: L: Service demand matrix (M x R) - rows are stations, columns are classes N: Population vector (R,) - number of jobs per class ga: Gamma scaling function ga(A) -> array(M,) (default: ones) A is the arrival queue seen by class, A[k] = 1 + delta * sum(Q[k,:]) be: Beta scaling function be(A) -> array(M, R) (default: ones) A is the arrival queue per class, A[k,r] = 1 + delta_r * Q[k,r] Q0: Initial queue length estimate (M x R) (default: proportional) tol: Convergence tolerance (default 1e-6) max_iter: Maximum iterations (default 1000) Returns: Tuple of (Q, X, U, iter) where: Q: Mean queue lengths (M x R) X: Class throughputs (R,) U: Utilizations (M x R) iter: Number of iterations performed Reference: Schweitzer, P.J. "Approximate analysis of multiclass closed networks of queues." Proceedings of the International Conference on Stochastic Control and Optimization (1979). """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape N_tot = np.sum(N) # Default scaling functions (identity - return ones) if ga is None: def ga(A): return np.ones(M) if be is None: def be(A): return np.ones((M, R)) # Initialize queue lengths if Q0 is None: # Proportional initialization: Q_kr = L_kr / sum_k(L_kr) * N_r L_sum = np.sum(L, axis=0, keepdims=True) L_sum[L_sum == 0] = 1 # Avoid division by zero Q = (L / L_sum) * N else: Q = np.asarray(Q0, dtype=float).copy() # Queue-dependent correction factors if N_tot > 0: delta = (N_tot - 1) / N_tot else: delta = 0.0 deltar = np.zeros(R) for r in range(R): if N[r] > 0: deltar[r] = (N[r] - 1) / N[r] else: deltar[r] = 0.0 # Initialize outputs X = np.zeros(R) U = np.zeros((M, R)) C = np.zeros((M, R)) # Storage for arrival queue vectors Ak = [np.zeros(M) for _ in range(R)] Akr = np.zeros((M, R)) # Iteration Q_prev = Q * 10 # Ensure first iteration runs iteration = 0 while np.max(np.abs(Q - Q_prev)) > tol and iteration < max_iter: iteration += 1 Q_prev = Q.copy() # Compute arrival queue vectors for k in range(M): Q_row_sum = np.sum(Q[k, :]) for r in range(R): Ak[r][k] = 1 + delta * Q_row_sum Akr[k, r] = 1 + deltar[r] * Q[k, r] # Compute queue lengths and throughputs for each class for r in range(R): if N[r] <= 0: X[r] = 0 Q[:, r] = 0 U[:, r] = 0 continue # Get scaling factors g = ga(Ak[r]) b = be(Akr) # Compute cycle times C[k,r] for k in range(M): Q_row_sum = np.sum(Q_prev[k, :]) C[k, r] = L[k, r] * g[k] * b[k, r] * (1 + delta * Q_row_sum) # Compute throughput C_sum = np.sum(C[:, r]) if C_sum > 0: X[r] = N[r] / C_sum else: X[r] = 0 # Compute queue lengths and utilizations for k in range(M): Q[k, r] = X[r] * C[k, r] U[k, r] = L[k, r] * g[k] * b[k, r] * X[r] return Q, X, U, iteration
[docs] def pfqn_qdlin( L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, tol: float = 1e-6, max_iter: int = 1000 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ QD-Linearizer (QDLIN) Approximate MVA. Combines Queue-Dependent (QD) correction with Linearizer iteration for improved accuracy in multi-class closed networks. Args: L: Service demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,) (default: zeros) tol: Convergence tolerance (default 1e-6) max_iter: Maximum iterations (default 1000) Returns: Tuple of (Q, U, R, X, C, iter) where: Q: Mean queue lengths (M x R) U: Utilizations (M x R) R: Residence times (M x R) X: Class throughputs (1 x R) C: Cycle times (1 x R) iter: Number of iterations performed """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).ravel() N_tot = np.sum(N) if N_tot <= 0: return (np.zeros((M, R)), np.zeros((M, R)), np.zeros((M, R)), np.zeros((1, R)), np.zeros((1, R)), 0) # QD correction factors delta = (N_tot - 1) / N_tot if N_tot > 0 else 0.0 deltar = np.where(N > 0, (N - 1) / N, 0.0) # Initialize with proportional distribution L_sum = np.sum(L, axis=0, keepdims=True) L_sum[L_sum == 0] = 1 Q = (L / L_sum) * N X = np.zeros((1, R)) RN = np.zeros((M, R)) C = np.zeros((1, R)) U = np.zeros((M, R)) Q_prev = Q * 10 iteration = 0 while np.max(np.abs(Q - Q_prev)) > tol and iteration < max_iter: iteration += 1 Q_prev = Q.copy() for r in range(R): if N[r] <= 0: continue # Compute residence times with QD correction for k in range(M): # Queue seen by arriving class-r job Q_others = np.sum(Q_prev[k, :]) - Q_prev[k, r] Q_seen = Q_others + deltar[r] * Q_prev[k, r] RN[k, r] = L[k, r] * (1 + Q_seen) # Throughput R_total = np.sum(RN[:, r]) if Z[r] + R_total > 0: X[0, r] = N[r] / (Z[r] + R_total) else: X[0, r] = 0 # Update queue lengths for k in range(M): Q[k, r] = X[0, r] * RN[k, r] U[k, r] = X[0, r] * L[k, r] C[0, r] = R_total return Q, U, RN, X, C, iteration
[docs] def pfqn_qli( L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, tol: float = 1e-6, max_iter: int = 1000 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ Queue-Line (QLI) Approximate MVA (Wang-Sevcik). Implements the Wang-Sevcik Queue-Line approximation which provides improved accuracy for multi-class networks by better estimating the queue length seen by arriving customers. Args: L: Service demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,) (default: zeros) tol: Convergence tolerance (default 1e-6) max_iter: Maximum iterations (default 1000) Returns: Tuple of (Q, U, R, X, C, iter) where: Q: Mean queue lengths (M x R) U: Utilizations (M x R) R: Residence times (M x R) X: Class throughputs (1 x R) C: Cycle times (1 x R) iter: Number of iterations performed Reference: Wang, W. and Sevcik, K.C. "Performance Models for Multiprogrammed Systems." IBM Research Report RC 5925 (1976). """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).ravel() N_tot = np.sum(N) if N_tot <= 0: return (np.zeros((M, R)), np.zeros((M, R)), np.zeros((M, R)), np.zeros((1, R)), np.zeros((1, R)), 0) # Initialize with proportional distribution L_sum = np.sum(L, axis=0, keepdims=True) L_sum[L_sum == 0] = 1 Q = (L / L_sum) * N X = np.zeros((1, R)) RN = np.zeros((M, R)) C = np.zeros((1, R)) U = np.zeros((M, R)) Q_prev = Q * 10 iteration = 0 while np.max(np.abs(Q - Q_prev)) > tol and iteration < max_iter: iteration += 1 Q_prev = Q.copy() for r in range(R): if N[r] <= 0: continue for k in range(M): # Wang-Sevcik Queue-Line correction # Estimate queue seen by arriving class-r customer Q_total_k = np.sum(Q_prev[k, :]) # Compute qlinum: L[k,r] * (1 + Q_total - Q[k,r]) qlinum = L[k, r] * (1 + Q_total_k - Q_prev[k, r]) # Compute qliden: sum over all stations m of L[m,r] * (1 + Q_total_m - Q[m,r]) qliden = 0.0 for m in range(M): if L[m, r] > 0: Q_total_m = np.sum(Q_prev[m, :]) qliden += L[m, r] * (1 + Q_total_m - Q_prev[m, r]) if qliden > 0 and N[r] > 1: Q_seen = Q_total_k - (1 / (N[r] - 1)) * (Q_prev[k, r] - qlinum / qliden) else: Q_seen = Q_total_k - Q_prev[k, r] Q_seen = max(0, Q_seen) RN[k, r] = L[k, r] * (1 + Q_seen) # Throughput R_total = np.sum(RN[:, r]) if Z[r] + R_total > 0: X[0, r] = N[r] / (Z[r] + R_total) else: X[0, r] = 0 # Update queue lengths for k in range(M): Q[k, r] = X[0, r] * RN[k, r] U[k, r] = X[0, r] * L[k, r] C[0, r] = R_total return Q, U, RN, X, C, iteration
[docs] def pfqn_fli( L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, tol: float = 1e-6, max_iter: int = 1000 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ Fraction-Line (FLI) Approximate MVA (Wang-Sevcik). Implements the Wang-Sevcik Fraction-Line approximation, an alternative to Queue-Line that uses a different formula for estimating the queue length seen by arriving customers. Args: L: Service demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,) (default: zeros) tol: Convergence tolerance (default 1e-6) max_iter: Maximum iterations (default 1000) Returns: Tuple of (Q, U, R, X, C, iter) where: Q: Mean queue lengths (M x R) U: Utilizations (M x R) R: Residence times (M x R) X: Class throughputs (1 x R) C: Cycle times (1 x R) iter: Number of iterations performed Reference: Wang, W. and Sevcik, K.C. "Performance Models for Multiprogrammed Systems." IBM Research Report RC 5925 (1976). """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).ravel() N_tot = np.sum(N) if N_tot <= 0: return (np.zeros((M, R)), np.zeros((M, R)), np.zeros((M, R)), np.zeros((1, R)), np.zeros((1, R)), 0) # Initialize with proportional distribution L_sum = np.sum(L, axis=0, keepdims=True) L_sum[L_sum == 0] = 1 Q = (L / L_sum) * N X = np.zeros((1, R)) RN = np.zeros((M, R)) C = np.zeros((1, R)) U = np.zeros((M, R)) Q_prev = Q * 10 iteration = 0 while np.max(np.abs(Q - Q_prev)) > tol and iteration < max_iter: iteration += 1 Q_prev = Q.copy() for r in range(R): if N[r] <= 0: continue for k in range(M): # Wang-Sevcik Fraction-Line correction Q_total_k = np.sum(Q_prev[k, :]) # Compute qlinum: L[k,r] * (1 + Q_total - Q[k,r]) qlinum = L[k, r] * (1 + Q_total_k - Q_prev[k, r]) # Compute qliden: sum over all stations m of L[m,r] * (1 + Q_total_m - Q[m,r]) qliden = 0.0 for m in range(M): if L[m, r] > 0: Q_total_m = np.sum(Q_prev[m, :]) qliden += L[m, r] * (1 + Q_total_m - Q_prev[m, r]) # FLI uses different formula than QLI if qliden > 0 and N[r] > 0: Q_seen = Q_total_k - (2 / N[r]) * Q_prev[k, r] + qlinum / qliden else: Q_seen = Q_total_k - Q_prev[k, r] Q_seen = max(0, Q_seen) RN[k, r] = L[k, r] * (1 + Q_seen) # Throughput R_total = np.sum(RN[:, r]) if Z[r] + R_total > 0: X[0, r] = N[r] / (Z[r] + R_total) else: X[0, r] = 0 # Update queue lengths for k in range(M): Q[k, r] = X[0, r] * RN[k, r] U[k, r] = X[0, r] * L[k, r] C[0, r] = R_total return Q, U, RN, X, C, iteration
[docs] def pfqn_bsfcfs( L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, tol: float = 1e-6, max_iter: int = 1000, QN: np.ndarray = None, weight: np.ndarray = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ Bard-Schweitzer approximate MVA for FCFS scheduling with weighted priorities. Implements AMVA with FCFS approximation where classes can have relative priority weights affecting the expected waiting times. Args: L: Service demand matrix (M x R) where M is stations, R is classes N: Population vector (1 x R) - number of jobs per class Z: Think time vector (1 x R) - default: zeros tol: Convergence tolerance (default 1e-6) max_iter: Maximum iterations (default 1000) QN: Initial queue length matrix (M x R) - default: uniform distribution weight: Weight matrix (M x R) for relative priorities - default: ones Returns: Tuple of (XN, QN, UN, RN, it) where: XN: System throughput per class (1 x R) QN: Mean queue lengths (M x R) UN: Utilizations (M x R) RN: Residence times (M x R) it: Number of iterations performed Reference: Bard, Y. and Schweitzer, P.J. "Analyzing Closed Queueing Networks with Multiple Job Classes and Multiserver Stations." Performance Evaluation Review 7.1-2 (1978). """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).ravel() # Initialize queue lengths if QN is None: QN = np.tile(N, (M, 1)) / M else: QN = np.asarray(QN, dtype=float).copy() + 1e-12 # Avoid zero for numerical stability # Initialize weight matrix if weight is None: weight = np.ones((M, R)) else: weight = np.asarray(weight, dtype=float) XN = np.zeros(R) UN = np.zeros((M, R)) CN = np.zeros((M, R)) relprio = np.zeros((M, R)) for it in range(1, max_iter + 1): QN_old = QN.copy() # Compute relative priorities for ist in range(M): for r in range(R): relprio[ist, r] = QN[ist, r] * weight[ist, r] # Compute residence times with FCFS approximation for r in range(R): for ist in range(M): CN[ist, r] = L[ist, r] for s in range(R): if s != r: # FCFS approximation: weighted by relative priorities if relprio[ist, r] > 0: CN[ist, r] += L[ist, s] * QN[ist, s] * relprio[ist, s] / relprio[ist, r] else: # Same class contribution with arrival theorem correction if N[r] > 0 and relprio[ist, r] > 0: CN[ist, r] += L[ist, r] * QN[ist, r] * (N[r] - 1) / N[r] * relprio[ist, s] / relprio[ist, r] # Compute throughput CN_sum = np.sum(CN[:, r]) if Z[r] + CN_sum > 0: XN[r] = N[r] / (Z[r] + CN_sum) else: XN[r] = 0 # Update queue lengths for r in range(R): for ist in range(M): QN[ist, r] = XN[r] * CN[ist, r] # Compute utilizations for r in range(R): for ist in range(M): UN[ist, r] = XN[r] * L[ist, r] # Check convergence if QN_old.max() > 0: rel_diff = np.abs(1 - QN / np.maximum(QN_old, 1e-12)).max() if rel_diff < tol: break # Compute residence times from queue lengths RN = np.zeros((M, R)) for r in range(R): for ist in range(M): if XN[r] > 0: RN[ist, r] = QN[ist, r] / XN[r] else: RN[ist, r] = L[ist, r] return XN.reshape(1, -1), QN, UN, RN, it
[docs] def pfqn_joint( n: np.ndarray, L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, lGN: float = None ) -> float: """ Compute joint queue-length probability distribution. Computes the joint probability for a given queue-length state vector in a closed product-form queueing network. Args: n: Queue-length state vector (M,) for total or (M x R) for per-class - If 1D (M,): n[i] is the total number of jobs at station i - If 2D (M x R): n[i,r] is the number of class-r jobs at station i L: Service demand matrix (M x R) N: Population vector (1 x R) Z: Think time vector (1 x R) - default: zeros lGN: Log normalizing constant (optional, computed if not provided) Returns: pjoint: Joint probability of state n Examples: # Total queue-lengths (Z > 0) >>> p = pfqn_joint([2, 1], [[10, 2], [5, 4]], [2, 2], [91, 92]) # Per-class queue-lengths >>> p = pfqn_joint([[1, 0], [0, 1]], [[10, 2], [5, 4]], [2, 2], [91, 92]) """ from .nc import pfqn_ca from scipy.special import gammaln L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() n = np.asarray(n, dtype=float) M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).ravel() # Compute normalizing constant if not provided if lGN is None: _, lGN = pfqn_ca(L, N, Z) def factln(x): """Log factorial using gammaln.""" return gammaln(np.asarray(x) + 1) def multinomialln(x): """Log multinomial coefficient.""" x = np.asarray(x, dtype=float) return factln(np.sum(x)) - np.sum(factln(x)) if n.ndim == 1 or (n.ndim == 2 and n.shape[1] == 1): # Joint probability of total queue lengths n = n.ravel() if np.sum(Z) > 0: # With think time n0 = np.sum(N) - np.sum(n) if n0 < 0: return 0.0 # Compute F_per for L extended with Z L_ext = np.vstack([L, Z.reshape(1, -1)]) n_ext = np.concatenate([n, [n0]]) Fjoint = _fper(L_ext, N, n_ext.astype(int)) pjoint = np.exp(np.log(max(Fjoint, 1e-300)) - lGN - factln(n0)) else: Fjoint = _fper(L, N, n.astype(int)) pjoint = np.exp(np.log(max(Fjoint, 1e-300)) - lGN) elif n.ndim == 2 and n.shape[1] == R: # Joint probability of per-class queue-lengths n0 = N - np.sum(n, axis=0) if np.any(n0 < 0): return 0.0 if np.sum(Z) > 0: Fjoint = np.sum(n0 * np.log(np.maximum(Z, 1e-300))) - np.sum(factln(n0)) else: Fjoint = 0.0 for i in range(M): if np.sum(n[i, :]) > 0: Fjoint += multinomialln(n[i, :]) + np.sum(n[i, :] * np.log(np.maximum(L[i, :], 1e-300))) pjoint = np.exp(Fjoint - lGN) else: raise ValueError("Invalid argument to pfqn_joint: n must be (M,) or (M, R)") return max(0.0, pjoint)
def _fper(L: np.ndarray, N: np.ndarray, m: np.ndarray) -> float: """ Compute permanent-based function F_per for joint probability. Internal helper for pfqn_joint. """ from math import factorial from itertools import permutations M, R = L.shape m = np.asarray(m, dtype=int).ravel() # Build matrix Ak Ak_cols = [] for r in range(R): N_r = int(N[r]) for _ in range(N_r): Ak_cols.append(L[:, r]) Ak = np.column_stack(Ak_cols) if Ak_cols else np.zeros((M, 0)) # Build matrix A based on m A_rows = [] for i in range(M): if i < len(m) and m[i] > 0: for _ in range(int(m[i])): A_rows.append(Ak[i, :]) A = np.vstack(A_rows) if A_rows else np.zeros((0, Ak.shape[1])) # Compute permanent (simplified for small matrices) if A.shape[0] == 0 or A.shape[1] == 0: return 1.0 n_rows, n_cols = A.shape if n_rows > n_cols: return 0.0 # Simple permanent computation (exponential but works for small n) if n_rows <= 10: perm = 0.0 from itertools import permutations as perms for p in perms(range(n_cols), n_rows): prod = 1.0 for i, j in enumerate(p): prod *= A[i, j] perm += prod else: # For larger matrices, use approximation perm = _permanent_approx(A) # Divide by product of factorials prod_fact = 1.0 for r in range(R): prod_fact *= factorial(int(N[r])) return perm / prod_fact def _permanent_approx(A: np.ndarray) -> float: """ Approximate permanent using Bethe approximation. For small matrices, computes exact permanent. """ n_rows, n_cols = A.shape if n_rows == 0: return 1.0 # Simple approximation using product of sums # This is an upper bound (van der Waerden) perm = 1.0 for i in range(n_rows): row_sum = np.sum(A[i, :]) perm *= row_sum return perm __all__ = [ 'pfqn_mva', 'pfqn_mva_single_class', 'pfqn_bs', 'pfqn_aql', 'pfqn_sqni', 'pfqn_qd', 'pfqn_qdlin', 'pfqn_qli', 'pfqn_fli', 'pfqn_bsfcfs', 'pfqn_joint', ]