"""
PFQN Utility Functions.
Native Python implementations of utility functions for Product-Form
Queueing Network analysis, including load-dependent scaling, multiserver
scaling, and normalizing constant sanitization.
Key functions:
pfqn_lldfun: Load and queue-dependent scaling function
pfqn_mu_ms: Multi-server load-dependent service rate
pfqn_nc_sanitize: Sanitize and preprocess NC parameters
pfqn_cdfun: Class-dependence function
multichoose: Combinations with repetition
matchrow: Find row index in matrix
oner: Increment vector at position
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_*.m
"""
import numpy as np
from scipy.interpolate import interp1d
from scipy.special import gammaln
from typing import Tuple, Optional, List
from itertools import combinations_with_replacement
[docs]
def factln(n: float) -> float:
"""
Compute log(n!) using log-gamma function.
Args:
n: Non-negative number
Returns:
log(n!) = log(Gamma(n+1))
"""
if n <= 0:
return 0.0
return gammaln(n + 1)
[docs]
def factln_vec(arr: np.ndarray) -> np.ndarray:
"""
Compute log(n!) element-wise for an array.
Args:
arr: Array of non-negative numbers
Returns:
Array of log(n!) values
"""
arr = np.asarray(arr, dtype=float)
result = np.zeros_like(arr)
mask = arr > 0
result[mask] = gammaln(arr[mask] + 1)
return result
[docs]
def softmin(a: float, b: float, alpha: float = 20.0) -> float:
"""
Compute a smooth approximation to min(a, b).
Uses the log-sum-exp trick for numerical stability.
Args:
a: First value
b: Second value
alpha: Smoothing parameter (larger = sharper approximation)
Returns:
Smooth approximation of min(a, b)
"""
if alpha <= 0:
return min(a, b)
# Use log-sum-exp for numerical stability
max_val = max(-alpha * a, -alpha * b)
return -1.0 / alpha * (max_val + np.log(np.exp(-alpha * a - max_val) +
np.exp(-alpha * b - max_val)))
[docs]
def oner(n: np.ndarray, s: int) -> np.ndarray:
"""
Return a copy of n with position s reduced by 1.
Args:
n: Population vector
s: Index to decrement (0-based)
Returns:
Copy of n with n[s] -= 1
"""
n_copy = np.asarray(n, dtype=float).copy()
if 0 <= s < len(n_copy):
n_copy[s] = max(0, n_copy[s] - 1)
return n_copy
[docs]
def multichoose(r: int, n: int) -> np.ndarray:
"""
Generate all combinations with repetition.
Returns all ways to choose n items from r categories with repetition,
where the result is a matrix with each row being a combination.
Args:
r: Number of categories
n: Number of items to choose
Returns:
Matrix (C x r) where C = C(n+r-1, r-1) is the number of combinations
"""
if r <= 0 or n < 0:
return np.array([[]])
if n == 0:
return np.zeros((1, r), dtype=int)
# Generate all combinations with repetition
# Each combination represents how many of each category
result = []
def generate_combinations(remaining: int, categories_left: int, current: List[int]):
if categories_left == 1:
result.append(current + [remaining])
return
for i in range(remaining + 1):
generate_combinations(remaining - i, categories_left - 1, current + [i])
generate_combinations(n, r, [])
return np.array(result, dtype=int)
[docs]
def matchrow(matrix: np.ndarray, row: np.ndarray) -> int:
"""
Find the index of a row in a matrix.
Args:
matrix: 2D array to search in
row: 1D array to find
Returns:
1-based index of the matching row, or 0 if not found
"""
matrix = np.atleast_2d(np.asarray(matrix))
row = np.asarray(row).flatten()
for i in range(matrix.shape[0]):
if np.array_equal(matrix[i, :len(row)], row):
return i + 1 # 1-based indexing like MATLAB
return 0
[docs]
def pfqn_lldfun(n: np.ndarray, lldscaling: Optional[np.ndarray] = None,
nservers: Optional[np.ndarray] = None) -> np.ndarray:
"""
AMVA-QD load and queue-dependent scaling function.
Computes the scaling factor for load-dependent queueing stations,
accounting for multi-server stations and general load-dependent
service rate scaling.
Args:
n: Queue population vector (M,)
lldscaling: Load-dependent scaling matrix (M x Nmax), optional
nservers: Number of servers per station (M,), optional
Returns:
Scaling factor vector (M,)
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_lldfun.m
"""
n = np.asarray(n, dtype=float).flatten()
M = len(n)
r = np.ones(M)
alpha = 20.0 # softmin parameter
if lldscaling is not None:
lldscaling = np.atleast_2d(np.asarray(lldscaling, dtype=float))
smax = lldscaling.shape[1]
else:
smax = 0
for i in range(M):
# Handle servers
if nservers is not None:
nservers_arr = np.asarray(nservers, dtype=float).flatten()
if i < len(nservers_arr):
if np.isinf(nservers_arr[i]):
# Delay server
r[i] = 1.0
else:
# Multi-server station
sm = softmin(n[i], nservers_arr[i], alpha)
if np.isnan(sm) or sm <= 0:
# Fallback if numerical problems in soft-min
sm = min(n[i], nservers_arr[i])
if sm <= 0:
sm = 1.0
r[i] = r[i] / sm
# Handle generic load-dependent scaling
if lldscaling is not None and smax > 0:
if i < lldscaling.shape[0]:
row = lldscaling[i, :smax]
# Check if there's variation in the scaling
if np.ptp(row) > 0: # ptp = max - min = range
# Interpolate the scaling factor
x_vals = np.arange(1, smax + 1)
try:
interp_func = interp1d(x_vals, row, kind='cubic',
fill_value='extrapolate')
scale_val = float(interp_func(n[i]))
if scale_val > 0 and np.isfinite(scale_val):
r[i] = r[i] / scale_val
except Exception:
# Fallback to linear interpolation
scale_val = np.interp(n[i], x_vals, row)
if scale_val > 0 and np.isfinite(scale_val):
r[i] = r[i] / scale_val
return r
[docs]
def pfqn_mu_ms(N: int, m: int, c: int) -> np.ndarray:
"""
Compute load-dependent rates for m identical c-server FCFS stations.
Calculates the effective service rate as a function of the number
of jobs in the system for a network of m identical stations, each
with c parallel servers.
Args:
N: Maximum population
m: Number of identical stations
c: Number of servers per station
Returns:
Load-dependent service rate vector (1 x N)
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_mu_ms.m
"""
if N <= 0:
return np.array([])
mu = np.zeros(N)
g = np.zeros((m, N + 1)) # g[station, population]
# Auxiliary function to compute g values
def gnaux(n: int, station: int, c: int, g: np.ndarray) -> float:
if n == 0:
return 1.0
elif station == 0: # First station (0-indexed)
# Product of min(1:n, c)
vals = np.minimum(np.arange(1, n + 1), c)
return 1.0 / np.prod(vals)
else:
result = 0.0
for k in range(n + 1):
if k == 0:
a = 1.0
else:
vals = np.minimum(np.arange(1, k + 1), c)
a = 1.0 / np.prod(vals)
b = g[station - 1, n - k]
result += a * b
return result
# Fill in the g table
for n in range(N + 1):
for station in range(m):
g[station, n] = gnaux(n, station, c, g)
# Compute mu from g
for n in range(1, N + 1):
if g[m - 1, n] > 0:
mu[n - 1] = g[m - 1, n - 1] / g[m - 1, n]
else:
mu[n - 1] = 0.0
return mu
[docs]
def pfqn_nc_sanitize(lam: np.ndarray, L: np.ndarray, N: np.ndarray,
Z: np.ndarray, atol: float = 1e-8
) -> Tuple[np.ndarray, np.ndarray, np.ndarray,
np.ndarray, float]:
"""
Sanitize and preprocess network parameters for NC solvers.
Removes empty/ill-defined classes, rescales demands for numerical
stability, and reorders classes by think time.
Args:
lam: Arrival rate vector
L: Service demand matrix (M x R)
N: Population vector
Z: Think time vector
atol: Absolute tolerance for numerical comparisons
Returns:
Tuple of (lambda, L, N, Z, lGremaind) where:
- lambda: Sanitized arrival rates
- L: Sanitized service demands (rescaled)
- N: Sanitized populations
- Z: Sanitized think times (rescaled)
- lGremaind: Log normalization factor from removed classes
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_nc_sanitize.m
"""
lam = np.asarray(lam, dtype=float).flatten()
L = np.atleast_2d(np.asarray(L, dtype=float))
N = np.asarray(N, dtype=float).flatten()
Z = np.asarray(Z, dtype=float).flatten()
# Replace NaN with 0
L = np.nan_to_num(L, nan=0.0)
Z = np.nan_to_num(Z, nan=0.0)
# Erase empty classes (N == 0)
nnzclasses = np.where(N > 0)[0]
if len(nnzclasses) == 0:
return lam, L, N, Z, 0.0
L = L[:, nnzclasses]
N = N[nnzclasses]
Z = Z[nnzclasses] if len(Z) > 0 else np.zeros(len(nnzclasses))
lam = lam[nnzclasses] if len(lam) > 0 else np.zeros(len(nnzclasses))
# Erase ill-defined classes (sum of L and Z < atol)
class_totals = np.sum(L, axis=0) + Z
valid_classes = np.where(class_totals >= atol)[0]
if len(valid_classes) < L.shape[1]:
L = L[:, valid_classes]
N = N[valid_classes]
Z = Z[valid_classes]
lam = lam[valid_classes]
lGremaind = 0.0
# Find zero demand classes (all stations have demand < atol)
R = L.shape[1]
zerodemands = []
for r in range(R):
if np.max(L[:, r]) < atol:
zerodemands.append(r)
if len(zerodemands) > 0:
zerodemands = np.array(zerodemands)
# Contribution from zero-demand classes
for r in zerodemands:
if Z[r] > 0:
lGremaind += N[r] * np.log(Z[r]) - factln(N[r])
# Remove zero-demand classes
keep = np.setdiff1d(np.arange(R), zerodemands)
if len(keep) > 0:
L = L[:, keep]
Z = Z[keep]
N = N[keep]
lam = lam[keep] if len(lam) > 0 else np.array([])
else:
return lam, np.zeros((L.shape[0], 0)), np.array([]), np.array([]), lGremaind
if L.shape[1] == 0:
return lam, L, N, Z, lGremaind
# Rescale demands for numerical stability
Lmax = np.max(L, axis=0)
Lmax[Lmax < atol] = 1.0 # Avoid division by zero
L = L / Lmax
Z = Z / Lmax
lGremaind += np.dot(N, np.log(Lmax))
# Sort from smallest to largest think time
if len(Z) > 0 and np.sum(Z) > 0:
rsort = np.argsort(Z)
L = L[:, rsort]
Z = Z[rsort]
N = N[rsort]
if len(lam) > 0:
lam = lam[rsort]
# Ensure zero think time classes are first
zerothinktimes = np.where(Z < atol)[0]
nonzerothinktimes = np.setdiff1d(np.arange(L.shape[1]), zerothinktimes)
order = np.concatenate([zerothinktimes, nonzerothinktimes])
L = L[:, order]
N = N[order]
Z = Z[order]
if len(lam) > 0:
lam = lam[order]
return lam, L, N, Z, lGremaind
[docs]
def pfqn_cdfun(nvec: np.ndarray, cdscaling: Optional[List] = None) -> np.ndarray:
"""
AMVA-QD class-dependence function for queue-dependent scaling.
Computes the scaling factor at each station based on the class-dependent
population state.
Args:
nvec: Population state matrix (M x R) or vector (M,)
cdscaling: List of class-dependent scaling functions, one per station.
Each function takes a class population vector and returns a scalar.
Returns:
Scaling factor vector (M,)
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_cdfun.m
"""
nvec = np.atleast_2d(np.asarray(nvec, dtype=float))
M = nvec.shape[0]
r = np.ones(M)
if cdscaling is not None and len(cdscaling) > 0:
for i in range(M):
if i < len(cdscaling) and cdscaling[i] is not None:
try:
scale_val = cdscaling[i](nvec[i, :])
if scale_val > 0 and np.isfinite(scale_val):
r[i] = 1.0 / scale_val
except Exception:
pass
return r
[docs]
def pfqn_ljdfun(n: np.ndarray, ljdscaling: Optional[np.ndarray] = None) -> np.ndarray:
"""
AMVA-QD limited joint-dependence function.
Computes scaling factors for stations with limited joint-dependence,
where the service rate depends on the total population across stations.
Args:
n: Queue population vector (M,)
ljdscaling: Limited joint-dependence scaling matrix (M x Nmax), optional
Returns:
Scaling factor vector (M,)
References:
Original MATLAB: matlab/src/api/pfqn/pfqn_ljdfun.m
"""
n = np.asarray(n, dtype=float).flatten()
M = len(n)
r = np.ones(M)
if ljdscaling is None:
return r
ljdscaling = np.atleast_2d(np.asarray(ljdscaling, dtype=float))
smax = ljdscaling.shape[1]
# Total population
nt = int(np.sum(n))
for i in range(M):
if i < ljdscaling.shape[0] and nt > 0:
# Get the scaling value at total population
idx = min(nt, smax) - 1
if idx >= 0 and ljdscaling[i, idx] > 0:
r[i] = 1.0 / ljdscaling[i, idx]
return r
__all__ = [
'factln',
'factln_vec',
'softmin',
'oner',
'multichoose',
'matchrow',
'pfqn_lldfun',
'pfqn_mu_ms',
'pfqn_nc_sanitize',
'pfqn_cdfun',
'pfqn_ljdfun',
]