Source code for line_solver.api.pfqn.mvaldmx

"""
Load-Dependent MVA for Mixed Open/Closed Networks.

Native Python implementations of load-dependent MVA algorithms for
mixed queueing networks with limited load dependence.

Key functions:
    pfqn_mvaldmx: Load-dependent MVA for mixed networks
    pfqn_mvaldmx_ec: Effective capacity computation
    pfqn_mvaldms: Multi-server wrapper for mvaldmx

References:
    Original MATLAB: matlab/src/api/pfqn/pfqn_mvaldmx*.m
"""

import numpy as np
from typing import Tuple, Optional

from .schmidt import pprod, hashpop
from .utils import oner


[docs] def pfqn_mvaldmx_ec(lam: np.ndarray, D: np.ndarray, mu: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Compute effective capacity terms for MVALDMX solver. Calculates the effective capacity E, E', and EC terms needed for load-dependent MVA with limited load dependence. Args: lam: Arrival rate vector (R,) D: Service demand matrix (M x R) mu: Load-dependent rate matrix (M x Nt) Returns: Tuple of (EC, E, Eprime, Lo): EC: Effective capacity matrix (M x Nt) E: E-function values (M x (1+Nt)) Eprime: E-prime function values (M x (1+Nt)) Lo: Open class load vector (M,) References: Original MATLAB: matlab/src/api/pfqn/pfqn_mvaldmx_ec.m """ mu = np.atleast_2d(np.asarray(mu, dtype=float)) D = np.atleast_2d(np.asarray(D, dtype=float)) lam = np.asarray(lam, dtype=float).flatten() M, Nt_orig = mu.shape # Compute open class load Lo = np.zeros(M) for ist in range(M): Lo[ist] = np.dot(lam, D[ist, :]) # Find limited load dependence level b for each station b = np.zeros(M, dtype=int) for ist in range(M): # Find first index where mu equals the final value for k in range(Nt_orig): if mu[ist, k] == mu[ist, -1]: b[ist] = k + 1 # 1-based index break if b[ist] == 0: b[ist] = Nt_orig Nt = Nt_orig max_b = int(np.max(b)) # Extend mu if needed mu_ext = np.zeros((M, Nt + 1 + max_b)) mu_ext[:, :Nt] = mu for k in range(Nt, Nt + 1 + max_b): mu_ext[:, k] = mu[:, -1] # C = 1/mu (service time) C = 1.0 / np.maximum(mu_ext, 1e-12) # Initialize outputs EC = np.zeros((M, Nt)) E = np.zeros((M, 1 + Nt)) Eprime = np.zeros((M, 1 + Nt)) for ist in range(M): b_i = int(b[ist]) E1 = np.zeros(1 + Nt) E2 = np.zeros(1 + Nt) E3 = np.zeros(1 + Nt) E2prime = np.zeros(1 + Nt) F2 = np.zeros((1 + Nt, max(1, b_i - 1))) F3 = np.zeros((1 + Nt, max(1, b_i - 1))) F2prime = np.zeros((1 + Nt, max(1, b_i - 1))) for n in range(Nt + 1): if n >= b_i: # Simple formula for n >= b denom = 1 - Lo[ist] * C[ist, b_i - 1] if abs(denom) > 1e-12: E[ist, n] = 1.0 / (denom ** (n + 1)) else: E[ist, n] = 1e12 Eprime[ist, n] = C[ist, b_i - 1] * E[ist, n] else: # n <= b_i - 1 # Compute E1 if n == 0: denom = 1 - Lo[ist] * C[ist, b_i - 1] if abs(denom) > 1e-12: E1[n] = 1.0 / denom else: E1[n] = 1e12 for j in range(b_i - 1): E1[n] *= C[ist, j] / C[ist, b_i - 1] else: E1[n] = (1.0 / (1 - Lo[ist] * C[ist, b_i - 1])) * \ (C[ist, b_i - 1] / C[ist, n - 1]) * E1[n - 1] # Compute F2 for n0 in range(b_i - 1): if n0 == 0: F2[n, n0] = 1.0 else: idx = min(n + n0 - 1, C.shape[1] - 1) F2[n, n0] = ((n + n0) / n0) * Lo[ist] * C[ist, idx] * F2[n, n0 - 1] # Compute E2 E2[n] = np.sum(F2[n, :b_i - 1]) # Compute F3 for n0 in range(b_i - 1): if n == 0 and n0 == 0: F3[n, n0] = 1.0 for j in range(b_i - 1): F3[n, n0] *= C[ist, j] / C[ist, b_i - 1] elif n > 0 and n0 == 0: F3[n, n0] = (C[ist, b_i - 1] / C[ist, n - 1]) * F3[n - 1, 0] else: F3[n, n0] = ((n + n0) / n0) * Lo[ist] * C[ist, b_i - 1] * F3[n, n0 - 1] # Compute E3 E3[n] = np.sum(F3[n, :b_i - 1]) # Compute F2prime for n0 in range(b_i - 1): if n0 == 0: idx = min(n, C.shape[1] - 1) F2prime[n, n0] = C[ist, idx] else: idx = min(n + n0, C.shape[1] - 1) F2prime[n, n0] = ((n + n0) / n0) * Lo[ist] * C[ist, idx] * F2prime[n, n0 - 1] # Compute E2prime E2prime[n] = np.sum(F2prime[n, :b_i - 1]) # Final E and Eprime E[ist, n] = E1[n] + E2[n] - E3[n] if n < b_i - 1: Eprime[ist, n] = C[ist, b_i - 1] * E1[n] + E2prime[n] - C[ist, b_i - 1] * E3[n] else: Eprime[ist, n] = C[ist, b_i - 1] * E[ist, n] # Compute EC for n in range(1, Nt + 1): if E[ist, n - 1] > 0: EC[ist, n - 1] = C[ist, n - 1] * E[ist, n] / E[ist, n - 1] else: EC[ist, n - 1] = 0.0 return EC, E, Eprime, Lo
[docs] def pfqn_mvaldmx(lam: np.ndarray, D: np.ndarray, N: np.ndarray, Z: np.ndarray, mu: Optional[np.ndarray] = None, S: Optional[np.ndarray] = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, float, np.ndarray]: """ Load-dependent MVA for mixed open/closed networks with limited load dependence. Implements the MVALDMX algorithm for analyzing mixed queueing networks with load-dependent service rates using limited load dependence. Args: lam: Arrival rate vector (R,) - non-zero for open classes D: Service demand matrix (M x R) N: Population vector (R,) - inf for open classes Z: Think time vector (R,) mu: Load-dependent rate matrix (M x Nt), optional S: Number of servers per station (M,), optional Returns: Tuple of (XN, QN, UN, CN, lGN, Pc): XN: System throughput (R,) QN: Mean queue lengths (M, R) UN: Utilization (M, R) CN: Cycle times (M, R) lGN: Logarithm of normalizing constant Pc: Marginal queue-length probabilities (M, 1+Ntot, prod(1+Nc)) References: Original MATLAB: matlab/src/api/pfqn/pfqn_mvaldmx.m """ D = np.atleast_2d(np.asarray(D, dtype=float)) N = np.asarray(N, dtype=float).flatten() Z = np.asarray(Z, dtype=float).flatten() lam = np.asarray(lam, dtype=float).flatten() M, R = D.shape # Identify open and closed classes openClasses = np.where(np.isinf(N))[0] closedClasses = np.array([i for i in range(R) if i not in openClasses], dtype=int) Nct = int(np.sum(N[np.isfinite(N)])) # Default mu and S if mu is None: mu = np.ones((M, Nct)) S = np.ones(M) elif S is None: S = np.ones(M) mu = np.atleast_2d(np.asarray(mu, dtype=float)) S = np.asarray(S, dtype=float).flatten() # Extend mu if needed if mu.shape[1] < Nct: mu_ext = np.zeros((M, Nct + 1)) mu_ext[:, :mu.shape[1]] = mu mu_ext[:, mu.shape[1]:] = mu[:, -1].reshape(-1, 1) mu = mu_ext # Initialize outputs XN = np.zeros(R) UN = np.zeros((M, R)) CN = np.zeros((M, R)) QN = np.zeros((M, R)) lGN = 0.0 # Extend mu by one column mu = np.hstack([mu, mu[:, -1:]]) # Compute effective capacity terms EC, E, Eprime = pfqn_mvaldmx_ec(lam, D, mu)[:3] C = len(closedClasses) if C == 0: # Pure open network - use standard formulas for r in openClasses: XN[r] = lam[r] for ist in range(M): rho = np.sum([lam[s] * D[ist, s] for s in openClasses]) if rho < 1: QN[ist, r] = lam[r] * D[ist, r] / (1 - rho) CN[ist, r] = D[ist, r] / (1 - rho) UN[ist, r] = lam[r] * D[ist, r] return XN, QN, UN, CN, lGN, np.array([]) Dc = D[:, closedClasses] Nc = N[closedClasses].astype(int) Zc = Z[closedClasses] # Precomputed products for hashing prods = np.zeros(C, dtype=int) for r in range(C): prods[r] = int(np.prod(Nc[:r] + 1)) total_states = int(np.prod(Nc + 1)) # Initialize nvec = pprod(Nc) Pc = np.zeros((M, 1 + int(np.sum(Nc)), total_states)) x = np.zeros((C, total_states)) w = np.zeros((M, C, total_states)) # Initialize Pc(0|0) = 1 hnvec_init = hashpop(nvec, Nc, C, prods) for ist in range(M): Pc[ist, 0, hnvec_init - 1] = 1.0 u = np.zeros((M, C)) # Population recursion while np.all(nvec >= 0): hnvec = hashpop(nvec, Nc, C, prods) nc = int(np.sum(nvec)) for ist in range(M): for c in range(C): if nvec[c] > 0: hnvec_c = hashpop(oner(nvec, c), Nc, C, prods) # Compute mean residence times for n in range(1, nc + 1): if n - 1 < EC.shape[1] and Pc[ist, n - 1, hnvec_c - 1] > 0: w[ist, c, hnvec - 1] += Dc[ist, c] * n * EC[ist, n - 1] * Pc[ist, n - 1, hnvec_c - 1] # Compute throughput for c in range(C): denom = Zc[c] + np.sum(w[:, c, hnvec - 1]) if denom > 0: x[c, hnvec - 1] = nvec[c] / denom else: x[c, hnvec - 1] = 0.0 # Update Pc for ist in range(M): for n in range(1, nc + 1): for c in range(C): if nvec[c] > 0: hnvec_c = hashpop(oner(nvec, c), Nc, C, prods) if n - 1 < EC.shape[1]: Pc[ist, n, hnvec - 1] += Dc[ist, c] * EC[ist, n - 1] * x[c, hnvec - 1] * Pc[ist, n - 1, hnvec_c - 1] Pc[ist, 0, hnvec - 1] = max(1e-12, 1 - np.sum(Pc[ist, 1:nc + 1, hnvec - 1])) nvec = pprod(nvec, Nc) # Final performance metrics at Nc hnvec = hashpop(Nc, Nc, C, prods) # Utilization # MATLAB: for n=1:sum(Nc), using Eprime(ist, n) which is 1-based # In 0-based terms: n from 0 to sum(Nc)-1 for c in range(C): hnvec_c = hashpop(oner(Nc, c), Nc, C, prods) for ist in range(M): u[ist, c] = 0.0 for n in range(int(np.sum(Nc))): # Fixed: was range(sum(Nc)+1), should be range(sum(Nc)) if n < E.shape[1] and E[ist, n] > 0: u[ist, c] += Dc[ist, c] * x[c, hnvec - 1] * Eprime[ist, n] / E[ist, n] * Pc[ist, n, hnvec_c - 1] # Throughput XN[closedClasses] = x[:, hnvec - 1] # Utilization UN[:, closedClasses] = u # Response time CN[:, closedClasses] = w[:, :, hnvec - 1] # Queue-length for c in range(C): QN[:, closedClasses[c]] = XN[closedClasses[c]] * CN[:, closedClasses[c]] # Open class metrics for r in openClasses: XN[r] = lam[r] for ist in range(M): QN[ist, r] = 0.0 for n in range(int(np.sum(Nc)) + 1): if n < EC.shape[1]: QN[ist, r] += lam[r] * D[ist, r] * (n + 1) * EC[ist, n] * Pc[ist, n, hnvec - 1] if lam[r] > 0: CN[ist, r] = QN[ist, r] / lam[r] else: CN[ist, r] = 0.0 UN[ist, r] = 0.0 for n in range(int(np.sum(Nc)) + 1): if n < E.shape[1] and E[ist, n] > 0: UN[ist, r] += lam[r] * Eprime[ist, n] / E[ist, n] * Pc[ist, n, hnvec - 1] # Final Pc Pc_final = Pc[:, :, hnvec - 1] return XN, QN, UN, CN, lGN, Pc_final
[docs] def pfqn_mvaldms(lam: np.ndarray, D: np.ndarray, N: np.ndarray, Z: np.ndarray, S: np.ndarray ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, float]: """ Load-dependent MVA for multiserver mixed networks. Wrapper for pfqn_mvaldmx that adjusts utilizations to account for multi-server stations. Args: lam: Arrival rate vector (R,) D: Service demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,) S: Number of servers per station (M,) Returns: Tuple of (XN, QN, UN, CN, lGN): XN: System throughput (R,) QN: Mean queue lengths (M, R) UN: Utilization (M, R) - adjusted for multiservers CN: Cycle times (M, R) lGN: Logarithm of normalizing constant References: Original MATLAB: matlab/src/api/pfqn/pfqn_mvaldms.m """ D = np.atleast_2d(np.asarray(D, dtype=float)) N = np.asarray(N, dtype=float).flatten() Z = np.asarray(Z, dtype=float).flatten() lam = np.asarray(lam, dtype=float).flatten() S = np.asarray(S, dtype=float).flatten() M, R = D.shape # Compute closed population Nct = int(np.sum(N[np.isfinite(N)])) if len(Z) == 0 or Z is None: Z = np.zeros(R) # Handle pure open networks (Nct=0) using M/M/k formulas directly # pfqn_mvaldmx cannot handle empty mu matrix if Nct == 0: # All classes are open - use M/M/k formulas openClasses = np.where(np.isinf(N))[0] XN = np.zeros(R) QN = np.zeros((M, R)) UN = np.zeros((M, R)) CN = np.zeros((M, R)) lGN = np.nan # Normalizing constant not defined for open networks for r in openClasses: XN[r] = lam[r] for ist in range(M): if D[ist, r] > 0 and S[ist] > 0: # M/M/k: rho = lambda * D / S rho = lam[r] * D[ist, r] / S[ist] UN[ist, r] = rho if rho < 1: k = int(S[ist]) if k == 1: # M/M/1: Q = rho/(1-rho) QN[ist, r] = rho / (1 - rho) else: # M/M/k approximation using Erlang-C from ..qsys import qsys_mmk mu_rate = 1.0 / D[ist, r] if D[ist, r] > 0 else float('inf') result = qsys_mmk(lam[r], mu_rate, k) QN[ist, r] = result.get('L', rho / (1 - rho)) CN[ist, r] = QN[ist, r] / lam[r] if lam[r] > 0 else 0 return XN, QN, UN, CN, lGN # Build load-dependent rate matrix mu = np.ones((M, Nct)) for ist in range(M): for n in range(Nct): mu[ist, n] = min(n + 1, S[ist]) # Call mvaldmx XN, QN, _, CN, lGN, _ = pfqn_mvaldmx(lam, D, N, Z, mu, S) # Identify open and closed classes openClasses = np.where(np.isinf(N))[0] closedClasses = np.array([i for i in range(R) if i not in openClasses], dtype=int) # Adjust utilization for multiservers UN = np.zeros((M, R)) for r in closedClasses: for ist in range(M): UN[ist, r] = XN[r] * D[ist, r] / S[ist] for r in openClasses: for ist in range(M): UN[ist, r] = lam[r] * D[ist, r] / S[ist] return XN, QN, UN, CN, lGN
__all__ = [ 'pfqn_mvaldmx', 'pfqn_mvaldmx_ec', 'pfqn_mvaldms', ]