Source code for line_solver.api.pfqn.kt

"""
Knessl-Tier Asymptotic Expansion for Normalizing Constant.

Native Python implementation of the Knessl-Tier asymptotic expansion
method for computing normalizing constants of product-form queueing networks.

Key functions:
    pfqn_kt: Knessl-Tier asymptotic expansion

References:
    Original MATLAB: matlab/src/api/pfqn/pfqn_kt.m
    Knessl and Tier, "Asymptotic Expansions for Large Closed Queueing Networks"
"""

import numpy as np
from typing import Tuple, Optional

from .mva import pfqn_bs, pfqn_aql


# Small tolerance constant
FINE_TOL = 1e-14


def pfqn_kt(L: np.ndarray, N: np.ndarray, Z: Optional[np.ndarray] = None) -> Tuple[float, float, np.ndarray, np.ndarray]:
    """
    Knessl-Tier asymptotic expansion for the normalizing constant.

    Computes the normalizing constant using the Knessl-Tier asymptotic
    expansion, which is particularly accurate for large populations.

    Args:
        L: Service demand matrix (M x R)
        N: Population vector (R,)
        Z: Think time vector (R,), optional (default: zeros)

    Returns:
        Tuple of (G, lG, X, Q):
            G: Normalizing constant
            lG: Logarithm of the normalizing constant
            X: System throughput (R,)
            Q: Mean queue lengths (M x R)

    References:
        Original MATLAB: matlab/src/api/pfqn/pfqn_kt.m
    """
    if L is None or len(L) == 0 or N is None or len(N) == 0 or np.sum(N) == 0:
        return 1.0, 0.0, np.array([]), np.array([[]])

    L = np.atleast_2d(np.asarray(L, dtype=float))
    N = np.asarray(N, dtype=float).flatten()
    if Z is None:
        Z = np.zeros(len(N))
    else:
        Z = np.asarray(Z, dtype=float).flatten()

    Morig, Rorig = L.shape

    # Handle self-looping customers (they would otherwise yield utilization Uk = 1)
    slcdemandfactor = 0.0
    if Rorig > 1:
        isslc = np.zeros(Rorig, dtype=bool)
        for r in range(Rorig):
            if np.count_nonzero(L[:, r]) == 1 and Z[r] == 0:
                ist = np.where(L[:, r] > 0)[0][0]
                # Replicate the station once per job of the self-looping class
                new_rows = np.tile(L[ist, :].reshape(1, -1), (int(N[r]), 1))
                L = np.vstack([L, new_rows])
                isslc[r] = True
                # Accumulate the demand contribution of each self-looping class
                slcdemandfactor += N[r] * np.log(L[ist, r])
        # Remove self-looping classes
        keep_classes = ~isslc
        L = L[:, keep_classes]
        Z = Z[keep_classes]
        N = N[keep_classes]

    M, R = L.shape
    Ntot = int(np.sum(N))
    if Ntot == 0:
        return 1.0, 0.0, np.zeros(R), np.zeros((M, R))

    # Population mix
    beta = N / Ntot

    # Get throughput estimate
    if Ntot <= 4:
        X, Q = pfqn_bs(L, N, Z)
    else:
        X, Q = pfqn_aql(L, N, Z)
    X = np.asarray(X).flatten()
    Q = np.atleast_2d(np.asarray(Q))

    # Compute covariance matrix C
    delta = np.eye(R)
    C = np.zeros((R, R))
    for s in range(R):
        for r in range(R):
            SK = 0.0
            for k in range(M):
                denom = 1 - np.sum(X * L[k, :])
                denom = max(FINE_TOL, denom) ** 2
                SK += X[s] * X[r] * L[k, s] * L[k, r] / denom
            C[s, r] = delta[s, r] * beta[s] + (1.0 / Ntot) * SK

    # Compute denominator product over stations: prod_k (1 - sum_r X[r] * L[k, r])
    Den = 1.0
    for k in range(M):
        factor = 1 - np.sum(X * L[k, :])
        Den *= max(FINE_TOL, factor)

    # Compute log normalizing constant
    det_C = np.linalg.det(C)
    det_C = max(FINE_TOL, abs(det_C))
    # Avoid log of zero for throughput
    X_safe = np.maximum(X, FINE_TOL)
    lG = (np.log((2 * np.pi) ** (-R / 2) / np.sqrt(Ntot ** R * det_C))
          + (-Ntot * np.dot(beta, np.log(X_safe)))
          - np.log(max(FINE_TOL, abs(Den)))
          + slcdemandfactor)
    G = np.exp(lG)
    return G, lG, X, Q

__all__ = ['pfqn_kt']
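

# Illustrative usage sketch (hypothetical demand and population values, not
# taken from the original module): a two-station, two-class closed network
# with think times, showing the (G, lG, X, Q) outputs of pfqn_kt.
if __name__ == "__main__":
    L_demo = np.array([[1.0, 0.5],
                       [0.4, 1.2]])      # service demands, stations x classes
    N_demo = np.array([20.0, 30.0])      # population per class
    Z_demo = np.array([5.0, 2.0])        # think times per class
    G, lG, X, Q = pfqn_kt(L_demo, N_demo, Z_demo)
    print(f"lG = {lG:.4f}")
    print("Throughputs:", X)
    print("Mean queue lengths:\n", Q)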