"""
Load-dependent Normalizing Constant methods for Product-Form Queueing Networks.
Native Python implementations of methods for computing normalizing constants
in load-dependent queueing networks.
Key functions:
pfqn_ncld: Main dispatcher for load-dependent NC computation
pfqn_gld: Generic load-dependent NC
pfqn_gldsingle: Single-class load-dependent NC
pfqn_comomrm_ld: COMOM method for load-dependent repairman models
References:
Casale, G., et al. "LINE: A unified library for queueing network modeling."
"""
import numpy as np
from math import log, exp, factorial, lgamma
from typing import Tuple, Dict, Optional, Any
from dataclasses import dataclass
# Fine tolerance for numerical comparisons
FINE_TOL = 1e-12
# Coarser threshold below which demands/think times are treated as negligible
ZERO = 1e-10
# Sentinel used for log(G) when the normalizing constant G is zero
NEG_INF = float('-inf')
def _factln(n: float) -> float:
"""Compute log(n!) using log-gamma function."""
if n <= 0:
return 0.0
return lgamma(n + 1)
def _factln_array(arr: np.ndarray) -> np.ndarray:
"""Compute log(n!) element-wise for an array."""
from scipy.special import gammaln
arr = np.asarray(arr, dtype=float)
result = np.zeros_like(arr)
mask = arr > 0
result[mask] = gammaln(arr[mask] + 1)
return result
@dataclass
class PfqnNcResult:
    """Result of normalizing constant computation."""
    # Normalizing constant G (may be 0.0 on underflow; see lG for the log value)
    G: float
    # Natural log of G (float('-inf') when G == 0)
    lG: float
    # Name of the algorithm that produced the result
    method: str = "default"
@dataclass
class PfqnComomrmLdResult:
    """Result of COMOM load-dependent computation."""
    # Normalizing constant G (may be 0.0 on underflow; see lG for the log value)
    G: float
    # Natural log of G
    lG: float
    # Marginal probabilities (length Nt + 1, as built by pfqn_comomrm_ld)
    prob: np.ndarray
def pfqn_mushift(mu: np.ndarray, k: int) -> np.ndarray:
    """
    Shift a load-dependent scaling vector by one position.

    Row k of the result holds mu[k, 1:]; every other row keeps its first
    N-1 entries. Used in recursive normalizing constant computations.

    Args:
        mu: Load-dependent scalings matrix (M x N)
        k: Row index to shift

    Returns:
        Shifted mu matrix (M x N-1)
    """
    scalings = np.atleast_2d(np.asarray(mu, dtype=float))
    n_rows, n_cols = scalings.shape
    # A single column leaves nothing to shift into: return an empty (M x 0)
    if n_cols <= 1:
        return np.zeros((n_rows, 0))
    shifted = scalings[:, :-1].copy()
    shifted[k, :] = scalings[k, 1:]
    return shifted
def pfqn_gldsingle(L: np.ndarray, N: np.ndarray, mu: np.ndarray,
                   options: Optional[Dict[str, Any]] = None) -> PfqnNcResult:
    """
    Compute normalizing constant for single-class load-dependent model.

    Auxiliary routine used by pfqn_gld: a dynamic program over stations (m),
    population (n) and a token index (tm) into the load-dependent rates.

    Args:
        L: Service demands at all stations (M x 1)
        N: Number of jobs (scalar or 1x1 array)
        mu: Load-dependent scaling factors (M x Ntot)
        options: Solver options (unused, for API compatibility)

    Returns:
        PfqnNcResult with G (normalizing constant) and lG (log)

    Raises:
        RuntimeError: If multiclass model is detected
    """
    L = np.atleast_2d(np.asarray(L, dtype=float))
    N = np.asarray(N, dtype=float).flatten()
    mu = np.atleast_2d(np.asarray(mu, dtype=float))
    M, R = L.shape
    if R > 1:
        raise RuntimeError("pfqn_gldsingle: multiclass model detected. "
                           "pfqn_gldsingle is for single class models.")
    jobs = int(np.ceil(N[0]))
    if jobs <= 0:
        # Empty population: G = 1 by convention
        return PfqnNcResult(G=1.0, lG=0.0)
    # Sparse DP table keyed by (station, population, token) tuples
    table: Dict[Tuple[int, int, int], float] = {}
    # Boundary: an empty network carries no mass for any positive population
    for n in range(1, jobs + 1):
        table[(0, n, 1)] = 0.0
    for m in range(1, M + 1):
        # Boundary: zero jobs always yields 1, whatever the token index
        for tm in range(1, jobs + 2):
            table[(m, 0, tm)] = 1.0
        for n in range(1, jobs + 1):
            for tm in range(1, jobs - n + 2):
                drop_station = table.get((m - 1, n, 1), 0.0)
                drop_job = table.get((m, n - 1, tm + 1), 0.0)
                # Rate of the tm-th job at this station (1.0 past the table edge)
                col = tm - 1
                rate = mu[m - 1, col] if col < mu.shape[1] else 1.0
                if rate > 0:
                    table[(m, n, tm)] = drop_station + L[m - 1, 0] * drop_job / rate
                else:
                    # Non-positive rate: the demand term is dropped entirely
                    table[(m, n, tm)] = drop_station
    G = table.get((M, jobs, 1), 0.0)
    return PfqnNcResult(G=G, lG=log(G) if G > 0 else NEG_INF)
