Source code for line_solver.api.pfqn.asymptotic

"""
Asymptotic Methods for Normalizing Constants.

Implements asymptotic approximation methods for computing normalizing
constants in closed product-form queueing networks, including:
- Laguerre Expansion (LE)
- Cubature Methods (Grundmann-Moeller rules)
"""

import numpy as np
from typing import Tuple, Optional
from scipy.special import gammaln


def _factln(n: np.ndarray) -> np.ndarray:
    """Log factorial using gamma function."""
    return gammaln(1 + np.asarray(n, dtype=float))


def _multinomialln(n: np.ndarray) -> float:
    """Log multinomial coefficient."""
    n = np.asarray(n, dtype=float)
    return float(gammaln(1 + np.sum(n)) - np.sum(gammaln(1 + n)))


def _allbut(y: np.ndarray, idx: int) -> np.ndarray:
    """Return array without element at index idx."""
    return np.delete(y, idx)


def _pfqn_le_fpi(L: np.ndarray, N: np.ndarray) -> np.ndarray:
    """Fixed-point iteration to find mode location (no think time)."""
    M, R = L.shape
    u = np.ones(M) / M
    u_prev = np.full(M, np.inf)

    max_iter = 1000
    for _ in range(max_iter):
        if np.linalg.norm(u - u_prev, 1) < 1e-10:
            break
        u_prev = u.copy()

        for i in range(M):
            u[i] = 1.0 / (np.sum(N) + M)
            for r in range(R):
                denom = np.dot(u_prev, L[:, r])
                if denom > 0:
                    u[i] += N[r] / (np.sum(N) + M) * L[i, r] * u_prev[i] / denom

    return u


def _pfqn_le_fpiZ(
    L: np.ndarray, N: np.ndarray, Z: np.ndarray
) -> Tuple[np.ndarray, float]:
    """Fixed-point iteration to find mode location (with think time)."""
    M, R = L.shape
    eta = np.sum(N) + M
    u = np.ones(M) / M
    v = eta + 1
    u_prev = np.full(M, np.inf)

    max_iter = 1000
    for _ in range(max_iter):
        if np.linalg.norm(u - u_prev, 1) < 1e-10:
            break
        u_prev = u.copy()
        v_prev = v

        for i in range(M):
            u[i] = 1.0 / eta
            for r in range(R):
                denom = Z[r] + v * np.dot(u_prev, L[:, r])
                if denom > 0:
                    u[i] += (N[r] / eta) * (Z[r] + v * L[i, r]) * u_prev[i] / denom

        # Compute xi and update v
        xi = np.zeros(R)
        for r in range(R):
            denom = Z[r] + v_prev * np.dot(u_prev, L[:, r])
            if denom > 0:
                xi[r] = N[r] / denom

        v = eta + 1 - np.sum(xi * Z)

    return u, v


def _pfqn_le_hessian(L: np.ndarray, N: np.ndarray, u: np.ndarray) -> np.ndarray:
    """Compute Hessian matrix (no think time case)."""
    M, R = L.shape
    Ntot = np.sum(N)
    hu = np.zeros((M - 1, M - 1))

    for i in range(M - 1):
        for j in range(M - 1):
            if i != j:
                hu[i, j] = -(Ntot + M) * u[i] * u[j]
                for r in range(R):
                    denom = np.dot(u, L[:, r]) ** 2
                    if denom > 0:
                        hu[i, j] += N[r] * L[i, r] * L[j, r] * u[i] * u[j] / denom
            else:
                u_others = _allbut(u, i)
                hu[i, j] = (Ntot + M) * u[i] * np.sum(u_others)
                for r in range(R):
                    L_others = _allbut(L[:, r], i)
                    denom = np.dot(u, L[:, r]) ** 2
                    if denom > 0:
                        hu[i, j] -= N[r] * L[i, r] * u[i] * np.dot(u_others, L_others) / denom

    return hu


