Source code for line_solver.api.pfqn.stdf

"""
Sojourn Time Distribution Functions for PFQN.

Native Python implementation for computing sojourn time distributions
in product-form queueing networks with multiserver FCFS nodes.

References:
    McKenna, J. "Mean Value Analysis for networks with service-time-dependent
    queue disciplines." Performance Evaluation, 1987.
"""

import numpy as np
from typing import List, Dict, Optional, Tuple
from scipy import linalg

from ..mam import (
    map_exponential, map_erlang, map_sumind, map_cdf
)
from .utils import oner, factln
from .mvald import pfqn_mvald
from .ncld import pfqn_comomrm_ld
from .schmidt import pprod


[docs] def pfqn_stdf(L: np.ndarray, N: np.ndarray, Z: np.ndarray, S: np.ndarray, fcfs_nodes: np.ndarray, rates: np.ndarray, tset: np.ndarray) -> Dict[Tuple[int, int], np.ndarray]: """ Compute sojourn time distribution for multiserver FCFS nodes. Implements McKenna's method for computing the response time distribution at multiserver FCFS stations in closed queueing networks. Args: L: Load matrix (M x R) - service demands N: Population vector (R,) - number of jobs per class Z: Think time vector (R,) or matrix (1 x R) S: Number of servers at each station (M,) fcfs_nodes: Array of FCFS station indices (1-indexed as in MATLAB) rates: Service rates matrix (M x R) tset: Time points at which to evaluate the distribution Returns: Dictionary with (station, class) tuples as keys and response time distribution arrays as values. Each array has shape (len(tset), 2) with: - column 0: CDF values - column 1: time points Examples: >>> L = np.array([[0.5, 0.3], [0.2, 0.4]]) >>> N = np.array([2, 3]) >>> Z = np.array([1.0, 1.0]) >>> S = np.array([2, 1]) >>> fcfs_nodes = np.array([0]) # 0-indexed >>> rates = np.array([[1.0, 1.0], [2.0, 2.0]]) >>> tset = np.linspace(0.1, 5.0, 50) >>> RD = pfqn_stdf(L, N, Z, S, fcfs_nodes, rates, tset) """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=int).flatten() Z = np.atleast_1d(np.asarray(Z, dtype=float)).flatten() S = np.asarray(S, dtype=int).flatten() fcfs_nodes = np.asarray(fcfs_nodes, dtype=int).flatten() rates = np.atleast_2d(np.asarray(rates, dtype=float)) tset = np.asarray(tset, dtype=float).flatten() M, R = L.shape T = len(tset) # Replace zero times with small value to avoid numerical issues FINE_TOL = 1e-10 tset = np.where(tset == 0, FINE_TOL, tset) # Build mu matrix (load-dependent service rates) mu = np.zeros((M, int(np.sum(N)))) for k in range(M): for n in range(int(np.sum(N))): mu[k, n] = min(S[k], n + 1) stability_warn_issued = False RD = {} for k in fcfs_nodes: # Check that all rates at this station are equal (product-form requirement) if np.ptp(rates[k, :]) > FINE_TOL: raise ValueError( f"FCFS station {k} has distinct service rates, the model is invalid." ) # Build h_kc(t, n) - sojourn time CDF for queue length n hMAP = {} hkc = np.zeros((T, 1 + int(np.sum(N)))) for n in range(int(np.sum(N)) + 1): if n < S[k]: # Exponential service (no queueing) D0, D1 = map_exponential(1.0 / rates[k, 0]) hMAP[n] = (D0, D1) else: # Service + waiting in queue exp_map = map_exponential(1.0 / rates[k, 0]) erlang_phases = n - S[k] + 1 erlang_mean = (n - S[k] + 1) / (S[k] * rates[k, 0]) erlang = map_erlang(erlang_mean, erlang_phases) D0, D1 = map_sumind([exp_map, erlang]) hMAP[n] = (D0, D1) # Compute CDF at all time points D0, D1 = hMAP[n] cdf_vals = map_cdf(D0, D1, tset) hkc[:, n] = cdf_vals # Compute response time distribution for each class for r in range(R): if L[k, r] > FINE_TOL: Nr = oner(N, r) # Compute normalizing constant for reduced population if M == 1: # Use COMOM for single station _, lGr = _pfqn_comomrm_ld_wrapper(L, Nr, Z, mu) is_num_stable = True else: # Use load-dependent MVA (returns 7 values) _, _, _, _, lGr, is_num_stable, _ = pfqn_mvald(L, Nr, Z, mu) lGr = lGr[-1] if hasattr(lGr, '__len__') else lGr if not is_num_stable and not stability_warn_issued: stability_warn_issued = True # Initialize result array RD[(k, r)] = np.zeros((T, 2)) RD[(k, r)][:, 1] = tset # Use recursive form for load-dependent models (faster) Hkrt = np.zeros(T) # Compute lGk for network without station k other_stations = np.array([i for i in range(M) if i != k]) if len(other_stations) > 0: if len(other_stations) == 1: _, lGk = _pfqn_comomrm_ld_wrapper( L[other_stations, :], Nr, Z, mu[other_stations, :] ) else: _, _, _, _, lGk, _, _ = pfqn_mvald( L[other_stations, :], Nr, Z, mu[other_stations, :] ) lGk = lGk[-1] if hasattr(lGk, '__len__') else lGk else: lGk = 0.0 for t_idx in range(T): gammat = mu.copy() # Adjust gamma for time-dependent service for m in range(1, int(np.sum(Nr))): if hkc[t_idx, m] > FINE_TOL: gammat[k, m - 1] = mu[k, m - 1] * hkc[t_idx, m - 1] / hkc[t_idx, m] gammak = _mushift(gammat, k) if gammak.shape[1] > 0: gammak = gammak[:, :int(np.sum(Nr)) - 1] # nvec = 0 case Hkrt[t_idx] = hkc[t_idx, 0] * np.exp(lGk) # nvec >= 1 cases for s in range(R): if Nr[s] > 0: Nr_s = oner(Nr, s) if M == 1: _, lYks_t = _pfqn_comomrm_ld_wrapper( L, Nr_s, Z, gammak ) else: _, _, _, _, lYks_t, _, _ = pfqn_mvald( L, Nr_s, Z, gammak ) lYks_t = lYks_t[-1] if hasattr(lYks_t, '__len__') else lYks_t if gammat[k, 0] > FINE_TOL: Hkrt[t_idx] += (L[k, s] * hkc[t_idx, 0] / gammat[k, 0]) * np.exp(lYks_t) # Handle NaN values Hkrt = np.where(np.isnan(Hkrt), FINE_TOL, Hkrt) lHkrt = np.log(np.maximum(Hkrt, FINE_TOL)) # Compute final CDF RD[(k, r)][:, 0] = np.minimum(1.0, np.exp(lHkrt - lGr)) return RD
def _mushift(mu: np.ndarray, i: int) -> np.ndarray: """ Shift the service rate vector. For station i, shifts rates left by one position. For other stations, keeps rates unchanged. Args: mu: Service rate matrix (M x N) i: Station index to shift Returns: Shifted service rate matrix """ M, N = mu.shape if N <= 1: return np.zeros((M, 0)) mushifted = np.zeros((M, N - 1)) for m in range(M): if m == i: mushifted[m, :] = mu[m, 1:N] else: mushifted[m, :] = mu[m, :N - 1] return mushifted def _pfqn_comomrm_ld_wrapper(L: np.ndarray, N: np.ndarray, Z: np.ndarray, mu: np.ndarray) -> Tuple[np.ndarray, float]: """ Wrapper for pfqn_comomrm_ld that handles edge cases. Args: L: Load matrix N: Population vector Z: Think time vector mu: Service rate matrix Returns: Tuple of (result, log_normalizing_constant) """ try: result = pfqn_comomrm_ld(L, N, Z, mu) if hasattr(result, 'lG'): return result.G, result.lG else: # Handle tuple return return result[0], result[1] except Exception: # Fallback for numerical issues return np.zeros(len(N)), 0.0
[docs] def pfqn_stdf_heur(L: np.ndarray, N: np.ndarray, Z: np.ndarray, S: np.ndarray, fcfs_nodes: np.ndarray, rates: np.ndarray, tset: np.ndarray) -> Dict[Tuple[int, int], np.ndarray]: """ Heuristic sojourn time distribution for multiserver FCFS nodes. Implements a variant of McKenna's 1987 method that uses per-class service rate weighting for improved accuracy in multiclass networks. Unlike pfqn_stdf which uses a simpler Erlang model for the waiting component, this heuristic accounts for class-dependent waiting times based on the expected queue composition. Args: L: Load matrix (M x R) - service demands N: Population vector (R,) - number of jobs per class Z: Think time vector (R,) or matrix (1 x R) S: Number of servers at each station (M,) fcfs_nodes: Array of FCFS station indices (0-indexed) rates: Service rates matrix (M x R) tset: Time points at which to evaluate the distribution Returns: Dictionary with (station, class) tuples as keys and response time distribution arrays as values. Each array has shape (len(tset), 2) with: - column 0: CDF values - column 1: time points References: McKenna, J. "Mean Value Analysis for networks with service-time-dependent queue disciplines." Performance Evaluation, 1987. """ from .rd import pfqn_rd L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=int).flatten() Z = np.atleast_1d(np.asarray(Z, dtype=float)).flatten() S = np.asarray(S, dtype=int).flatten() fcfs_nodes = np.asarray(fcfs_nodes, dtype=int).flatten() rates = np.atleast_2d(np.asarray(rates, dtype=float)) tset = np.asarray(tset, dtype=float).flatten() M, R = L.shape T = len(tset) FINE_TOL = 1e-10 tset = np.where(tset == 0, FINE_TOL, tset) # Build mu matrix mu = np.zeros((M, int(np.sum(N)))) for k in range(M): for n in range(int(np.sum(N))): mu[k, n] = min(S[k], n + 1) stability_warn_issued = False RD = {} for k in fcfs_nodes: for r in range(R): if L[k, r] > FINE_TOL: Nr = oner(N, r) # Compute normalizing constant and queue lengths if M == 1: _, lGr = _pfqn_comomrm_ld_wrapper(L, Nr, Z, mu) # Compute Q1 using repairman formula from .utils import pfqn_mu_ms mu_ms = pfqn_mu_ms(int(np.sum(N)), 2, S[k]) mu_ms_mat = np.zeros((1, int(np.sum(N)))) mu_ms_mat[0, :len(mu_ms)] = mu_ms _, lGra = _pfqn_comomrm_ld_wrapper(L, Nr, Z, mu_ms_mat) Q1 = np.zeros((M, R)) for s in range(R): Q1[k, s] = L[k, s] * np.exp(lGra - lGr) is_num_stable = True else: _, Q1, _, _, lGr, is_num_stable, _ = pfqn_mvald(L, Nr, Z, mu) lGr = lGr[-1] if hasattr(lGr, '__len__') else lGr if not is_num_stable and not stability_warn_issued: stability_warn_issued = True # Build heuristic h_kc(t, n) with per-class weighting hkc = np.zeros((T, 1 + int(np.sum(N)))) for n in range(int(np.sum(N)) + 1): if n < S[k]: # Exponential service (no queueing) D0, D1 = map_exponential(1.0 / rates[k, r]) else: # Service + weighted waiting in queue # The heuristic: weight waiting by class-specific queue lengths C = [map_exponential(1.0 / rates[k, r])] Q1_k_total = np.sum(Q1[k, :]) if Q1_k_total > FINE_TOL: for s in range(R): if Q1[k, s] > FINE_TOL: # Mean waiting time component for class s mean_wait_s = Q1[k, s] * (n - S[k] + 1) / Q1_k_total / rates[k, s] C.append(map_exponential(mean_wait_s)) else: # Fallback to uniform weighting mean_wait = (n - S[k] + 1) / (S[k] * rates[k, r]) erlang = map_erlang(mean_wait, n - S[k] + 1) C = [map_exponential(1.0 / rates[k, r]), erlang] D0, D1 = map_sumind(C) # Compute CDF at all time points cdf_vals = map_cdf(D0, D1, tset) hkc[:, n] = cdf_vals # Initialize result array RD[(k, r)] = np.zeros((T, 2)) RD[(k, r)][:, 1] = tset # Compute lGk for network without station k other_stations = np.array([i for i in range(M) if i != k]) if len(other_stations) > 0: if len(other_stations) == 1: _, lGk = _pfqn_comomrm_ld_wrapper( L[other_stations, :], Nr, Z, mu[other_stations, :] ) else: _, _, _, _, lGk, _, _ = pfqn_mvald( L[other_stations, :], Nr, Z, mu[other_stations, :] ) lGk = lGk[-1] if hasattr(lGk, '__len__') else lGk else: lGk = 0.0 # Use recursive form for load-dependent models Hkrt = np.zeros(T) for t_idx in range(T): gammat = mu.copy() # Adjust gamma for time-dependent service for m in range(1, int(np.sum(Nr)) + 1): if hkc[t_idx, m] > FINE_TOL: gammat[k, m - 1] = mu[k, m - 1] * hkc[t_idx, m - 1] / hkc[t_idx, m] gammak = _mushift(gammat, k) if gammak.shape[1] > 0: gammak = gammak[:, :int(np.sum(Nr))] # nvec = 0 case Hkrt[t_idx] = hkc[t_idx, 0] * np.exp(lGk) # nvec >= 1 cases for s in range(R): if Nr[s] > 0: Nr_s = oner(Nr, s) # Use pfqn_rd for the recursive computation try: lYks_t = pfqn_rd(L, Nr_s, Z, gammak) lYks_t = lYks_t[-1] if hasattr(lYks_t, '__len__') else lYks_t except Exception: # Fallback to mvald if M == 1: _, lYks_t = _pfqn_comomrm_ld_wrapper( L, Nr_s, Z, gammak ) else: _, _, _, _, lYks_t, _, _ = pfqn_mvald( L, Nr_s, Z, gammak ) lYks_t = lYks_t[-1] if hasattr(lYks_t, '__len__') else lYks_t if gammat[k, 0] > FINE_TOL: Hkrt[t_idx] += (L[k, s] * hkc[t_idx, 0] / gammat[k, 0]) * np.exp(lYks_t) # Handle NaN values Hkrt = np.where(np.isnan(Hkrt), FINE_TOL, Hkrt) lHkrt = np.log(np.maximum(Hkrt, FINE_TOL)) # Compute final CDF RD[(k, r)][:, 0] = np.minimum(1.0, np.exp(lHkrt - lGr)) return RD
__all__ = [ 'pfqn_stdf', 'pfqn_stdf_heur', ]