def pfqn_gld(L: np.ndarray, N: np.ndarray, mu: np.ndarray,
             options: Optional[Dict[str, Any]] = None) -> PfqnNcResult:
    """
    Compute normalizing constant of a load-dependent closed queueing network.

    Uses the generalized convolution algorithm: the last station is peeled
    off and the recursion proceeds on reduced populations with a shifted mu.

    Args:
        L: Service demands at all stations (M x R)
        N: Number of jobs for each class (1 x R)
        mu: Load-dependent scalings (M x Ntot); None means load-independent
        options: Solver options

    Returns:
        PfqnNcResult with G (normalizing constant) and lG (log)
    """
    L = np.atleast_2d(np.asarray(L, dtype=float))
    N = np.asarray(N, dtype=float).flatten()
    mu = np.atleast_2d(np.asarray(mu, dtype=float)) if mu is not None else None
    if options is None:
        options = {'tol': 1e-6, 'method': 'default'}
    M = L.shape[0]
    R = L.shape[1]
    Ntot = int(np.ceil(np.sum(N)))
    # Dimension mismatch between L columns and N elements can happen with
    # chain aggregation - keep the common prefix.
    if len(N) != R:
        R = min(R, len(N))
        L = L[:, :R]
    # Zero population: G = 1 (empty product). Checked up-front so the
    # recursion terminates here instead of reaching the load-independent
    # dispatch (and its external solver) with an all-zero population.
    if Ntot == 0 or (np.abs(np.max(N)) < FINE_TOL and np.abs(np.min(N)) < FINE_TOL):
        return PfqnNcResult(G=1.0, lG=0.0)
    # Single station: closed-form product formula
    if M == 1:
        N_tmp = []
        L_tmp = []
        for i in range(R):
            if L[0, i] > FINE_TOL:
                N_tmp.append(N[i])
                L_tmp.append(log(L[0, i]))
        if len(N_tmp) == 0:
            return PfqnNcResult(G=1.0, lG=0.0)
        N_tmp = np.array(N_tmp)
        L_tmp = np.array(L_tmp)
        # Collect the load-dependent rates covering the first Ntot jobs
        if mu is not None:
            if Ntot >= mu.shape[1]:
                mu_row = mu[0, :].copy()
            else:
                mu_row = mu[0, :Ntot].copy()
        else:
            mu_row = np.ones(Ntot)
        with np.errstate(divide='ignore'):
            log_mu = np.log(mu_row)
        log_mu = np.where(np.isfinite(log_mu), log_mu, 0.0)
        # G = Ntot!/(prod_r N_r!) * prod_r L_r^N_r / prod_{j<=Ntot} mu(j)
        lG = (_factln(np.sum(N_tmp)) - np.sum(_factln_array(N_tmp)) +
              np.dot(N_tmp, L_tmp) - np.sum(log_mu[:Ntot]))
        G = exp(lG) if lG > -700 else 0.0
        return PfqnNcResult(G=G, lG=lG)
    # Single-class: dedicated DP recursion
    if R == 1:
        return pfqn_gldsingle(L, N, mu, options)
    # No positive demand anywhere: G degenerates to zero
    if L.size == 0 or np.sum(L) < FINE_TOL:
        return PfqnNcResult(G=0.0, lG=NEG_INF)
    # Initialize mu if None (load-independent single servers)
    if mu is None:
        mu = np.ones((M, Ntot))
    # Classify stations: single-server (mu = 1,1,...), delay (mu = 1,2,...),
    # or genuinely load-dependent (anything else).
    is_load_dep = False
    is_inf_server = np.zeros(M, dtype=bool)
    for i in range(M):
        mu_row = mu[i, :Ntot] if Ntot <= mu.shape[1] else np.concatenate([mu[i, :], np.ones(Ntot - mu.shape[1])])
        expected_delay = np.arange(1, Ntot + 1, dtype=float)
        if len(mu_row) >= Ntot:
            is_delay = np.allclose(mu_row[:Ntot], expected_delay[:Ntot], atol=FINE_TOL)
        else:
            is_delay = False
        is_single = np.allclose(mu_row, 1.0, atol=FINE_TOL)
        if is_single:
            is_inf_server[i] = False
        elif is_delay:
            is_inf_server[i] = True
        else:
            is_inf_server[i] = False
            is_load_dep = True
    # Load-independent model: delegate to the standard convolution solver
    if not is_load_dep:
        from .nc import pfqn_nc  # lazy import: only this branch needs it
        Lli = L[~is_inf_server, :] if np.any(~is_inf_server) else np.zeros((1, R))
        Zli = L[is_inf_server, :] if np.any(is_inf_server) else np.zeros((1, R))
        if Lli.size == 0 or Lli.shape[0] == 0:
            Lli = np.zeros((1, R))
        if Zli.size == 0 or Zli.shape[0] == 0:
            Zli = np.zeros((1, R))
        Z_sum = np.sum(Zli, axis=0)
        result = pfqn_nc(Lli, N, Z_sum, method='exact')
        return PfqnNcResult(G=result[0], lG=result[1])
    # Convolution recursion:
    # G_M(N) = G_{M-1}(N) + sum_r L[M-1,r]/mu[M-1,0] * G_M(N - e_r)
    G = pfqn_gld(L[:-1, :], N, mu[:-1, :], options).G
    # The shifted mu does not depend on the class index: compute it once
    mu_shifted = pfqn_mushift(mu, M - 1)
    for r in range(R):
        if N[r] > FINE_TOL:
            N_1 = N.copy()
            N_1[r] -= 1
            if mu[M - 1, 0] > 0:
                G += (L[M - 1, r] / mu[M - 1, 0]) * pfqn_gld(L, N_1, mu_shifted, options).G
    lG = log(G) if G > 0 else NEG_INF
    return PfqnNcResult(G=G, lG=lG)
