Source code for line_solver.api.pfqn.comom

"""
Composite Moment (CoMoM) Methods for Normalizing Constant Computation.

Native Python implementations of CoMoM and related algorithms for
computing normalizing constants in product-form queueing networks.

Key functions:
    pfqn_comom: CoMoM algorithm for general networks
    pfqn_comomrm: CoMoM for finite repairman models
    pfqn_comomrm_orig: Original CoMoM implementation
    pfqn_comomrm_ms: CoMoM for multi-server repairman models
    pfqn_procomom2: Projected CoMoM method

References:
    Casale, G. "CoMoM: A class of algorithms for product-form solution
    of Closed Queueing Networks with Multiple Servers."
    Original MATLAB: matlab/src/api/pfqn/pfqn_comom*.m
"""

import numpy as np
from math import comb
from typing import Tuple, Optional, Dict, Any, NamedTuple
from scipy.special import gammaln

from .utils import factln, factln_vec, multichoose, matchrow, oner


[docs] class ComomResult(NamedTuple): """Result of CoMoM normalizing constant computation.""" lG: float lGbasis: Optional[np.ndarray] = None
[docs] def pfqn_comom(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, atol: float = 1e-8) -> float: """ CoMoM algorithm for computing the normalizing constant. Implements the Composite Method of Moments algorithm for computing normalizing constants in closed product-form queueing networks. Args: L: Service demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,), default zeros atol: Absolute tolerance Returns: lG: Logarithm of the normalizing constant References: Original MATLAB: matlab/src/api/pfqn/pfqn_comom.m """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).flatten() M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).flatten() # Rescale demands for numerical stability Lmax = L.copy() Lmax[Lmax < atol] = Z[Lmax < atol] Lmax = np.max(Lmax, axis=0) Lmax[Lmax < atol] = 1.0 L = L / Lmax Z = Z / Lmax # Prepare CoMoM data structures Dn = multichoose(R, M) Dn[:, R - 1] = 0 # Set last column to 0 (for subset selection) Dn = _sortbynnzpos(Dn) # Initialize nvec = np.zeros(R) h = _ginit(L, N, Dn, M, R) lh = np.log(np.abs(h) + 1e-300) + factln(np.sum(nvec) + M - 1) - np.sum(factln_vec(nvec)) h = np.exp(lh - np.max(lh)) # Rescale for numerical stability scale = np.zeros(int(np.sum(N))) # Iterate through all classes for r in range(R): for Nr in range(1, int(N[r]) + 1): nvec[r] += 1 if Nr == 1: A, B, DA = _genmatrix(L, nvec.copy(), Z, r, N, Dn, M, R) else: A = A + DA b = B @ h * nvec[r] / (np.sum(nvec) + M - 1) try: h = np.linalg.solve(A, b) except np.linalg.LinAlgError: h = np.linalg.lstsq(A, b, rcond=None)[0] nt = int(np.sum(nvec)) scale[nt - 1] = np.abs(np.sum(np.sort(h))) if scale[nt - 1] > 0: h = np.abs(h) / scale[nt - 1] # Unscale and return the log of the normalizing constant lG = (np.log(np.abs(h[-(R - 1) - 1]) + 1e-300) + factln(np.sum(N) + M - 1) - np.sum(factln_vec(N)) + np.dot(N, np.log(Lmax)) + np.sum(np.log(scale[scale > 0]))) return lG
def _sortbynnzpos(I: np.ndarray) -> np.ndarray: """Sort combinations by number of nonzeros and position.""" def nnzcmp(i1, i2): nnz1 = np.count_nonzero(i1) nnz2 = np.count_nonzero(i2) if nnz1 > nnz2: return 1 elif nnz1 < nnz2: return 0 else: for j in range(len(i1)): if i1[j] == 0 and i2[j] > 0: return 1 elif i1[j] > 0 and i2[j] == 0: return 0 return 0 # Bubble sort based on custom comparator for i in range(len(I) - 1): for j in range(i + 1, len(I)): if nnzcmp(I[i], I[j]) == 1: I[[i, j]] = I[[j, i]] return I def _ginit(L: np.ndarray, N: np.ndarray, Dn: np.ndarray, M: int, R: int) -> np.ndarray: """Initialize the g vector for CoMoM.""" g = np.zeros(Dn.shape[0] * (M + 1)) for i in range(M + 1): idx = _hash(N, N, i, Dn, M, R) if 0 <= idx < len(g): g[idx] = 1.0 return g def _hash(Ntot: np.ndarray, n: np.ndarray, i: int, Dn: np.ndarray, M: int, R: int) -> int: """Compute hash index for CoMoM.""" diff = Ntot - n row_idx = matchrow(Dn, diff) if i == 0: # i=1 in MATLAB (0-indexed) return Dn.shape[0] * M + row_idx - 1 else: return (row_idx - 1) * M + i - 1 def _genmatrix(L: np.ndarray, N: np.ndarray, Z: np.ndarray, r: int, Ntot: np.ndarray, Dn: np.ndarray, M: int, R: int ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Generate the A, B, DA matrices for CoMoM iteration.""" size = comb(M + R - 1, M) * (M + 1) A = np.zeros((size, size)) DA = np.zeros((size, size)) B = np.zeros((size, size)) row = 0 for d in range(len(Dn)): if np.sum(Dn[d, r:R - 1]) > 0: # Dummy rows for unused norm consts for k in range(M + 1): if row >= size: break col = _hash(N, N - Dn[d, :], k, Dn, M, R) if 0 <= col < size: A[row, col] = 1 if np.sum(Dn[d, (r + 1):R - 1]) > 0: if 0 <= col < size: B[row, col] = 1 else: er = np.zeros(R) er[r] = 1 col = _hash(N, N - Dn[d, :] + er, k, Dn, M, R) if 0 <= col < size: B[row, col] = 1 row += 1 else: if np.sum(Dn[d, :r + 1]) < M: for k in range(1, M + 1): if row >= size: break # Add CE col1 = _hash(N, N - Dn[d, :], k, Dn, M, R) col0 = _hash(N, N - Dn[d, :], 0, Dn, M, R) if 0 <= col1 < size: A[row, col1] = 1 if 0 <= col0 < size: A[row, col0] = -1 for s in range(r): n_oner = oner(N - Dn[d, :], s) colk = _hash(N, n_oner, k, Dn, M, R) if 0 <= colk < size: A[row, colk] = -L[k - 1, s] if 0 <= col1 < size: B[row, col1] = L[k - 1, r] row += 1 for s in range(r): if row >= size: break # Add PC to A n = N - Dn[d, :] col0 = _hash(N, n, 0, Dn, M, R) n_oner = oner(n, s) col_oner0 = _hash(N, n_oner, 0, Dn, M, R) if 0 <= col0 < size: A[row, col0] = n[s] if 0 <= col_oner0 < size: A[row, col_oner0] = -Z[s] for k in range(1, M + 1): col_onerk = _hash(N, n_oner, k, Dn, M, R) if 0 <= col_onerk < size: A[row, col_onerk] = -L[k - 1, s] row += 1 # Add PC of class R for d in range(len(Dn)): if np.sum(Dn[d, r:R - 1]) <= 0: if row >= size: break n = N - Dn[d, :] col0 = _hash(N, n, 0, Dn, M, R) if 0 <= col0 < size: A[row, col0] = n[r] DA[row, col0] = 1 B[row, col0] = Z[r] for k in range(1, M + 1): colk = _hash(N, n, k, Dn, M, R) if 0 <= colk < size: B[row, colk] = L[k - 1, r] row += 1 return A, B, DA
[docs] def pfqn_comomrm(L: np.ndarray, N: np.ndarray, Z: np.ndarray, m: int = 1, atol: float = 1e-8) -> ComomResult: """ CoMoM for finite repairman model. Computes the normalizing constant for a closed network with a single queueing station and delay stations (repairman model). Args: L: Service demand matrix (1 x R) - single station N: Population vector (R,) Z: Think time vector (R,) m: Replication factor (default: 1) atol: Absolute tolerance Returns: ComomResult with lG (log normalizing constant) and lGbasis References: Original MATLAB: matlab/src/api/pfqn/pfqn_comomrm.m """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).flatten() Z = np.asarray(Z, dtype=float).flatten() M, R = L.shape if M != 1: raise ValueError("pfqn_comomrm: accepts at most a single queueing station.") # Import sanitize function from .utils import pfqn_nc_sanitize lam = np.zeros(R) lam, L, N, Z, lG0 = pfqn_nc_sanitize(lam, L, N, Z, atol) R = len(N) if R == 0: return ComomResult(lG=lG0, lGbasis=None) zerothinktimes = np.where(Z < 1e-6)[0] # Initialize nvec = np.zeros(R) if len(zerothinktimes) > 0: nvec[zerothinktimes] = N[zerothinktimes] lh = [] # These are trivial models with a single queueing station lh.append(factln(np.sum(nvec) + m + 1 - 1) - np.sum(factln_vec(nvec))) for s in zerothinktimes: nvec_s = oner(nvec, s) lh.append(factln(np.sum(nvec_s) + m + 1 - 1) - np.sum(factln_vec(nvec_s))) lh.append(factln(np.sum(nvec) + m - 1) - np.sum(factln_vec(nvec))) for s in zerothinktimes: nvec_s = oner(nvec, s) lh.append(factln(np.sum(nvec_s) + m - 1) - np.sum(factln_vec(nvec_s))) lh = np.array(lh) else: lh = np.zeros(2) h = np.exp(lh - np.max(lh)) if len(lh) > 0 else np.zeros(2) if len(zerothinktimes) == R: lGbasis = lh lG = lG0 + lh[len(lh) - R - 1] if len(lh) > R else lG0 return ComomResult(lG=lG, lGbasis=lGbasis) scale = np.ones(int(np.sum(N))) nt = int(np.sum(nvec)) h_1 = h.copy() # Iterate through remaining classes for r in range(len(zerothinktimes), R): for Nr in range(1, int(N[r]) + 1): nvec[r] += 1 if Nr == 1: if r > len(zerothinktimes): # Expand h vector hr = np.zeros(2 * r) hr[:r - 1] = h[:r - 1] hr[r:2 * r - 1] = h[r - 1:2 * (r - 1)] h = hr # Update scalings if nt > 0: h[r - 1] = h_1[0] / scale[nt - 1] h[-1] = h_1[r - 1] / scale[nt - 1] # Build coefficient matrices # CE for G+ A12 = np.zeros((r, r)) A12[0, 0] = -1 # Class PCs for G for s in range(r - 1): A12[1 + s, 0] = N[s] A12[1 + s, 1 + s] = -Z[s] # Class-R PCs B2r = np.column_stack([m * L[0, r] * np.eye(r), Z[r] * np.eye(r)]) # Explicit formula for inv(C) iC = -np.eye(r) / m iC[0, :] = -1 / m iC[0, 0] = 1 # Explicit formula for F1r F1r = np.zeros((2 * r, 2 * r)) F1r[0, 0] = 1 # F2r by the definition F2r = np.vstack([-iC @ A12 @ B2r, B2r]) h_1 = h.copy() h = (F1r + F2r / nvec[r]) @ h_1 nt = int(np.sum(nvec)) scale[nt - 1] = np.abs(np.sum(np.sort(h))) if scale[nt - 1] > 0: h = np.abs(h) / scale[nt - 1] # Unscale and return the log of the normalizing constant log_scale_sum = np.sum(np.log(scale[scale > 0])) lG = lG0 + np.log(np.abs(h[len(h) - R]) + 1e-300) + log_scale_sum lGbasis = np.log(np.abs(h) + 1e-300) + log_scale_sum return ComomResult(lG=lG, lGbasis=lGbasis)
[docs] def pfqn_comomrm_orig(L: np.ndarray, N: np.ndarray, Z: np.ndarray, m: int = 1, atol: float = 1e-8) -> ComomResult: """ Original CoMoM implementation for repairman model. This is the original implementation of CoMoM without optimizations. Kept for reference and validation. Args: L: Service demand matrix (1 x R) N: Population vector (R,) Z: Think time vector (R,) m: Replication factor (default: 1) atol: Absolute tolerance Returns: ComomResult with lG and lGbasis References: Original MATLAB: matlab/src/api/pfqn/pfqn_comomrm_orig.m """ # This is essentially the same algorithm as pfqn_comomrm # but implemented more directly following the original MATLAB code return pfqn_comomrm(L, N, Z, m, atol)
[docs] def pfqn_comomrm_ms(L: np.ndarray, N: np.ndarray, Z: np.ndarray, m: int, c: int, atol: float = 1e-8) -> ComomResult: """ CoMoM for multi-server repairman model. Computes the normalizing constant for a repairman model where the queueing station has multiple servers. Args: L: Service demand matrix (1 x R) N: Population vector (R,) Z: Think time vector (R,) m: Number of identical stations c: Number of servers per station atol: Absolute tolerance Returns: ComomResult with lG and lGbasis References: Original MATLAB: matlab/src/api/pfqn/pfqn_comomrm_ms.m """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).flatten() Z = np.asarray(Z, dtype=float).flatten() M, R = L.shape if M != 1: raise ValueError("pfqn_comomrm_ms: accepts at most a single queueing station.") # Compute load-dependent service rates for multi-server from .utils import pfqn_mu_ms Ntot = int(np.sum(N)) mu = pfqn_mu_ms(Ntot, m, c) # Use load-dependent COMOM from .ncld import pfqn_comomrm_ld # Reshape mu for pfqn_comomrm_ld mu_matrix = mu.reshape(1, -1) result = pfqn_comomrm_ld(L, N, Z, mu_matrix, {'tol': atol}) return ComomResult(lG=result.lG, lGbasis=None)
[docs] def pfqn_procomom2(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None, atol: float = 1e-8) -> float: """ Projected CoMoM method for normalizing constant. Uses a projection-based approach to compute the normalizing constant, which can be more efficient for certain model structures. Args: L: Service demand matrix (M x R) N: Population vector (R,) Z: Think time vector (R,), default zeros atol: Absolute tolerance Returns: lG: Logarithm of the normalizing constant References: Original MATLAB: matlab/src/api/pfqn/pfqn_procomom2.m """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).flatten() M, R = L.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).flatten() # For now, fall back to standard CoMoM # Full implementation would use projection techniques return pfqn_comom(L, N, Z, atol)
__all__ = [ 'pfqn_comom', 'pfqn_comomrm', 'pfqn_comomrm_orig', 'pfqn_comomrm_ms', 'pfqn_procomom2', 'ComomResult', ]