def _pfqn_le_hessianZ(
    L: np.ndarray, N: np.ndarray, Z: np.ndarray, u: np.ndarray, v: float
) -> np.ndarray:
    """Compute Hessian matrix (with think time case)."""
    K, R = L.shape
    Ntot = np.sum(N)

    # Compute csi
    csi = np.zeros(R)
    for r in range(R):
        denom = Z[r] + v * np.dot(u, L[:, r])
        if denom > 0:
            csi[r] = N[r] / denom

    # Compute Lhat
    Lhat = np.zeros((K, R))
    for k in range(K):
        for r in range(R):
            Lhat[k, r] = Z[r] + v * L[k, r]

    eta = Ntot + K
    A = np.zeros((K, K))

    # Off-diagonal elements
    for i in range(K):
        for j in range(K):
            if i != j:
                A[i, j] = -eta * u[i] * u[j]
                for r in range(R):
                    if N[r] > 0:
                        A[i, j] += csi[r] ** 2 * Lhat[i, r] * Lhat[j, r] * u[i] * u[j] / N[r]

    # Diagonal elements
    for i in range(K):
        row_sum = np.sum(_allbut(A[i, :], i))
        A[i, i] = -row_sum

    # Reduce to (K-1) x (K-1)
    A_reduced = A[: K - 1, : K - 1]

    # Add extra element for v
    A_full = np.zeros((K, K))
    A_full[: K - 1, : K - 1] = A_reduced

    A_full[K - 1, K - 1] = 1.0
    for r in range(R):
        if N[r] > 0:
            A_full[K - 1, K - 1] -= (csi[r] ** 2 / N[r]) * Z[r] * np.dot(u, L[:, r])
    A_full[K - 1, K - 1] *= v

    for i in range(K - 1):
        val = 0.0
        for r in range(R):
            if N[r] > 0:
                val += v * u[i] * (
                    (csi[r] ** 2 / N[r]) * Lhat[i, r] * np.dot(u, L[:, r]) - csi[r] * L[i, r]
                )
        A_full[i, K - 1] = val
        A_full[K - 1, i] = val

    return A_full