def pfqn_comomrm_ld(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
                    mu: np.ndarray, options: Optional[Dict[str, Any]] = None
                    ) -> PfqnComomrmLdResult:
    """
    Run the COMOM normalizing constant method on a load-dependent repairman model.

    Implements the Convolution Method of Moments (COMOM) for computing
    normalizing constants in load-dependent repairman queueing models
    (a single queueing station plus per-class think times).

    Args:
        L: Service demands at all stations (M x R)
        N: Number of jobs for each class (1 x R)
        Z: Think times for each class (1 x R)
        mu: Load-dependent scalings (M x Ntot)
        options: Solver options

    Returns:
        PfqnComomrmLdResult with G, lG, and marginal probabilities

    Raises:
        ValueError: If more than one queueing station remains after
            delay-station extraction.
    """
    L = np.atleast_2d(np.asarray(L, dtype=float)).copy()
    N = np.asarray(N, dtype=float).flatten().copy()
    Z = np.asarray(Z, dtype=float).flatten().copy()
    mu = np.atleast_2d(np.asarray(mu, dtype=float)).copy()
    if options is None:
        options = {'tol': 1e-6}
    atol = options.get('tol', 1e-6)
    N = np.ceil(N)
    M = L.shape[0]
    R = L.shape[1]
    Nt = int(np.sum(N))
    # Z was flattened above, so it is always 1-D here; the former
    # row-summing branch for 2-D Z was unreachable and has been removed.
    # Handle case where Z is negligible
    if np.sum(Z) < ZERO:
        # Identify delay stations (mu = [1, 2, 3, ...]).
        # A delay station has mu[j] = j+1 for ALL columns, not just the first
        # Nt - this prevents false positives when Nt is small.
        OneToMuCols = np.arange(1, mu.shape[1] + 1, dtype=float)
        zset = []
        non_zset = []
        for i in range(M):
            # Compare the FULL mu row with the expected delay pattern
            if np.linalg.norm(mu[i, :] - OneToMuCols) < atol:
                zset.append(i)
            else:
                non_zset.append(i)
        if len(zset) > 0:
            # Fold delay stations' demands into the think time vector
            Z = np.sum(L[zset, :], axis=0)
            L = L[non_zset, :] if len(non_zset) > 0 else np.zeros((0, R))
            mu = mu[non_zset, :] if len(non_zset) > 0 else np.zeros((0, mu.shape[1]))
            M = L.shape[0]
    # Handle negligible demands
    if np.sum(L) < ZERO:
        from .nc import pfqn_ca  # lazy import: only this degenerate case needs it
        G, lG = pfqn_ca(L, N, Z)
        prob = np.zeros(Nt + 1)
        prob[Nt] = 1.0
        return PfqnComomrmLdResult(G=G, lG=lG, prob=prob)
    # Sanitize inputs
    lG0 = 0.0
    # Remove classes with zero demands and zero think times
    non_zero_classes = []
    for r in range(R):
        if np.sum(L[:, r]) >= atol or (len(Z) > r and Z[r] >= atol):
            non_zero_classes.append(r)
        else:
            if N[r] > 0:
                # Populated class with no demand anywhere: no contribution here
                pass
    if len(non_zero_classes) < R:
        L = L[:, non_zero_classes]
        N = N[non_zero_classes]
        if len(Z) > 0:
            Z = Z[non_zero_classes]
        R = len(non_zero_classes)
    # Handle empty cases
    if Z.size == 0 or np.sum(Z) < ZERO:
        if L.size == 0 or np.sum(L) < ZERO:
            prob = np.zeros(Nt + 1)
            prob[0] = 1.0
            return PfqnComomrmLdResult(G=exp(lG0), lG=lG0, prob=prob)
        Z = np.zeros(R) if R > 0 else np.zeros(1)
    elif L.size == 0 or np.sum(L) < ZERO:
        L = np.zeros((1, R)) if R > 0 else np.zeros((1, 1))
    M = L.shape[0]
    if M == 0:
        prob = np.zeros(Nt + 1)
        prob[Nt] = 1.0
        return PfqnComomrmLdResult(G=exp(lG0), lG=lG0, prob=prob)
    if M != 1:
        raise ValueError("pfqn_comomrm_ld: The solver accepts at most a single queueing station.")
    # COMOM recursion: h carries the unnormalized marginal vector; one job is
    # injected per iteration and scale[] records each renormalization factor.
    h = np.zeros(Nt + 1)
    h[Nt] = 1.0
    scale = np.zeros(Nt)
    nt = 0
    for r in range(R):
        # Build the bidiagonal population-update operator for class r
        Tr = np.eye(Nt + 1) * Z[r]
        for i in range(Nt):
            mu_idx = Nt - i - 1
            if mu_idx < mu.shape[1]:
                mu_val = mu[0, mu_idx]
            else:
                mu_val = 1.0
            if mu_val > 0:
                Tr[i, i + 1] = L[0, r] * (Nt - i) / mu_val
        nr = 0
        while nr < N[r]:
            hT = Tr / (1.0 + nr)
            h = hT @ h
            # Sorting before summing improves floating-point accumulation
            scale[nt] = np.abs(np.sum(np.sort(h)))
            h = np.abs(h)
            if scale[nt] > 0:
                h = h / scale[nt]
            nt += 1
            nr += 1
    # G is the product of all rescaling factors, accumulated in log space
    with np.errstate(divide='ignore'):
        log_scale = np.log(scale)
    log_scale = np.where(np.isfinite(log_scale), log_scale, 0.0)
    lG = lG0 + np.sum(log_scale)
    G = exp(lG) if lG > -700 else 0.0
    prob = h[::-1]
    if G > 0:
        prob = prob / G
    prob = prob / np.sum(prob) if np.sum(prob) > 0 else prob
    return PfqnComomrmLdResult(G=G, lG=lG, prob=prob)
