Source code for line_solver.api.pfqn.linearizer

"""
Linearizer Approximate MVA for Product-Form Networks.

Implements the linearizer family of approximate MVA methods for closed
queueing networks where exact MVA becomes computationally prohibitive.
Provides near-exact accuracy with reduced computational complexity.
"""

import numpy as np
from typing import Tuple, List, Optional, Union
from enum import Enum

# Scheduling strategy enum
[docs] class SchedStrategy(Enum): """Scheduling strategies for queueing stations.""" INF = "INF" # Infinite server (delay) FCFS = "FCFS" # First Come First Serve PS = "PS" # Processor Sharing LCFS = "LCFS" # Last Come First Serve SIRO = "SIRO" # Service In Random Order
def _oner(N: np.ndarray, indices: List[int]) -> np.ndarray: """ Create population vector with one less job in specified classes. Args: N: Population vector indices: List of class indices to decrement (-1 means no decrement) Returns: Modified population vector """ N_1 = N.copy().astype(float).ravel() for idx in indices: if idx >= 0 and idx < len(N_1): N_1[idx] = max(0, N_1[idx] - 1) return N_1
[docs] def pfqn_linearizer( L: np.ndarray, N: np.ndarray, Z: np.ndarray, sched_type: List[str], tol: float = 1e-8, maxiter: int = 1000 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ Linearizer approximate MVA algorithm. Args: L: Demand matrix (M x R) - rows are stations, columns are classes N: Population vector (R,) Z: Think time vector (R,) sched_type: List of scheduling strategies per station ('FCFS', 'PS', etc.) tol: Convergence tolerance maxiter: Maximum iterations Returns: Tuple of (Q, U, W, T, C, X, iterations): Q: Queue lengths (M x R) U: Utilizations (M x R) W: Waiting times (M x R) T: Station throughputs (M x R) C: Response times (1 x R) X: Class throughputs (1 x R) iterations: Number of iterations """ return pfqn_gflinearizer(L, N, Z, sched_type, tol, maxiter, alpha=1.0)
[docs] def pfqn_gflinearizer( L: np.ndarray, N: np.ndarray, Z: np.ndarray, sched_type: List[str], tol: float = 1e-8, maxiter: int = 1000, alpha: float = 1.0 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ General-form linearizer approximate MVA. Args: L: Demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,) sched_type: List of scheduling strategies per station tol: Convergence tolerance maxiter: Maximum iterations alpha: Linearization parameter (scalar) Returns: Same as pfqn_linearizer """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() Z = np.asarray(Z, dtype=float).ravel() R = len(N) alpha_vec = np.full(R, alpha) return pfqn_egflinearizer(L, N, Z, sched_type, tol, maxiter, alpha_vec)
[docs] def pfqn_egflinearizer( L: np.ndarray, N: np.ndarray, Z: np.ndarray, sched_type: List[str], tol: float = 1e-8, maxiter: int = 1000, alpha: np.ndarray = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]: """ Extended general-form linearizer with class-specific parameters. Args: L: Demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,) sched_type: List of scheduling strategies per station tol: Convergence tolerance maxiter: Maximum iterations alpha: Class-specific linearization parameters (R,) Returns: Tuple of (Q, U, W, T, C, X, iterations) """ from .mva import pfqn_bs L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() Z = np.asarray(Z, dtype=float).ravel() M, R = L.shape if alpha is None: alpha = np.ones(R) else: alpha = np.asarray(alpha, dtype=float).ravel() # Handle empty or zero demand case if L.size == 0 or np.max(L) == 0: X = N / Z Q = np.zeros((M, R)) U = np.zeros((M, R)) W = np.zeros((M, R)) T = np.zeros((M, R)) C = np.zeros(R) return Q, U, W, T, C, X.reshape(1, -1), 0 # Initialize Q arrays for each station # Q[i] is (R x (1+R)) matrix: Q[i][r, s+1] = queue length of class r when one job of class s is removed Q = [np.zeros((R, 1 + R)) for _ in range(M)] Delta = [np.zeros((R, R)) for _ in range(M)] # Initial estimates using balanced system for s in range(-1, R): N_1 = _oner(N, [s]) init_result = pfqn_bs(L, N_1, Z) Q_init = init_result[2] # QN from pfqn_bs for i in range(M): for r in range(R): Q[i][r, 1 + s] = Q_init[i, r] totiter = 0 # Main outer loop (3 iterations for linearization) for I in range(3): for s in range(-1, R): N_1 = _oner(N, [s]) Q1 = np.zeros((M, R)) for i in range(M): for j in range(R): Q1[i, j] = Q[i][j, 1 + s] Q_new, W_new, T_new, iter_count = _egflinearizer_core( L, M, R, N_1, Z, Q1, Delta, sched_type, tol, maxiter - totiter, alpha ) for i in range(M): for j in range(R): Q[i][j, 1 + s] = Q_new[i, j] totiter += iter_count # Update delta for i in range(M): for r in range(R): if N[r] == 1: for j in range(R): Q[i][j, 1 + r] = 0 for s in range(R): if N[s] > 1: Ns = _oner(N, [s]) if N[r] > 0 and Ns[r] > 0: Delta[i][r, s] = ( Q[i][r, 1 + s] / np.power(Ns[r], alpha[r]) - Q[i][r, 0] / np.power(N[r], alpha[r]) ) # Final core iteration with full population Q1 = np.zeros((M, R)) for i in range(M): for j in range(R): Q1[i, j] = Q[i][j, 0] Q_final, W, T, iter_count = _egflinearizer_core( L, M, R, N, Z, Q1, Delta, sched_type, tol, maxiter - totiter, alpha ) totiter += iter_count # Compute performance metrics X = T.copy() U = np.zeros((M, R)) for i in range(M): for r in range(R): U[i, r] = X[r] * L[i, r] C = np.zeros(R) for r in range(R): if X[r] > 0: C[r] = N[r] / X[r] - Z[r] return Q_final, U, W, U, C.reshape(1, -1), X.reshape(1, -1), totiter
def _egflinearizer_core( L: np.ndarray, M: int, R: int, N_1: np.ndarray, Z: np.ndarray, Q: np.ndarray, Delta: List[np.ndarray], sched_type: List[str], tol: float, maxiter: int, alpha: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, int]: """ Core iteration for extended general-form linearizer. Returns: Tuple of (Q, W, T, iterations) """ Q = Q.copy() W = L.copy() T = None has_converged = False iter_count = 0 while not has_converged: Q_last = Q.copy() # Estimate population Q_1 = _egflinearizer_estimate(L, M, R, N_1, Z, Q, Delta, W, alpha) # Forward MVA Q, W, T = _egflinearizer_forward_mva(L, M, R, sched_type, N_1, Z, Q_1) # Check convergence diff_norm = np.linalg.norm(Q - Q_last) if diff_norm < tol or iter_count > maxiter: has_converged = True iter_count += 1 return Q, W, T, iter_count def _egflinearizer_estimate( L: np.ndarray, M: int, R: int, N_1: np.ndarray, Z: np.ndarray, Q: np.ndarray, Delta: List[np.ndarray], W: np.ndarray, alpha: np.ndarray ) -> List[np.ndarray]: """ Estimate intermediate queue lengths for linearizer. Returns: List of Q_1 matrices for each station """ Q_1 = [np.zeros((R, 1 + R)) for _ in range(M)] for i in range(M): for r in range(R): for s in range(R): Ns = _oner(N_1, [s]) if N_1[r] > 0 and Ns[r] > 0: Q_1[i][r, 1 + s] = ( np.power(Ns[r], alpha[r]) * (Q[i, r] / np.power(N_1[r], alpha[r]) + Delta[i][r, s]) ) return Q_1 def _egflinearizer_forward_mva( L: np.ndarray, M: int, R: int, sched_type: List[str], N_1: np.ndarray, Z: np.ndarray, Q_1: List[np.ndarray] ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Forward MVA step for linearizer. Returns: Tuple of (Q, W, T) """ W = np.zeros((M, R)) T = np.zeros(R) Q = np.zeros((M, R)) # Compute residence times for i in range(M): for r in range(R): sched = sched_type[i].upper() if isinstance(sched_type[i], str) else sched_type[i] if sched == 'FCFS': W[i, r] = L[i, r] if L[i, r] != 0: for s in range(R): W[i, r] += L[i, s] * Q_1[i][s, 1 + r] else: # PS or other strategies sum_Q = np.sum(Q_1[i][:, 1 + r]) W[i, r] = L[i, r] * (1 + sum_Q) # Compute throughputs and queue lengths for r in range(R): W_col_sum = np.sum(W[:, r]) if Z[r] + W_col_sum > 0: T[r] = N_1[r] / (Z[r] + W_col_sum) for i in range(M): Q[i, r] = T[r] * W[i, r] return Q, W, T __all__ = [ 'pfqn_linearizer', 'pfqn_gflinearizer', 'pfqn_egflinearizer', 'SchedStrategy', ]