[docs] def pfqn_le( L: np.ndarray, N: np.ndarray, Z: Optional[np.ndarray] = None ) -> Tuple[float, float]: """ Laguerre Expansion (LE) asymptotic approximation for normalizing constant. Provides an asymptotic estimate of the normalizing constant for closed product-form queueing networks. Useful for large populations where exact methods become computationally expensive. Args: L: Service demand matrix (M x R). N: Population vector (R,). Z: Think time vector (R,). Optional. Returns: Tuple of (Gn, lGn): Gn: Estimated normalizing constant. lGn: Logarithm of normalizing constant. Reference: G. Casale. "Accelerating performance inference over closed systems by asymptotic methods." ACM SIGMETRICS 2017. """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape # Handle empty or trivial cases if L.size == 0 or N.size == 0 or np.sum(N) == 0 or np.sum(L) < 1e-4: if Z is not None: Z = np.asarray(Z, dtype=float).ravel() sum_Z = np.sum(Z) if sum_Z > 0: lGn = -np.sum(_factln(N)) + np.sum(N * np.log(sum_Z)) else: lGn = -np.sum(_factln(N)) else: lGn = -np.sum(_factln(N)) Gn = np.exp(lGn) return float(Gn), float(lGn) if Z is None or np.sum(Z) < 1e-10: # Case without think time umax = _pfqn_le_fpi(L, N) A = _pfqn_le_hessian(L, N, umax) S = 0.0 for r in range(R): term = np.dot(umax, L[:, r]) if term > 0: S += N[r] * np.log(term) det_A = np.linalg.det(A) if det_A <= 0: det_A = 1e-100 # Fallback for numerical issues lGn = ( _multinomialln(np.append(N, M - 1)) + _factln(np.array([M - 1]))[0] + (M - 1) * np.log(np.sqrt(2 * np.pi)) - np.log(np.sqrt(det_A)) + np.sum(np.log(np.maximum(umax, 1e-100))) + S ) else: # Case with think time Z = np.asarray(Z, dtype=float).ravel() umax, vmax = _pfqn_le_fpiZ(L, N, Z) A = _pfqn_le_hessianZ(L, N, Z, umax, vmax) S = 0.0 for r in range(R): term = Z[r] + vmax * np.dot(umax, L[:, r]) if term > 0: S += N[r] * np.log(term) det_A = np.linalg.det(A) if det_A <= 0: det_A = 1e-100 lGn = ( -np.sum(_factln(N)) - vmax + M * np.log(max(vmax, 1e-100)) + M * np.log(np.sqrt(2 * np.pi)) - np.log(np.sqrt(det_A)) + np.sum(np.log(np.maximum(umax, 1e-100))) + S ) Gn = np.exp(lGn) return float(Gn), float(lGn)
def _grnmol( f, V: np.ndarray, s: int, tol: float = 1e-8 ) -> Tuple[np.ndarray, int]: """ Grundmann-Moeller cubature rule for simplex integration. Reference: "Invariant Integration Formulas for the N-Simplex by Combinatorial Methods", A. Grundmann and H. M. Moller, SIAM J Numer. Anal. 15(1978), pp. 282-290. """ n = V.shape[0] Q = np.zeros(s + 1) Qv = np.zeros(s + 1) import math Vol = 1.0 / math.factorial(n) nv = 0 d = 0 while True: m = n + 2 * d + 1 al = np.ones(n, dtype=float) alz = 2 * d + 1 Qs = 0.0 while True: x = V @ np.append([alz], al) / m Qs += f(x) nv += 1 for j in range(n): alz -= 2 if alz > 0: al[j] += 2 break alz += al[j] + 1 al[j] = 1 if alz == 2 * d + 1: break d += 1 Qv[d - 1] = Vol * Qs Q[d - 1] = 0 p = 2.0 / np.prod(np.arange(n + 1, m + 1) * 2.0) for i in range(1, d + 1): Q[d - 1] += ((m + 2 - 2 * i) ** (2 * d - 1)) * p * Qv[d - i] p = -p * (m + 1 - i) / i if d > s or (d > 1 and abs(Q[d - 1] - Q[d - 2]) < tol * abs(Q[d - 2])): return Q[:d], nv return Q, nv
[docs] def pfqn_cub( L: np.ndarray, N: np.ndarray, Z: Optional[np.ndarray] = None, order: Optional[int] = None, atol: float = 1e-8, ) -> Tuple[float, float]: """ Cubature method for normalizing constant using Grundmann-Moeller rules. Uses numerical integration over simplices to compute the normalizing constant exactly (for sufficient order) or approximately. Args: L: Service demand matrix (M x R). N: Population vector (R,). Z: Think time vector (R,). Optional. order: Degree of cubature rule (default: ceil((sum(N)-1)/2)). atol: Absolute tolerance (default: 1e-8). Returns: Tuple of (Gn, lGn): Gn: Estimated normalizing constant. lGn: Logarithm of normalizing constant. Reference: G. Casale. "Accelerating performance inference over closed systems by asymptotic methods." ACM SIGMETRICS 2017. """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape # Handle empty or trivial cases if L.size == 0 or N.size == 0 or np.sum(N) == 0: return 1.0, 0.0 if order is None: order = int(np.ceil((np.sum(N) - 1) / 2)) if Z is None or np.sum(Z) < atol: # Case without think time Nt = np.sum(N) beta = N / Nt V = np.eye(M - 1, M) # Simplex vertices def f(x): # Complete the simplex point x_full = np.append(x, 1 - np.sum(x)) Lx = x_full @ L h = beta * np.log(np.maximum(Lx, 1e-100)) return np.prod(np.exp(Nt * h)) Q, _ = _grnmol(f, V, order, atol) if len(Q) > 0: Gn = Q[-1] * np.exp( gammaln(1 + np.sum(N) + M - 1) - np.sum(gammaln(1 + N)) ) else: Gn = 1.0 else: # Case with think time - numerical integration over v Z = np.asarray(Z, dtype=float).ravel() steps = 1000 Nt = np.sum(N) beta = N / Nt Gn = 0.0 vmax = Nt * 10 dv = vmax / steps V = np.eye(M - 1, M) for v_val in np.arange(0, vmax + dv, dv): Lv = L * v_val + np.tile(Z, (M, 1)) def f(x): x_full = np.append(x, 1 - np.sum(x)) Lx = x_full @ Lv h = beta * np.log(np.maximum(Lx, 1e-100)) return np.exp(np.sum(Nt * h)) Q, _ = _grnmol(f, V, order, atol) if len(Q) > 0: dG = np.exp(-v_val) * (v_val ** (M - 1)) * Q[-1] * dv Gn += dG if v_val > 0 and Gn > 0 and dG / Gn < atol: break Gn *= np.exp(-np.sum(_factln(N))) lGn = np.log(max(Gn, 1e-300)) return float(Gn), float(lGn)
def _logmeanexp(x: np.ndarray) -> float: """ Compute log(mean(exp(x))) in a numerically stable way. Uses the log-sum-exp trick to avoid overflow/underflow. """ x = np.asarray(x, dtype=float).ravel() if len(x) == 0: return -np.inf x_max = np.max(x) if np.isinf(x_max): return x_max return x_max + np.log(np.mean(np.exp(x - x_max)))
[docs] def pfqn_mci( D: np.ndarray, N: np.ndarray, Z: Optional[np.ndarray] = None, I: int = 100000, variant: str = 'imci' ) -> Tuple[float, float, np.ndarray]: """ Monte Carlo Integration (MCI) for normalizing constant estimation. Provides a Monte Carlo estimate of the normalizing constant for closed product-form queueing networks. Args: D: Service demand matrix (M x R). N: Population vector (R,). Z: Think time vector (R,). Optional, defaults to zeros. I: Number of samples (default: 100000). variant: MCI variant - 'mci', 'imci' (improved), or 'rm' (repairman). Default: 'imci'. Returns: Tuple of (G, lG, lZ): G: Estimated normalizing constant. lG: Logarithm of normalizing constant. lZ: Individual random sample log values. Reference: Implementation based on MonteQueue methodology. """ from .mva import pfqn_bs D = np.atleast_2d(np.asarray(D, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = D.shape if Z is None: Z = np.zeros(R) else: Z = np.asarray(Z, dtype=float).ravel() # Handle empty or trivial cases if D.size == 0 or np.sum(D) < 1e-4: lGn = -np.sum(_factln(N)) + np.sum(N * np.log(np.sum(Z))) G = np.exp(lGn) return float(G), float(lGn), np.array([]) # Compute throughput estimate using balanced system if variant.lower() == 'imci': # Improved MCI XN, _, _, _, _, _, _ = pfqn_bs(D, N, Z) tput = XN.ravel() util = D @ tput gamma = np.maximum(0.01, 1 - util) elif variant.lower() == 'mci': # Original MCI XN, _, _, _, _, _, _ = pfqn_bs(D, N, Z) tput = XN.ravel() util = D @ tput gamma = np.zeros(M) for i in range(M): if util[i] > 0.9: gamma[i] = 1.0 / np.sqrt(max(N)) else: gamma[i] = 1.0 - util[i] elif variant.lower() == 'rm': # Repairman problem tput = N / (np.sum(D, axis=0) + Z + np.max(D, axis=0) * (np.sum(N) - 1)) util = D @ tput gamma = np.zeros(M) for i in range(M): if util[i] > 0.9: gamma[i] = 1.0 / np.sqrt(max(N)) else: gamma[i] = 1.0 - util[i] else: raise ValueError(f"Unknown variant: {variant}. Use 'mci', 'imci', or 'rm'.") # Ensure gamma is positive gamma = np.maximum(gamma, 1e-6) # Compute log factorials logfact = np.array([np.sum(np.log(np.arange(1, int(N[r]) + 1))) if N[r] > 0 else 0.0 for r in range(R)]) # Uniform sampling with importance sampling VL = np.log(np.random.rand(I, M)) V = (-1.0 / gamma).reshape(1, -1) * VL ZI = np.tile(Z, (I, 1)) # Importance sampling formula # lZ = -(ones(1,M) - gamma) * V' - sum(log(gamma)) - sum(logfact) + N*log(V*D+ZI)' term1 = -np.sum((1 - gamma) * V, axis=1) # Shape: (I,) term2 = -np.sum(np.log(gamma)) term3 = -np.sum(logfact) VD_plus_ZI = V @ D + ZI # Shape: (I, R) term4 = np.sum(N * np.log(np.maximum(VD_plus_ZI, 1e-300)), axis=1) # Shape: (I,) lZ = term1 + term2 + term3 + term4 # Compute log of mean lG = _logmeanexp(lZ) if np.isinf(lG): lG = np.max(lZ) G = np.exp(lG) return float(G), float(lG), lZ
[docs] def pfqn_grnmol(L: np.ndarray, N: np.ndarray) -> Tuple[float, float]: """ Normalizing constant using Grundmann-Moeller quadrature. Computes the normalizing constant for closed product-form queueing networks using Grundmann-Moeller cubature rules on simplices. This is an exact method that uses polynomial quadrature to compute the normalizing constant integral representation. Args: L: Service demand matrix (M x R). N: Population vector (R,). Returns: Tuple of (G, lG): G: Normalizing constant. lG: Logarithm of normalizing constant. Reference: Grundmann, A. and Moller, H.M. "Invariant Integration Formulas for the N-Simplex by Combinatorial Methods", SIAM J Numer. Anal. 15 (1978), pp. 282-290. """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() M, R = L.shape # Handle trivial cases if L.size == 0 or N.size == 0 or np.sum(N) == 0: return 1.0, 0.0 G = 0.0 S = int(np.ceil((np.sum(N) - 1) / 2)) H = np.zeros(1 + S) c = np.zeros(1 + S) w = np.zeros(1 + S) for i in range(S + 1): c[i] = 2 * (S - i) + M # w(1+i) = 2^-(2*S) * (-1)^i * c(1+i)^(2*S+1) / factorial(i) / factorial(i+c(1+i)) log_w = (-(2 * S) * np.log(2) + (2 * S + 1) * np.log(max(c[i], 1e-300)) - _factln(np.array([i]))[0] - _factln(np.array([i + c[i]]))[0]) w[i] = ((-1) ** i) * np.exp(log_w) # Enumerate simplex states s_iter = S - i if s_iter < 0: continue # Generate all partitions of s_iter into M parts from scipy.special import comb num_states = int(comb(s_iter + M - 1, M - 1)) if num_states == 0: H[i] = 1.0 # Only one state: all zeros else: H[i] = 0.0 bvec = np.zeros(M, dtype=int) bvec[0] = s_iter for _ in range(num_states): # Compute term: prod((((2*bvec+1)/c(i))*L).^N) scale = (2 * bvec + 1) / c[i] Lscaled = scale.reshape(-1, 1) * L # (M x R) # Sum over stations for each class, then raise to power N Lsum = np.sum(Lscaled, axis=0) # (R,) log_term = np.sum(N * np.log(np.maximum(Lsum, 1e-300))) H[i] += np.exp(log_term) # Next partition bvec = _next_partition(bvec, s_iter) if bvec is None: break G += w[i] * H[i] # Multiply by factorial(sum(N)+M-1) / prod(factorial(N)) lG_coeff = _factln(np.array([np.sum(N) + M - 1]))[0] - np.sum(_factln(N)) G = G * np.exp(lG_coeff) lG = np.log(max(abs(G), 1e-300)) if G < 0: G = 0.0 lG = -np.inf return float(G), float(lG)
def _next_partition(bvec: np.ndarray, total: int) -> Optional[np.ndarray]: """ Generate next partition of total into M non-negative integers. Uses reverse lexicographic order. Args: bvec: Current partition total: Sum constraint Returns: Next partition, or None if exhausted """ M = len(bvec) if M <= 1: return None # Find rightmost position that can be decremented for i in range(M - 2, -1, -1): if bvec[i] > 0: bvec[i] -= 1 # Compute remaining remaining = total - np.sum(bvec[:i+1]) # Put remaining in position i+1 bvec[i+1] = remaining # Zero out positions after i+1 for j in range(i + 2, M): bvec[j] = 0 return bvec return None
[docs] def pfqn_le_fpi(L: np.ndarray, N: np.ndarray) -> np.ndarray: """ Fixed-point iteration to find mode location (no think time). Public wrapper for the internal _pfqn_le_fpi function. Args: L: Service demand matrix (M x R). N: Population vector (R,). Returns: Mode location vector u (M,). """ return _pfqn_le_fpi(L, N)
[docs] def pfqn_le_fpiZ( L: np.ndarray, N: np.ndarray, Z: np.ndarray ) -> Tuple[np.ndarray, float]: """ Fixed-point iteration to find mode location (with think time). Public wrapper for the internal _pfqn_le_fpiZ function. Args: L: Service demand matrix (M x R). N: Population vector (R,). Z: Think time vector (R,). Returns: Tuple of (u, v): u: Mode location vector (M,). v: Scale factor. """ return _pfqn_le_fpiZ(L, N, Z)
[docs] def pfqn_le_hessian(L: np.ndarray, N: np.ndarray, u: np.ndarray) -> np.ndarray: """ Compute Hessian matrix (no think time case). Public wrapper for the internal _pfqn_le_hessian function. Args: L: Service demand matrix (M x R). N: Population vector (R,). u: Mode location vector (M,). Returns: Hessian matrix (M-1 x M-1). """ return _pfqn_le_hessian(L, N, u)
[docs] def pfqn_le_hessianZ( L: np.ndarray, N: np.ndarray, Z: np.ndarray, u: np.ndarray, v: float ) -> np.ndarray: """ Compute Hessian matrix (with think time case). Public wrapper for the internal _pfqn_le_hessianZ function. Args: L: Service demand matrix (M x R). N: Population vector (R,). Z: Think time vector (R,). u: Mode location vector (M,). v: Scale factor. Returns: Hessian matrix (M x M). """ return _pfqn_le_hessianZ(L, N, Z, u, v)
__all__ = [ 'pfqn_le', 'pfqn_cub', 'pfqn_mci', 'pfqn_grnmol', 'pfqn_le_fpi', 'pfqn_le_fpiZ', 'pfqn_le_hessian', 'pfqn_le_hessianZ', ]