def pfqn_ncld(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
              mu: np.ndarray, options: Optional[Dict[str, Any]] = None
              ) -> PfqnNcResult:
    """
    Main method to compute normalizing constant of a load-dependent model.

    Entry point that preprocesses the model (population filtering, demand
    scaling, zero-demand class separation) and then dispatches to the
    appropriate algorithm via _compute_norm_const_ld.

    Args:
        L: Service demands at all stations (M x R)
        N: Number of jobs for each class (1 x R)
        Z: Think times for each class (1 x R)
        mu: Load-dependent scalings (M x Ntot)
        options: Solver options with keys:
            - method: 'default', 'exact', 'rd', 'comomld', etc.
            - tol: Numerical tolerance

    Returns:
        PfqnNcResult with G (normalizing constant), lG (log), and method used
    """
    L = np.atleast_2d(np.asarray(L, dtype=float)).copy()
    N = np.asarray(N, dtype=float).flatten().copy()
    Z = np.asarray(Z, dtype=float).flatten().copy()
    mu = np.atleast_2d(np.asarray(mu, dtype=float)).copy()
    if options is None:
        options = {'method': 'default', 'tol': 1e-6}
    method = options.get('method', 'default')
    tol = options.get('tol', 1e-6)
    Ntot = int(np.ceil(np.sum(N)))
    # Pad mu by replicating its last column (or trim) to exactly Ntot columns
    if Ntot > mu.shape[1]:
        mu_extended = np.zeros((mu.shape[0], Ntot))
        mu_extended[:, :mu.shape[1]] = mu
        for i in range(mu.shape[1], Ntot):
            mu_extended[:, i] = mu[:, -1]
        mu = mu_extended
    elif Ntot < mu.shape[1]:
        mu = mu[:, :Ntot]
    # Remove classes with zero population
    L_new = []
    N_new = []
    Z_new = []
    for i in range(len(N)):
        if np.abs(N[i]) >= FINE_TOL:
            L_new.append(L[:, i])
            N_new.append(N[i])
            if i < len(Z):
                Z_new.append(Z[i])
            else:
                Z_new.append(0.0)
    if len(N_new) == 0:
        # No populated classes: G = 1 by convention
        return PfqnNcResult(G=1.0, lG=0.0, method=method)
    L_new = np.column_stack(L_new) if len(L_new) > 0 else np.zeros((L.shape[0], 1))
    N_new = np.array(N_new)
    Z_new = np.array(Z_new)
    R = len(N_new)
    # Per-class scaling for numerical stability; undone via log(scalevec) below
    scalevec = np.ones(R)
    for r in range(R):
        max_L = np.max(L_new[:, r]) if L_new.shape[0] > 0 else 0
        max_Z = Z_new[r] if r < len(Z_new) else 0
        scalevec[r] = max(max_L, max_Z, FINE_TOL)
    L_new = L_new / scalevec
    Z_new = Z_new / scalevec
    # Keep only stations carrying some demand
    Lsum = np.sum(L_new, axis=1)
    Lmax = np.max(L_new, axis=1)
    dem_stations = []
    for i in range(L_new.shape[0]):
        with np.errstate(divide='ignore', invalid='ignore'):
            ratio = Lmax[i] / Lsum[i]
            if not np.isnan(ratio) and ratio > FINE_TOL:
                dem_stations.append(i)
    if len(dem_stations) > 0:
        L_new = L_new[dem_stations, :]
        mu = mu[dem_stations, :]
    else:
        L_new = np.zeros((0, R))
        mu = np.zeros((0, Ntot))
    M = L_new.shape[0]
    # Detect a class with positive population but no demand or think time
    flag = False
    for i in range(R):
        L_sum_r = np.sum(L_new[:, i]) if M > 0 else 0
        Z_r = Z_new[i] if i < len(Z_new) else 0
        if np.abs(L_sum_r + Z_r) < FINE_TOL and N_new[i] > FINE_TOL:
            flag = True
            break
    if flag:
        print("pfqn_ncld warning: The model has no positive demands in any class.")
        if Z_new.size == 0 or np.sum(Z_new) < tol:
            lG = 0.0
        else:
            with np.errstate(divide='ignore'):
                log_Z = np.log(np.sum(Z_new))
                log_scale = np.log(scalevec)
            lG = (-np.sum(_factln_array(N_new)) +
                  np.dot(N_new, np.where(np.isfinite(log_Z), log_Z, 0.0) * np.ones(R)) +
                  np.dot(N_new, np.where(np.isfinite(log_scale), log_scale, 0.0)))
        return PfqnNcResult(G=np.nan, lG=lG, method=method)
    # Delay-only model: no queueing demand left
    if L_new.size == 0 or np.sum(L_new) < tol:
        if Z_new.size == 0 or np.sum(Z_new) < tol:
            lG = 0.0
        else:
            with np.errstate(divide='ignore'):
                log_Z_sum = np.log(np.sum(Z_new, axis=0) if Z_new.ndim > 1 else Z_new)
                log_scale = np.log(scalevec)
            log_Z_sum = np.where(np.isfinite(log_Z_sum), log_Z_sum, 0.0)
            log_scale = np.where(np.isfinite(log_scale), log_scale, 0.0)
            lG = (-np.sum(_factln_array(N_new)) +
                  np.dot(N_new, log_Z_sum) + np.dot(N_new, log_scale))
        G = exp(lG) if lG > -700 else 0.0
        return PfqnNcResult(G=G, lG=lG, method=method)
    # Single station with no think times: closed-form expression
    if M == 1 and (Z_new.size == 0 or np.sum(Z_new) < tol):
        with np.errstate(divide='ignore'):
            log_L_sum = np.log(np.sum(L_new, axis=0))
            log_scale = np.log(scalevec)
            log_mu = np.log(mu.flatten()[:Ntot]) if mu.size > 0 else np.zeros(Ntot)
        log_L_sum = np.where(np.isfinite(log_L_sum), log_L_sum, 0.0)
        log_scale = np.where(np.isfinite(log_scale), log_scale, 0.0)
        log_mu = np.where(np.isfinite(log_mu), log_mu, 0.0)
        lG = (_factln(np.sum(N_new)) - np.sum(_factln_array(N_new)) +
              np.dot(N_new, log_L_sum) + np.dot(N_new, log_scale) - np.sum(log_mu))
        G = exp(lG) if lG > -700 else 0.0
        return PfqnNcResult(G=G, lG=lG, method=method)
    # Separate zero-demand (delay only) and nonzero-demand classes
    zero_demand_classes = []
    nonzero_demand_classes = []
    for i in range(R):
        if np.sum(L_new[:, i]) < tol:
            zero_demand_classes.append(i)
        else:
            nonzero_demand_classes.append(i)
    # Zero-demand classes contribute a closed-form (delay-only) factor
    lGzdem = 0.0
    if len(zero_demand_classes) > 0:
        Zz = Z_new[zero_demand_classes]
        Nz = N_new[zero_demand_classes]
        scalevecz = scalevec[zero_demand_classes]
        if np.sum(Zz) >= tol:
            with np.errstate(divide='ignore'):
                log_Zz = np.log(Zz)
                log_scalevecz = np.log(scalevecz)
            log_Zz = np.where(np.isfinite(log_Zz), log_Zz, 0.0)
            log_scalevecz = np.where(np.isfinite(log_scalevecz), log_scalevecz, 0.0)
            lGzdem = (-np.sum(_factln_array(Nz)) +
                      np.dot(Nz, log_Zz) + np.dot(Nz, log_scalevecz))
    # Nonzero-demand classes go through the load-dependent solver
    if len(nonzero_demand_classes) > 0:
        L_nnz = L_new[:, nonzero_demand_classes]
        N_nnz = N_new[nonzero_demand_classes]
        Z_nnz = Z_new[nonzero_demand_classes]
        scalevec_nnz = scalevec[nonzero_demand_classes]
    else:
        L_nnz = np.zeros((M, 1))
        N_nnz = np.zeros(1)
        Z_nnz = np.zeros(1)
        scalevec_nnz = np.ones(1)
    lGnnzdem = 0.0
    if np.min(N_nnz) >= 0:
        result = _compute_norm_const_ld(L_nnz, N_nnz, Z_nnz, mu, options)
        lGnnzdem = result.lG
        method = result.method
    # Combine both contributions and undo the per-class demand scaling
    with np.errstate(divide='ignore'):
        log_scalevec_nnz = np.log(scalevec_nnz)
    log_scalevec_nnz = np.where(np.isfinite(log_scalevec_nnz), log_scalevec_nnz, 0.0)
    lG = lGnnzdem + lGzdem + np.dot(N_nnz, log_scalevec_nnz)
    G = exp(lG) if lG > -700 else 0.0
    return PfqnNcResult(G=G, lG=lG, method=method)
