"""
Quadrature Methods for Normalizing Constant Computation.
Native Python implementations of numerical integration methods for
computing normalizing constants in product-form queueing networks.
Key functions:
pfqn_mmint2: McKenna-Mitra integral using scipy.integrate
pfqn_mmint2_gausslegendre: Gauss-Legendre quadrature
pfqn_mmint2_gausslaguerre: Gauss-Laguerre quadrature
pfqn_mmsample2: Monte Carlo sampling approximation
References:
McKenna, J., and Mitra, D. "Integral representations and asymptotic
expansions for closed Markovian queueing networks: Normal usage."
Bell System Technical Journal 61.5 (1982): 661-683.
Original MATLAB: matlab/src/api/pfqn/pfqn_mmint2*.m
"""
import numpy as np
from scipy import integrate
from scipy.special import gammaln
from typing import Tuple, Optional
from numpy.polynomial.legendre import leggauss
from numpy.polynomial.laguerre import laggauss
def factln(n: float) -> float:
"""Compute log(n!) using log-gamma function."""
if n <= 0:
return 0.0
return gammaln(n + 1)
def factln_vec(arr: np.ndarray) -> np.ndarray:
"""Compute log(n!) element-wise for an array."""
arr = np.asarray(arr, dtype=float)
result = np.zeros_like(arr)
mask = arr > 0
result[mask] = gammaln(arr[mask] + 1)
return result
def logsumexp(x: np.ndarray) -> float:
"""
Compute log(sum(exp(x))) in a numerically stable way.
Args:
x: Array of values
Returns:
log(sum(exp(x)))
"""
x = np.asarray(x).flatten()
x_max = np.max(x)
if not np.isfinite(x_max):
return float('-inf')
return x_max + np.log(np.sum(np.exp(x - x_max)))
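# Illustrative example: logsumexp(np.array([1000.0, 1000.0])) returns
# 1000.0 + log(2) ~= 1000.6931, whereas the naive np.log(np.sum(np.exp(x)))
# overflows to inf because exp(1000) exceeds the double-precision range.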
def pfqn_mmint2(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
m: int = 1) -> Tuple[float, float]:
"""
McKenna-Mitra integral form using scipy.integrate.
Computes the normalizing constant using numerical integration of the
McKenna-Mitra integral representation.
Args:
L: Service demand vector (R,)
N: Population vector (R,)
Z: Think time vector (R,)
m: Replication factor (default: 1)
Returns:
Tuple of (G, lG):
- G: Normalizing constant
- lG: Logarithm of normalizing constant
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_mmint2.m
"""
L = np.asarray(L, dtype=float).flatten()
N = np.asarray(N, dtype=float).flatten()
Z = np.asarray(Z, dtype=float).flatten()
# Find non-zero classes
nnzClasses = np.where(N > 0)[0]
if len(nnzClasses) == 0:
return 1.0, 0.0
order = 12
    def f(u):
        """Integrand of the McKenna-Mitra representation (normalization applied later)."""
        terms = Z[nnzClasses] + L[nnzClasses] * u
        terms = np.maximum(terms, 1e-300)  # keep the base strictly positive
        return np.exp(-u) * u ** (m - 1) * np.prod(np.power(terms, N[nnzClasses]))
    # Truncate the integration range at the (1 - 10**(-order)) quantile of Exp(1)
p = 1 - 10**(-order)
exp1prctile = -np.log(1 - p)
# Perform integration
try:
result, _ = integrate.quad(f, 0, exp1prctile, limit=500)
        lG = np.log(max(result, 1e-300)) - np.sum(factln_vec(N)) - factln(m - 1)
    except Exception:
        # Fallback: rectangle-rule sum on a uniform grid over [0, exp1prctile]
        n_points = 1000
        x = np.linspace(0, exp1prctile, n_points)
        result = sum(f(xi) for xi in x) * exp1prctile / n_points
        lG = np.log(max(result, 1e-300)) - np.sum(factln_vec(N)) - factln(m - 1)
G = np.exp(lG) if lG > -700 else 0.0
return G, lG
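# Usage sketch (illustrative parameter values, not taken from the original
# MATLAB tests): a two-class model with demands [1.0, 0.5], populations
# [4, 2], and think times [2.0, 1.0]:
#
#     G, lG = pfqn_mmint2(np.array([1.0, 0.5]),
#                         np.array([4.0, 2.0]),
#                         np.array([2.0, 1.0]))
#
# When G over- or underflows in double precision, lG remains the meaningful
# output.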
def pfqn_mmint2_gausslegendre(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
m: int = 1) -> Tuple[float, float]:
"""
McKenna-Mitra integral with Gauss-Legendre quadrature.
Uses Gauss-Legendre quadrature for improved accuracy in computing
the McKenna-Mitra integral.
Args:
L: Service demand vector (R,)
N: Population vector (R,)
Z: Think time vector (R,)
m: Replication factor (default: 1)
Returns:
Tuple of (G, lG):
- G: Normalizing constant
- lG: Logarithm of normalizing constant
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_mmint2_gausslegendre.m
"""
L = np.asarray(L, dtype=float).flatten()
N = np.asarray(N, dtype=float).flatten()
Z = np.asarray(Z, dtype=float).flatten()
    # Number of quadrature points: 2*(sum(N) + m - 1) - 1, clipped to [300, 500]
    n = max(300, min(500, 2 * int(np.sum(N) + m - 1) - 1))
# Get Gauss-Legendre nodes and weights on [-1, 1]
nodes, weights = leggauss(n)
    # Map the nodes from [-1, 1] to [0, upper_limit]; the integrand decays
    # exponentially, so a large but finite, population-dependent bound is used
    upper_limit = 20 + np.sum(N)  # heuristic adaptive upper limit
# Transform nodes and weights: x = (u + 1) * upper_limit / 2
x = (nodes + 1) * upper_limit / 2
w = weights * upper_limit / 2
    # Log of the product term prod_r (Z_r + L_r*x_i)**N_r at each node
y = np.zeros(n)
for i in range(n):
terms = Z + L * x[i]
terms = np.maximum(terms, 1e-300)
y[i] = np.dot(N, np.log(terms))
    # Log of the weighted integrand at each node
g = np.log(w) - x + y + (m - 1) * np.log(np.maximum(x, 1e-300))
    # Log of the normalization 1 / (prod_r N_r! * (m - 1)!)
coeff = -np.sum(factln_vec(N)) - factln(m - 1)
# Use logsumexp for numerical stability
lG = logsumexp(g) + coeff
G = np.exp(lG) if np.isfinite(lG) and lG > -700 else 0.0
return G, lG
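# The rule above uses the standard affine change of variable from [-1, 1]
# to [0, b] with b = upper_limit:
#
#     int_0^b h(x) dx = (b/2) * int_{-1}^{1} h((t + 1)*b/2) dt
#                     ~= sum_i (b/2) * w_i * h((t_i + 1)*b/2)
#
# which is exactly how the nodes x and weights w are rescaled before the
# log-space summation.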
def pfqn_mmint2_gausslaguerre(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
m: int = 1) -> Tuple[float, float]:
"""
McKenna-Mitra integral with Gauss-Laguerre quadrature.
Uses Gauss-Laguerre quadrature which is naturally suited for
integrals with exponential decay.
Args:
L: Service demand vector (R,)
N: Population vector (R,)
Z: Think time vector (R,)
m: Replication factor (default: 1)
Returns:
Tuple of (G, lG):
- G: Normalizing constant
- lG: Logarithm of normalizing constant
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_mmint2_gausslaguerre.m
"""
L = np.asarray(L, dtype=float).flatten()
N = np.asarray(N, dtype=float).flatten()
Z = np.asarray(Z, dtype=float).flatten()
# Find non-zero classes
nonzeroClasses = np.where(N > 0)[0]
if len(nonzeroClasses) == 0:
return 1.0, 0.0
# Number of quadrature points
n = min(300, 2 * int(np.sum(N)) + 1)
n = max(n, 50) # At least 50 points
    # Gauss-Laguerre nodes and weights approximate
    # int_0^inf exp(-x) * f(x) dx ~= sum_i w_i * f(x_i)
    x, w = laggauss(n)
    # Log of the integrand (excluding the exp(-x) factor) at each node
F = np.zeros(n)
for i in range(n):
terms = Z[nonzeroClasses] + L[nonzeroClasses] * x[i]
terms = np.maximum(terms, 1e-300)
F[i] = (m - 1) * np.log(max(x[i], 1e-300)) + np.dot(N[nonzeroClasses], np.log(terms))
    # Log of the weighted summand; the exp(-x) weight is absorbed into the
    # Gauss-Laguerre weights, so it must not appear in F
g = np.log(np.maximum(w, 1e-300)) + F - np.sum(factln_vec(N)) - factln(m - 1)
# Use logsumexp for numerical stability
lG = logsumexp(g)
G = np.exp(lG) if np.isfinite(lG) and lG > -700 else 0.0
return G, lG
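# Rationale for the node count: an n-point Gauss-Laguerre rule integrates
# exp(-u) * p(u) exactly for polynomials p of degree up to 2n - 1.  For
# integer populations, u**(m-1) * prod_r (Z_r + L_r*u)**N_r is a polynomial
# of degree sum(N) + m - 1, so the n chosen above (between 50 and 300) makes
# the rule exact up to rounding whenever sum(N) + m - 1 <= 2n - 1.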
def pfqn_mmsample2(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
m: int = 1, n_samples: int = 10000) -> Tuple[float, float]:
"""
Monte Carlo sampling approximation for normalizing constant.
Uses Monte Carlo sampling to approximate the McKenna-Mitra integral.
Useful for very large populations where quadrature becomes expensive.
Args:
L: Service demand vector (R,)
N: Population vector (R,)
Z: Think time vector (R,)
m: Replication factor (default: 1)
n_samples: Number of Monte Carlo samples (default: 10000)
Returns:
Tuple of (G, lG):
- G: Normalizing constant (approximate)
- lG: Logarithm of normalizing constant
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_mmsample2.m
"""
L = np.asarray(L, dtype=float).flatten()
N = np.asarray(N, dtype=float).flatten()
Z = np.asarray(Z, dtype=float).flatten()
# Find non-zero classes
nonzeroClasses = np.where(N > 0)[0]
if len(nonzeroClasses) == 0:
return 1.0, 0.0
    # Sample u ~ Exp(1); the integral has the form int_0^inf exp(-u) * f(u) du,
    # so drawing from the exponential density handles the exp(-u) factor
    samples = np.random.exponential(1.0, n_samples)
    # Log of f(u) (the integrand without the exp(-u) factor) at each sample
    log_values = np.zeros(n_samples)
for i, u in enumerate(samples):
terms = Z[nonzeroClasses] + L[nonzeroClasses] * u
terms = np.maximum(terms, 1e-300)
log_values[i] = (m - 1) * np.log(max(u, 1e-300)) + np.dot(N[nonzeroClasses], np.log(terms))
    # The proposal density matches the exp(-u) weight, so the importance
    # weights are constant and the estimator is the plain sample mean of f(u).
    # Log of the normalization 1 / (prod_r N_r! * (m - 1)!)
coeff = -np.sum(factln_vec(N)) - factln(m - 1)
# Use logsumexp and subtract log(n_samples) for averaging
lG = logsumexp(log_values) - np.log(n_samples) + coeff
G = np.exp(lG) if np.isfinite(lG) and lG > -700 else 0.0
return G, lG
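# The Monte Carlo estimate has a relative error of order 1/sqrt(n_samples),
# so increasing n_samples by a factor of 100 buys roughly one additional
# decimal digit of accuracy.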
__all__ = [
'pfqn_mmint2',
'pfqn_mmint2_gausslegendre',
'pfqn_mmint2_gausslaguerre',
'pfqn_mmsample2',
'logsumexp',
]
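if __name__ == '__main__':
    # Minimal self-check comparing the four approximations on a small example.
    # The parameter values are illustrative only (not from the MATLAB tests);
    # the quadrature-based methods should agree closely on lG, while the
    # Monte Carlo estimate is noisier.
    L_demo = np.array([1.0, 0.5])
    N_demo = np.array([4.0, 2.0])
    Z_demo = np.array([2.0, 1.0])
    for name, fn in [('mmint2', pfqn_mmint2),
                     ('gausslegendre', pfqn_mmint2_gausslegendre),
                     ('gausslaguerre', pfqn_mmint2_gausslaguerre),
                     ('mmsample2', pfqn_mmsample2)]:
        G_demo, lG_demo = fn(L_demo, N_demo, Z_demo)
        print(f'{name:>14s}: G = {G_demo:.6e}, lG = {lG_demo:.6f}')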