"""
Normalizing Constant methods for Product-Form Queueing Networks.
Native Python implementations of methods for computing normalizing constants:
- Convolution Algorithm (pfqn_ca)
- Related utility functions
References:
Buzen, J.P. "Computational algorithms for closed queueing networks with
exponential servers." Communications of the ACM 16.9 (1973): 527-531.
"""
import numpy as np
from math import log, exp, factorial, lgamma
from typing import Tuple
# Try to import JIT-compiled kernels
try:
from .nc_jit import (
HAS_NUMBA as NC_HAS_NUMBA,
convolution_recursion_jit,
)
except ImportError:
NC_HAS_NUMBA = False
convolution_recursion_jit = None
# Threshold for using JIT (number of population states)
NC_JIT_THRESHOLD = 100
def _factln(n: float) -> float:
"""Compute log(n!) using log-gamma function."""
if n <= 0:
return 0.0
return lgamma(n + 1)
def _population_lattice_pprod(n: np.ndarray, N: np.ndarray = None) -> np.ndarray:
"""
Generate next population vector in lexicographic order.
Iterates through all population vectors from (0,0,...,0) to N.
Args:
n: Current population vector
N: Maximum population per class (for bounds)
Returns:
Next population vector, or (-1,...,-1) when exhausted
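
    Example:
        An illustrative step (bounds are arbitrary): position 1 is saturated
        at N[1] = 3, so position 0 carries and the tail resets::

            >>> _population_lattice_pprod(np.array([0, 3]), np.array([2, 3]))
            array([1, 0])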
"""
R = len(n)
n_next = n.copy()
if N is None:
# Just increment
n_next[-1] += 1
return n_next
# Find rightmost position that can be incremented
for i in range(R - 1, -1, -1):
if n_next[i] < N[i]:
n_next[i] += 1
# Reset positions to the right
for j in range(i + 1, R):
n_next[j] = 0
return n_next
# All exhausted
return -np.ones(R, dtype=int)
def _hashpop(n: np.ndarray, N: np.ndarray) -> int:
"""
Compute linear index for population vector.
Maps population vector n to unique integer index in
the flattened population lattice [0, prod(N+1)).
Args:
n: Population vector
N: Maximum population per class
Returns:
Linear index
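
    Example:
        Mixed-radix indexing (illustrative values): for n = [1, 2] with
        N = [2, 3], the index is 1*(N[1]+1) + 2 = 6::

            >>> _hashpop(np.array([1, 2]), np.array([2, 3]))
            6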
"""
R = len(n)
idx = 0
mult = 1
for i in range(R - 1, -1, -1):
idx += int(n[i]) * mult
mult *= int(N[i]) + 1
return idx
def _pfqn_pff_delay(Z: np.ndarray, n: np.ndarray) -> float:
"""
Product-form factor for delay stations (think times).
Computes contribution to normalizing constant from delay stations.
Args:
Z: Think times per class
n: Population vector
Returns:
Product-form factor
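
    Example:
        One class with think time Z = 2.0 and n = 3 jobs (arbitrary values);
        the factor is 2^3 / 3! = 8/6::

            >>> print(round(_pfqn_pff_delay(np.array([2.0]), np.array([3])), 6))
            1.333333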
"""
R = len(n)
result = 1.0
for r in range(R):
if n[r] > 0:
# Contribution: Z[r]^n[r] / n[r]!
result *= (Z[r] ** n[r]) / factorial(int(n[r]))
return result
def pfqn_ca(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None
) -> Tuple[float, float]:
"""
Convolution Algorithm for normalizing constant computation.
Computes the normalizing constant G(N) for a closed product-form
queueing network using Buzen's convolution algorithm.
Args:
L: Service demand matrix (M x R) where M is stations, R is classes
N: Population vector (1 x R or R,) - number of jobs per class
Z: Think time vector (1 x R or R,) - think time per class (default 0)
Returns:
Tuple (G, lG) where:
- G: Normalizing constant
- lG: log(G)
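
    Example:
        A minimal single-class sketch with two stations (demand values are
        arbitrary); by direct enumeration G = 1 + 2 + 4 + 8 = 15::

            >>> G, lG = pfqn_ca(np.array([[1.0], [2.0]]), np.array([3]))
            >>> print(round(G, 6))
            15.0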
"""
L = np.asarray(L, dtype=np.float64)
N = np.asarray(N, dtype=np.float64).flatten()
N = np.ceil(N).astype(int)
R = len(N)
if L.ndim == 1:
L = L.reshape(-1, 1) if R == 1 else L.reshape(1, -1)
M = L.shape[0] # Number of stations
# Handle Z
if Z is None:
Z = np.zeros(R)
else:
Z = np.asarray(Z, dtype=np.float64).flatten()
    # Check for negative populations (empty state space)
    if np.any(N < 0):
        return 0.0, float('-inf')
    # Check for zero population
    if N.sum() == 0:
        return 1.0, 0.0
    # Special case: no stations (only delay)
    if M == 0:
        # G = prod_r (Z[r]^N[r] / N[r]!)
        lGn = 0.0
        for r in range(R):
            lGn += -_factln(N[r])  # -log(N[r]!)
            if Z[r] > 0 and N[r] > 0:
                lGn += N[r] * log(Z[r])
        Gn = exp(lGn)
        return Gn, lGn
# Compute total number of population vectors
product_N_plus_one = int(np.prod(N + 1))
# Use JIT kernel for large state spaces
if NC_HAS_NUMBA and product_N_plus_one > NC_JIT_THRESHOLD:
N_int = N.astype(np.int64)
return convolution_recursion_jit(L, N_int, Z, M, R)
# G[m, idx] = G_m(n) where idx = hashpop(n, N)
G = np.ones((M + 1, product_N_plus_one))
# Iterate through all population vectors
n = np.zeros(R, dtype=int) # Start at (0, 0, ..., 0)
while True:
# Check if done (n becomes all -1)
if np.all(n < 0):
break
idxn = _hashpop(n, N)
# Base case: delay station contribution
G[0, idxn] = _pfqn_pff_delay(Z, n)
# Convolution recursion: G_m(n) = G_{m-1}(n) + sum_r L[m-1,r] * G_m(n - e_r)
for m in range(1, M + 1):
G[m, idxn] = G[m - 1, idxn]
for r in range(R):
if n[r] >= 1:
n[r] -= 1
idxn_1r = _hashpop(n, N)
n[r] += 1
G[m, idxn] += L[m - 1, r] * G[m, idxn_1r]
# Next population vector
n = _population_lattice_pprod(n, N)
# Final normalizing constant
Gn = G[M, product_N_plus_one - 1]
lGn = log(Gn) if Gn > 0 else float('-inf')
return Gn, lGn
def pfqn_nc(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None,
method: str = 'ca') -> Tuple[float, float]:
"""
Normalizing constant computation dispatcher.
Selects appropriate algorithm based on method parameter.
Args:
L: Service demand matrix (M x R)
N: Population vector (R,)
Z: Think time vector (R,) (default: zeros)
        method: Algorithm to use:
            - 'ca', 'exact': Convolution algorithm
            - 'default': Auto-select based on problem size
            - 'le': Logistic expansion asymptotic approximation
            - 'cub': Controllable upper bound
            - 'imci': Importance sampling Monte Carlo integration
            - 'panacea': PANACEA expansion (currently falls back to convolution)
            - 'propfair': Proportionally fair allocation
            - 'mmint2': Numerical quadrature of the normalizing-constant integral
            - 'gleint': Gauss-Legendre variant of 'mmint2'
            - 'sampling': Monte Carlo sampling
            - 'kt': Knessl-Tier expansion
            - 'comom': Class-oriented method of moments
            - 'rd': Reduced decomposition
            - 'ls': Logistic sampling
            - 'nrl', 'nrp': Laplace-transform based approximations
Returns:
Tuple (G, lG) - normalizing constant and its log
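
    Example:
        Dispatching explicitly to the convolution algorithm (same
        illustrative two-station model as in pfqn_ca, so G = 15)::

            >>> G, lG = pfqn_nc(np.array([[1.0], [2.0]]), np.array([3]), method='ca')
            >>> print(round(G, 6))
            15.0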
"""
method = method.lower() if method else 'ca'
if method in ['ca', 'exact']:
return pfqn_ca(L, N, Z)
elif method == 'panacea':
return pfqn_panacea(L, N, Z)
elif method == 'propfair':
G, lG, _ = pfqn_propfair(L, N, Z)
return G, lG
elif method == 'le':
from .asymptotic import pfqn_le
result = pfqn_le(L, N, Z)
# pfqn_le returns (lG, XN) - extract lG
lG = result[0] if isinstance(result, tuple) else result
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'cub':
from .asymptotic import pfqn_cub
result = pfqn_cub(L, N, Z)
lG = result[0] if isinstance(result, tuple) else result
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'imci':
from .asymptotic import pfqn_mci
result = pfqn_mci(L, N, Z)
lG = result[0] if isinstance(result, tuple) else result
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method in ['mmint2', 'gleint']:
from .quadrature import pfqn_mmint2, pfqn_mmint2_gausslegendre
if method == 'gleint':
lG, _ = pfqn_mmint2_gausslegendre(L, N, Z)
else:
lG, _ = pfqn_mmint2(L, N, Z)
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'sampling':
from .quadrature import pfqn_mmsample2
lG, _ = pfqn_mmsample2(L, N, Z)
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'kt':
from .kt import pfqn_kt
lG, _ = pfqn_kt(L, N, Z)
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'comom':
from .comom import pfqn_comom
result = pfqn_comom(L, N, Z)
lG = result.lG if hasattr(result, 'lG') else np.nan
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'rd':
from .rd import pfqn_rd
result = pfqn_rd(L, N, Z)
lG = result.lG if hasattr(result, 'lG') else np.nan
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'ls':
from .ls import pfqn_ls
# Logistic sampling approximation
return pfqn_ls(L, N, Z)
elif method == 'nrl':
from .laplace import pfqn_nrl
lG = pfqn_nrl(L, N, Z)
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'nrp':
from .laplace import pfqn_nrp
lG = pfqn_nrp(L, N, Z)
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
elif method == 'default':
# Auto-select based on problem size
L = np.atleast_2d(np.asarray(L, dtype=float))
N = np.asarray(N, dtype=float).ravel()
M, R = L.shape
Ntot = int(np.sum(N))
# Use CA for small problems, asymptotic for large
if Ntot * R <= 500:
return pfqn_ca(L, N, Z)
else:
# Use LE for large problems
from .asymptotic import pfqn_le
result = pfqn_le(L, N, Z)
lG = result[0] if isinstance(result, tuple) else result
G = exp(lG) if np.isfinite(lG) else 0.0
return G, lG
else:
# Default to convolution algorithm
return pfqn_ca(L, N, Z)
def pfqn_panacea(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None
) -> Tuple[float, float]:
"""
    PANACEA algorithm.

    Named after the asymptotic-expansion method of McKenna and Mitra;
    currently implemented as a thin wrapper around the convolution
    algorithm, so it returns the exact normalizing constant.
Args:
L: Service demand matrix
N: Population vector
Z: Think time vector
Returns:
Tuple (G, lG) - normalizing constant and its log
"""
return pfqn_ca(L, N, Z)
def pfqn_propfair(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None
) -> Tuple[float, float, np.ndarray]:
"""
Proportionally Fair allocation approximation for normalizing constant.
Estimates the normalizing constant using a convex optimization program
that is asymptotically exact in models with single-server PS queues only.
This method is based on Schweitzer's approach and Walton's proportional
fairness theory for multi-class networks.
Args:
L: Service demand matrix (M x R) where M is stations, R is classes
N: Population vector (1 x R or R,) - number of jobs per class
Z: Think time vector (1 x R or R,) - think time per class (default 0)
Returns:
Tuple (G, lG, X) where:
- G: Estimated normalizing constant
- lG: log(G)
- X: Asymptotic throughputs per class (1 x R)
References:
Schweitzer, P. J. (1979). Approximate analysis of multiclass closed networks
of queues. In Proceedings of the International Conference on Stochastic
Control and Optimization.
Walton, N. (2009). Proportional fairness and its relationship with
multi-class queueing networks.
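
    Example:
        Illustrative usage with arbitrary demands; the estimate depends on
        the COBYLA solver, so only the output shapes are checked here::

            >>> G, lG, X = pfqn_propfair(np.array([[0.5, 0.2]]), np.array([10, 5]))
            >>> X.shape
            (1, 2)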
"""
from scipy.optimize import minimize
L = np.asarray(L, dtype=np.float64)
N = np.asarray(N, dtype=np.float64).flatten()
R = len(N)
if L.ndim == 1:
L = L.reshape(-1, 1) if R == 1 else L.reshape(1, -1)
M = L.shape[0] # Number of stations
if Z is None:
Z = np.zeros(R)
else:
Z = np.asarray(Z, dtype=np.float64).flatten()
FineTol = 1e-12
# Objective function: maximize sum_r (N[r] - x[r]*Z[r]) * log(x[r])
# We minimize the negative
def objective(x):
obj = 0.0
for r in range(R):
obj += (N[r] - x[r] * Z[r]) * log(abs(x[r]) + FineTol)
return -obj # Minimize negative
# Constraints: sum_r L[i,r] * x[r] <= 1 for all stations i
# And x[r] >= 0 for all r
constraints = []
# Capacity constraints
for i in range(M):
constraints.append({
'type': 'ineq',
'fun': lambda x, i=i: 1.0 - sum(L[i, r] * x[r] for r in range(R))
})
# Non-negativity constraints
for r in range(R):
constraints.append({
'type': 'ineq',
'fun': lambda x, r=r: x[r]
})
# Initial guess - balanced throughput
x0 = np.zeros(R)
for r in range(R):
D_max = L[:, r].max() if M > 0 else 0
if D_max > 0:
x0[r] = min(N[r] / (Z[r] + 1), 1.0 / D_max)
elif Z[r] > 0:
x0[r] = N[r] / Z[r]
else:
x0[r] = N[r]
# Ensure positive initial guess
x0 = np.maximum(x0, FineTol)
# Run optimization with COBYLA
result = minimize(
objective,
x0,
method='COBYLA',
constraints=constraints,
options={'maxiter': 10000, 'rhobeg': 1.0}
)
Xasy = result.x
# Compute lG
lG = 0.0
for r in range(R):
x = Xasy[r]
if x > FineTol:
lG += (N[r] - x * Z[r]) * log(1.0 / (x + FineTol))
# Factorial correction for think times
for r in range(R):
thinking = Xasy[r] * Z[r]
if thinking > 0:
lG -= _factln(thinking)
G = exp(lG) if lG > -700 else 0.0 # Avoid underflow
Xa = Xasy.reshape(1, -1)
return G, lG, Xa
def pfqn_ls(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None,
I: int = 100000) -> Tuple[float, float]:
"""
Logistic sampling approximation for normalizing constant.
    Approximates the normalizing constant by importance sampling from a
    multivariate normal distribution fitted at the mode of the
    logistic-transformed integrand.
This method is particularly effective for large networks where
convolution becomes computationally expensive.
Args:
L: Service demand matrix (M x R)
N: Population vector (R,)
Z: Think time vector (R,) (default: zeros)
I: Number of samples for Monte Carlo integration (default: 100000)
Returns:
Tuple (G, lG) where:
G: Estimated normalizing constant
lG: log(G)
Reference:
G. Casale. "Accelerating performance inference over closed systems by
asymptotic methods." ACM SIGMETRICS 2017.
"""
L = np.atleast_2d(np.asarray(L, dtype=float))
N = np.asarray(N, dtype=float).ravel()
# Filter out zero-demand stations
Lsum = np.sum(L, axis=1)
L = L[Lsum > 1e-4, :]
M, R = L.shape
# Handle empty network
if L.size == 0 or np.sum(L) < 1e-4 or N.size == 0 or np.sum(N) == 0:
        Ztot = float(np.sum(Z)) if Z is not None else 0.0
        lGn = -np.sum([_factln(n) for n in N]) + np.sum(N * np.log(max(Ztot, 1e-300)))
return np.exp(lGn), lGn
if Z is None or len(Z) == 0:
Z = np.zeros(R)
else:
Z = np.asarray(Z, dtype=float).flatten()
# Find the mode using fixed-point iteration
u, converged = _pfqn_le_fpi(L, N, Z)
if not converged:
# Fall back to convolution if mode-finding fails
return pfqn_ca(L, N, Z)
Ntot = np.sum(N)
if np.sum(Z) <= 0:
# Case without think times
# Compute Hessian at the mode
A = _pfqn_le_hessian(L, N, u)
A = (A + A.T) / 2 # Ensure symmetry
try:
iA = np.linalg.inv(A)
except np.linalg.LinAlgError:
return pfqn_ca(L, N, Z)
x0 = np.log(u[:M-1] / u[M-1])
# Sample from multivariate normal
samples = np.random.multivariate_normal(x0, iA, I)
# Evaluate function at samples
T = np.zeros(I)
for i in range(I):
T[i] = _simplex_fun(samples[i, :], L, N)
        # Evaluate the Gaussian importance density at the samples;
        # A is the precision matrix (the inverse of the covariance iA)
        norm_const = np.sqrt((2 * np.pi) ** (M - 1) * np.linalg.det(iA))
        dpdf = np.zeros(I)
        for i in range(I):
            diff = samples[i, :] - x0
            dpdf[i] = np.exp(-0.5 * diff @ A @ diff) / norm_const
# Compute normalizing constant
valid = dpdf > 0
if np.sum(valid) == 0:
return pfqn_ca(L, N, Z)
lGn = _multinomialln(np.append(N, M-1)) + _factln(M-1) + np.log(np.mean(T[valid] / dpdf[valid]))
Gn = np.exp(lGn)
else:
# Case with think times Z > 0
u, v, converged = _pfqn_le_fpiZ(L, N, Z)
if not converged:
return pfqn_ca(L, N, Z)
# Compute Hessian
A = _pfqn_le_hessianZ(L, N, Z, u, v)
A = (A + A.T) / 2
try:
iA = np.linalg.inv(A)
except np.linalg.LinAlgError:
return pfqn_ca(L, N, Z)
x0 = np.append(np.log(u[:M-1] / u[M-1]), np.log(v))
# Sample from multivariate normal
samples = np.random.multivariate_normal(x0, iA, I)
# Evaluate function at samples
epsilon = 1e-10
eN = epsilon * np.sum(N)
eta = np.sum(N) + M * (1 + eN)
K = M
T = np.zeros(I)
for i in range(I):
x = samples[i, :]
            term1 = -np.exp(x[K-1]) + K * (1 + eN) * x[K-1]  # x[K-1] = log(v)
term2 = 0.0
for r in range(R):
inner = L[K-1, r] * np.exp(x[K-1]) + Z[r]
for k in range(K-1):
inner += np.exp(x[k]) * (L[k, r] * np.exp(x[K-1]) + Z[r])
term2 += N[r] * np.log(np.maximum(inner, 1e-300))
term3 = np.sum(x[:K-1])
term4 = -eta * np.log(1 + np.sum(np.exp(x[:K-1])))
T[i] = np.exp(term1 + term2 + term3 + term4)
        # Evaluate the Gaussian importance density at the samples (A = inv(iA));
        # fall back to convolution if the covariance is degenerate
        det_iA = np.linalg.det(iA)
        if not np.isfinite(det_iA) or det_iA <= 0:
            return pfqn_ca(L, N, Z)
        norm_const = np.sqrt((2 * np.pi) ** len(x0) * det_iA)
        dpdf = np.zeros(I)
        for i in range(I):
            diff = samples[i, :] - x0
            dpdf[i] = np.exp(-0.5 * diff @ A @ diff) / norm_const
valid = dpdf > 0
if np.sum(valid) == 0:
return pfqn_ca(L, N, Z)
Gn = np.exp(-np.sum([lgamma(1 + n) for n in N])) * np.mean(T[valid] / dpdf[valid])
lGn = np.log(Gn) if Gn > 0 else float('-inf')
return Gn, lGn
def _pfqn_le_fpi(L: np.ndarray, N: np.ndarray, Z: np.ndarray = None):
"""
Fixed-point iteration to find mode of Gaussian approximation.
Returns:
(u, converged): Mode vector and convergence flag
"""
M, R = L.shape
Ntot = np.sum(N)
u = np.ones(M) / M
u_prev = np.ones(M) * np.inf
for iteration in range(1000):
u_prev = u.copy()
for i in range(M):
u[i] = 1 / (Ntot + M)
for r in range(R):
denom = np.dot(u_prev, L[:, r])
if denom > 0:
u[i] += N[r] / (Ntot + M) * L[i, r] * u_prev[i] / denom
if np.linalg.norm(u - u_prev, 1) < 1e-10:
return u, True
return u, False
def _pfqn_le_fpiZ(L: np.ndarray, N: np.ndarray, Z: np.ndarray):
"""
Fixed-point iteration with think times.
Returns:
(u, v, converged): Mode vector, scale factor, and convergence flag
"""
M, R = L.shape
eta = np.sum(N) + M
u = np.ones(M) / M
v = eta + 1
for iteration in range(1000):
u_prev = u.copy()
v_prev = v
for ist in range(M):
u[ist] = 1 / eta
for r in range(R):
denom = Z[r] + v * np.dot(u_prev, L[:, r])
if denom > 0:
u[ist] += (N[r] / eta) * (Z[r] + v * L[ist, r]) * u_prev[ist] / denom
xi = np.zeros(R)
for r in range(R):
denom = Z[r] + v * np.dot(u_prev, L[:, r])
if denom > 0:
xi[r] = N[r] / denom
v = eta + 1 - np.dot(xi, Z)
if np.linalg.norm(u - u_prev, 1) + abs(v - v_prev) < 1e-10:
return u, v, True
return u, v, False
def _pfqn_le_hessian(L: np.ndarray, N: np.ndarray, u: np.ndarray):
"""
Compute Hessian of Gaussian approximation (without think times).
"""
M, R = L.shape
Ntot = np.sum(N)
hu = np.zeros((M-1, M-1))
for i in range(M-1):
for j in range(M-1):
if i != j:
hu[i, j] = -(Ntot + M) * u[i] * u[j]
for r in range(R):
denom = np.dot(u, L[:, r]) ** 2
if denom > 0:
hu[i, j] += N[r] * L[i, r] * L[j, r] * u[i] * u[j] / denom
else:
sum_other = np.sum(u) - u[i]
hu[i, j] = (Ntot + M) * u[i] * sum_other
for r in range(R):
denom = np.dot(u, L[:, r]) ** 2
L_other = np.sum(L[:, r]) - L[i, r]
if denom > 0:
hu[i, j] -= N[r] * L[i, r] * u[i] * (sum_other * L_other) / denom
return hu
def _pfqn_le_hessianZ(L: np.ndarray, N: np.ndarray, Z: np.ndarray, u: np.ndarray, v: float):
"""
Compute Hessian of Gaussian approximation (with think times).
"""
K, R = L.shape
Ntot = np.sum(N)
A = np.zeros((K, K))
csi = np.zeros(R)
for r in range(R):
denom = Z[r] + v * np.dot(u, L[:, r])
if denom > 0:
csi[r] = N[r] / denom
Lhat = np.zeros((K, R))
for k in range(K):
for r in range(R):
Lhat[k, r] = Z[r] + v * L[k, r]
eta = Ntot + K
for i in range(K):
for j in range(K):
if i != j:
A[i, j] = -eta * u[i] * u[j]
for r in range(R):
if N[r] > 0:
A[i, j] += csi[r]**2 * Lhat[i, r] * Lhat[j, r] * u[i] * u[j] / N[r]
for i in range(K):
A[i, i] = -np.sum(A[i, :]) + A[i, i]
# Reduce to (K-1) x (K-1) and add v column
A_reduced = A[:K-1, :K-1]
A_result = np.zeros((K, K))
A_result[:K-1, :K-1] = A_reduced
A_result[K-1, K-1] = 1
for r in range(R):
if N[r] > 0:
A_result[K-1, K-1] -= (csi[r]**2 / N[r]) * Z[r] * np.dot(u, L[:, r])
A_result[K-1, K-1] *= v
for i in range(K-1):
A_result[i, K-1] = 0
for r in range(R):
if N[r] > 0:
A_result[i, K-1] += v * u[i] * ((csi[r]**2 / N[r]) * Lhat[i, r] * np.dot(u, L[:, r]) - csi[r] * L[i, r])
A_result[K-1, i] = A_result[i, K-1]
return A_result
def _simplex_fun(x: np.ndarray, L: np.ndarray, N: np.ndarray) -> float:
"""
Evaluate simplex function for LS algorithm.
"""
M = len(x) + 1
v = np.zeros(M)
for i in range(M-1):
v[i] = np.exp(x[i])
v[M-1] = 1
term1 = np.sum(N * np.log(np.dot(v, L)))
term2 = np.sum(x)
term3 = -(np.sum(N) + M) * np.log(np.sum(v))
return np.exp(term1 + term2 + term3)
def _multinomialln(n: np.ndarray) -> float:
"""
Compute log of multinomial coefficient.
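
    Example::

        >>> print(round(_multinomialln(np.array([2, 1])), 6))  # log(3!/(2!*1!)) = log(3)
        1.098612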
"""
return _factln(np.sum(n)) - np.sum([_factln(ni) for ni in n])
__all__ = [
'pfqn_ca',
'pfqn_nc',
'pfqn_panacea',
'pfqn_propfair',
'pfqn_ls',
]