def _augment_with_delay(L: np.ndarray, Z: np.ndarray,
                        mu: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Append the aggregate think time Z as a delay row (mu = 1, 2, 3, ...).

    Returns (Lz, muz); the inputs are returned unchanged when Z is negligible.
    """
    if np.sum(Z) < FINE_TOL:
        return L, mu
    Lz = np.vstack([L, Z.reshape(1, -1)])
    Ntot = mu.shape[1]
    delay_mu = np.arange(1, Ntot + 1, dtype=float).reshape(1, -1)
    return Lz, np.vstack([mu, delay_mu])


def _compute_norm_const_ld(L: np.ndarray, N: np.ndarray, Z: np.ndarray,
                           mu: np.ndarray, options: Dict[str, Any]
                           ) -> PfqnNcResult:
    """
    Run a normalizing constant solution method on a load-dependent model.

    Internal dispatcher: selects gldsingle/comomld/gld/nrl/nrp/rd based on
    options['method'].

    Args:
        L: Service demands at all stations (M x R)
        N: Number of jobs for each class (1 x R)
        Z: Think times for each class (1 x R)
        mu: Load-dependent scalings (M x Ntot)
        options: Solver options

    Returns:
        PfqnNcResult with G, lG, and method used
    """
    M = L.shape[0]
    R = L.shape[1]
    method = options.get('method', 'default')
    lG = None
    # Ensure N has R elements to match L's columns
    N = np.atleast_1d(N).flatten()
    if len(N) != R:
        # Dimension mismatch (chain aggregation): fall back to exact method
        if method not in ['default', 'exact']:
            method = 'exact'
    if method in ['default', 'exact']:
        # Fold the think times into the station set as one delay station
        Lz, muz = _augment_with_delay(L, Z, mu)
        if R == 1:
            result = pfqn_gldsingle(Lz, N, muz, options)
            lG = result.lG
            method = "exact/gld"
        elif M == 1 and np.max(Z) > 0:
            result = pfqn_comomrm_ld(L, N, Z, muz, options)
            lG = result.lG
            method = "exact/comomld"
        elif M == 2 and np.max(Z) < FINE_TOL:
            # For M==2 with no Z, pfqn_comomrm_ld assumes one station is a
            # delay. Check that exactly one station matches the delay pattern
            # [1, 2, 3, ...]; if mu was trimmed to a small Ntot, both stations
            # might falsely match.
            delay_count = 0
            OneToMuCols = np.arange(1, mu.shape[1] + 1, dtype=float)
            atol = options.get('tol', 1e-6)
            for i in range(M):
                if np.linalg.norm(mu[i, :] - OneToMuCols) < atol:
                    delay_count += 1
            if delay_count == 1:
                result = pfqn_comomrm_ld(L, N, np.zeros_like(N), mu, options)
                lG = result.lG
                method = "exact/comomld"
            else:
                # Either no delays or both stations appear as delays
                # (e.g. small Ntot): fall back to the general solver
                result = pfqn_gld(Lz, N, muz, options)
                lG = result.lG
                method = "exact/gld"
        else:
            result = pfqn_gld(Lz, N, muz, options)
            lG = result.lG
            method = "exact/gld"
    elif method == 'comomld':
        if M <= 1 or np.sum(Z) <= ZERO:
            result = pfqn_comomrm_ld(L, N, Z, mu, options)
            lG = result.lG
        else:
            print("pfqn_ncld warning: Load-dependent CoMoM is available only in "
                  "models with a delay and m identical stations.")
            # Fall back to gld on the delay-augmented model
            Lz, muz = _augment_with_delay(L, Z, mu)
            result = pfqn_gld(Lz, N, muz, options)
            lG = result.lG
            method = "gld"
    elif method == 'nrl':
        from .laplace import pfqn_nrl
        lG = pfqn_nrl(L, N, Z, alpha=mu)
        method = "nrl"
    elif method == 'nrp':
        from .laplace import pfqn_nrp
        lG = pfqn_nrp(L, N, Z, alpha=mu)
        method = "nrp"
    elif method == 'rd':
        from .rd import pfqn_rd
        result = pfqn_rd(L, N, Z, mu=mu)
        lG = result[0] if isinstance(result, tuple) else result.lGN
        method = "rd"
    else:
        # Unrecognized method: default to exact/gld
        Lz, muz = _augment_with_delay(L, Z, mu)
        result = pfqn_gld(Lz, N, muz, options)
        lG = result.lG
        method = "exact/gld"
    G = exp(lG) if lG is not None and lG > -700 else 0.0
    return PfqnNcResult(G=G, lG=lG if lG is not None else NEG_INF, method=method)
@dataclass
class PfqnFncResult:
    """Result of functional server scaling computation."""
    # Functional server scalings (M x N)
    mu: np.ndarray
    # Scaling constants used to produce mu (1 x M)
    c: np.ndarray
def pfqn_fnc(alpha: np.ndarray, c: Optional[np.ndarray] = None) -> PfqnFncResult:
    """
    Compute scaling factor of a load-dependent functional server.

    Used to calculate the mean queue length in load-dependent systems by
    computing functional scaling factors from load-dependent service rate
    parameters.

    Args:
        alpha: Load-dependent scalings (M x N)
        c: Scaling constants (1 x M), optional. If None, auto-selected by
            trying c = 0, then c = -0.5, then scanning upward in steps of
            0.05 until the resulting mu is finite (or c reaches 2).

    Returns:
        PfqnFncResult with mu (functional server scalings) and c (scaling constants)
    """
    alpha = np.atleast_2d(np.asarray(alpha, dtype=float))
    M = alpha.shape[0]
    if c is None:
        # First try c = 0
        c = np.zeros((1, M))
        result = _pfqn_fnc_with_c(alpha, c)
        if not np.all(np.isfinite(result.mu)):
            # Try c = -0.5
            c = np.full((1, M), -0.5)
            result = _pfqn_fnc_with_c(alpha, c)
            # If still not finite, scan upward for a valid c
            dt = 0.0
            while not np.all(np.isfinite(result.mu)):
                dt += 0.05
                c_scalar = -0.5 + dt
                # BUGFIX: apply the candidate constant to every station.
                # Previously a 1x1 array was built here, which silently left
                # stations i >= 1 at c = 0 (the value that already failed).
                c = np.full((1, M), c_scalar)
                result = _pfqn_fnc_with_c(alpha, c)
                if c_scalar >= 2:
                    break
        return result
    else:
        c = np.atleast_2d(np.asarray(c, dtype=float))
        return _pfqn_fnc_with_c(alpha, c)
def _pfqn_fnc_with_c(alpha: np.ndarray, c: np.ndarray) -> PfqnFncResult:
    """
    Compute functional server scalings for a given scaling-constant vector.

    Internal worker for pfqn_fnc: evaluates the recurrence mapping the
    load-dependent scalings alpha and constants c to functional rates mu.

    Args:
        alpha: Load-dependent scalings (M x N)
        c: Scaling constants (1 x M)

    Returns:
        PfqnFncResult with mu and c
    """
    alpha = np.atleast_2d(np.asarray(alpha, dtype=float))
    c = np.atleast_2d(np.asarray(c, dtype=float)).flatten()
    n_stations, n_levels = alpha.shape
    mu = np.zeros((n_stations, n_levels))
    for i in range(n_stations):
        # Missing constants default to 0.0
        shift = c[i] if i < len(c) else 0.0
        mu[i, 0] = alpha[i, 0] / (1 + shift)
        # Running products of alpha terms feeding the numerator/denominator
        num = np.zeros((n_levels, n_levels))
        den = np.zeros((n_levels, n_levels))
        for n in range(1, n_levels):
            num[n, 0] = alpha[i, n]
            den[n, 0] = alpha[i, n - 1]
            for k in range(1, n):
                num[n, k] = num[n, k - 1] * alpha[i, n - k]
                den[n, k] = den[n, k - 1] * alpha[i, n - k - 1]
        for n in range(1, n_levels):
            rho = 0.0
            mu_prod = 1.0
            for k in range(n):
                with np.errstate(invalid='ignore'):
                    mu_prod *= mu[i, k]
                if mu_prod != 0 and np.isfinite(mu_prod):
                    rho += (num[n, k] - den[n, k]) / mu_prod
            if mu_prod != 0 and np.isfinite(mu_prod) and (1 - rho) != 0:
                mu[i, n] = (num[n, n - 1] * alpha[i, 0] / mu_prod) / (1 - rho)
            else:
                # Recurrence broke down (zero product or unit utilization)
                mu[i, n] = np.inf
    # Sanitize: NaNs and huge magnitudes become inf, and once a row hits inf
    # every later entry in that row is forced to inf as well.
    for i in range(n_stations):
        hit_inf = False
        for j in range(n_levels):
            if hit_inf:
                mu[i, j] = np.inf
            elif np.isnan(mu[i, j]) or np.abs(mu[i, j]) > 1e15:
                mu[i, j] = np.inf
                hit_inf = True
    return PfqnFncResult(mu=mu, c=c.reshape(1, -1))
# Public API of this module
__all__ = [
    'pfqn_ncld',
    'pfqn_gld',
    'pfqn_gldsingle',
    'pfqn_mushift',
    'pfqn_comomrm_ld',
    'pfqn_fnc',
    'PfqnNcResult',
    'PfqnComomrmLdResult',
    'PfqnFncResult',
]