Source code for line_solver.api.pfqn.mvald

"""
Load-dependent MVA for Product-Form Queueing Networks.

Implements exact MVA for closed queueing networks with load-dependent
service rates, where the service rate at each station may depend on
the number of jobs present.
"""

import numpy as np
from typing import Tuple, Optional

# Try to import JIT-compiled kernels
try:
    from .mvald_jit import (
        HAS_NUMBA as MVALD_HAS_NUMBA,
        mvald_iteration_kernel_jit,
    )
except ImportError:
    MVALD_HAS_NUMBA = False
    mvald_iteration_kernel_jit = None

# Threshold for using JIT (number of population states)
MVALD_JIT_THRESHOLD = 100


def _pprod_init(N: np.ndarray) -> np.ndarray:
    """Initialize population product iterator.

    Args:
        N: Maximum population vector.

    Returns:
        Initial state (zeros).
    """
    return np.zeros_like(N, dtype=int)


def _pprod_next(n: np.ndarray, N: np.ndarray) -> Optional[np.ndarray]:
    """Get next population vector in lexicographic order.

    Iterates through all vectors n where 0 <= n <= N.

    Args:
        n: Current population vector.
        N: Maximum population vector.

    Returns:
        Next population vector, or None if done.
    """
    n = n.copy()
    R = len(N)

    # Check if at maximum
    if np.all(n == N):
        return None

    # Find rightmost index that can be incremented
    s = R - 1
    while s >= 0 and n[s] == N[s]:
        n[s] = 0
        s -= 1

    if s < 0:
        return None

    n[s] += 1
    return n


def _hashpop(n: np.ndarray, N: np.ndarray) -> int:
    """Hash population vector to linear index.

    Converts a multi-dimensional population vector to a unique
    linear index for array storage.

    Args:
        n: Population vector.
        N: Maximum population vector.

    Returns:
        Linear index (0-based).
    """
    index = 0
    for r in range(len(N)):
        prod = 1
        for j in range(r):
            prod *= N[j] + 1
        index += prod * n[r]
    return index


def _oner(n: np.ndarray, s: int) -> np.ndarray:
    """Create population vector with one less job in class s.

    Args:
        n: Population vector.
        s: Class index to decrement.

    Returns:
        Modified population vector.
    """
    n_copy = n.copy()
    if s >= 0 and s < len(n_copy):
        n_copy[s] = max(0, n_copy[s] - 1)
    return n_copy


[docs] def pfqn_mvald( L: np.ndarray, N: np.ndarray, Z: np.ndarray, mu: np.ndarray, stabilize: bool = True ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, bool, np.ndarray]: """ Exact MVA for load-dependent closed queueing networks. This algorithm extends standard MVA to handle stations where the service rate depends on the number of jobs present. Args: L: Service demand matrix (M x R) - rows are stations, columns are classes. N: Population vector (R,). Z: Think time vector (R,). mu: Load-dependent rate matrix (M x Ntot) where mu[i,k] is the service rate at station i when k jobs are present. stabilize: Force non-negative probabilities (default: True). Returns: Tuple of (XN, QN, UN, CN, lGN, isNumStable, pi): XN: Class throughputs (1 x R). QN: Mean queue lengths (M x R). UN: Utilizations (M x R). CN: Cycle times (1 x R). lGN: Log normalizing constant evolution. isNumStable: True if numerically stable. pi: Marginal queue-length probabilities (M x Ntot+1). """ L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=int).ravel() Z = np.asarray(Z, dtype=float).ravel() mu = np.atleast_2d(np.asarray(mu, dtype=float)) M, R = L.shape Ntot = int(np.sum(N)) # Initialize outputs XN = np.zeros((1, R)) QN = np.zeros((M, R)) UN = np.zeros((M, R)) CN = np.zeros((1, R)) WN = np.zeros((M, R)) # Handle negative population if np.any(N < 0): lGN = np.array([-np.inf]) pi = np.zeros((M, Ntot + 1)) return XN, QN, UN, CN, lGN, True, pi # Number of population states num_states = int(np.prod(N + 1)) # Use JIT kernel for large state spaces if MVALD_HAS_NUMBA and num_states > MVALD_JIT_THRESHOLD: N_float = N.astype(np.float64) XN_jit, QN_jit, UN_jit, CN_jit, lGN_jit, is_stable, pi_jit = mvald_iteration_kernel_jit( L, N_float, Z, mu, stabilize ) # The JIT kernel returns simplified lGN (just final value), need to wrap lGN_out = lGN_jit return XN_jit, QN_jit, UN_jit, CN_jit, lGN_out, is_stable, pi_jit # Throughputs for each population Xs = np.zeros((R, num_states)) # Marginal queue-length probabilities: pi[i,k,pop] = P(queue i has k jobs | pop) pi_all = np.ones((M, Ntot + 1, num_states)) is_num_stable = True warn = True lGN_list = [0.0] # Iterate through all population vectors n = _pprod_init(N) while n is not None: pop_idx = _hashpop(n, N) WN_local = np.zeros((M, R)) # Compute waiting times for each class for s in range(R): if n[s] > 0: n_minus_s = _oner(n, s) n_minus_s_idx = _hashpop(n_minus_s, N) for ist in range(M): for k in range(1, int(np.sum(n)) + 1): if k <= mu.shape[1]: mu_val = mu[ist, k - 1] # mu is 0-indexed, k starts at 1 if mu_val > 0: WN_local[ist, s] += (L[ist, s] / mu_val) * k * pi_all[ist, k - 1, n_minus_s_idx] # Compute throughput denom = Z[s] + np.sum(WN_local[:, s]) if denom > 0: Xs[s, pop_idx] = n[s] / denom # Compute pi(k|n) for k >= 1 for k in range(1, int(np.sum(n)) + 1): for ist in range(M): pi_all[ist, k, pop_idx] = 0 for s in range(R): if n[s] > 0: n_minus_s = _oner(n, s) n_minus_s_idx = _hashpop(n_minus_s, N) for ist in range(M): if k <= mu.shape[1]: mu_val = mu[ist, k - 1] if mu_val > 0: pi_all[ist, k, pop_idx] += (L[ist, s] / mu_val) * Xs[s, pop_idx] * pi_all[ist, k - 1, n_minus_s_idx] # Compute pi(0|n) for ist in range(M): sum_pi = np.sum(pi_all[ist, 1:int(np.sum(n)) + 1, pop_idx]) p0 = 1.0 - sum_pi if p0 < 0: if warn: warn = False is_num_stable = False if stabilize: pi_all[ist, 0, pop_idx] = np.finfo(float).eps else: pi_all[ist, 0, pop_idx] = p0 else: pi_all[ist, 0, pop_idx] = p0 # Track log normalizing constant evolution last_nnz = -1 for r in range(R - 1, -1, -1): if n[r] > 0: last_nnz = r break if last_nnz >= 0: # Check condition: sum(n[0:last_nnz-1]) == sum(N[0:last_nnz-1]) and sum(n[last_nnz+1:]) == 0 cond1 = np.sum(n[:last_nnz]) == np.sum(N[:last_nnz]) if last_nnz > 0 else True cond2 = np.sum(n[last_nnz + 1:]) == 0 if last_nnz < R - 1 else True if cond1 and cond2: X_val = Xs[last_nnz, pop_idx] if X_val > 0: lGN_list.append(lGN_list[-1] - np.log(X_val)) # Next population n = _pprod_next(n, N) # Get final results for full population N N_idx = _hashpop(N, N) XN = Xs[:, N_idx].reshape(1, -1) pi = pi_all[:, :, N_idx] # Compute waiting times at full population WN = np.zeros((M, R)) for s in range(R): if N[s] > 0: n_minus_s = _oner(N, s) n_minus_s_idx = _hashpop(n_minus_s, N) for ist in range(M): for k in range(1, Ntot + 1): if k <= mu.shape[1]: mu_val = mu[ist, k - 1] if mu_val > 0: WN[ist, s] += (L[ist, s] / mu_val) * k * pi_all[ist, k - 1, n_minus_s_idx] # Compute metrics QN = WN * np.tile(XN, (M, 1)) UN = 1 - pi[:, 0].reshape(-1, 1) # Utilization from pi(0) UN = np.tile(UN, (1, R)) # Broadcast to all classes # Zero out utilization for classes with zero demand (Disabled services) UN[L == 0] = 0.0 # Compute cycle times CN = np.zeros((1, R)) for r in range(R): if XN[0, r] > 0: CN[0, r] = N[r] / XN[0, r] - Z[r] lGN = np.array(lGN_list) return XN, QN, UN, CN, lGN, is_num_stable, pi
[docs] def pfqn_mvams( lambda_arr: np.ndarray, L: np.ndarray, N: np.ndarray, Z: np.ndarray, mi: Optional[np.ndarray] = None, S: Optional[np.ndarray] = None ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, float]: """ General-purpose MVA for mixed networks with multiserver nodes. This function handles networks with open/closed classes and multi-server stations, routing to the appropriate specialized algorithm. Args: lambda_arr: Arrival rate vector (R,). Use 0 for closed classes. L: Service demand matrix (M x R). N: Population vector (R,). Use np.inf for open classes. Z: Think time vector (R,). mi: Queue replication factors (M,) (default: ones). S: Number of servers per station (M,) (default: ones). Returns: Tuple of (XN, QN, UN, CN, lG): XN: Class throughputs (1 x R). QN: Mean queue lengths (M x R). UN: Utilizations (M x R). CN: Cycle times (1 x R). lG: Log normalizing constant. References: Original MATLAB: matlab/src/api/pfqn/pfqn_mvams.m """ from .mva import pfqn_mva from .mixed import pfqn_mvamx from .mvaldmx import pfqn_mvaldms L = np.atleast_2d(np.asarray(L, dtype=float)) N = np.asarray(N, dtype=float).ravel() # Keep as float to allow inf Z = np.asarray(Z, dtype=float).ravel() lambda_arr = np.asarray(lambda_arr, dtype=float).ravel() M, R = L.shape # Compute Ntot only from finite (closed class) populations Ntot = int(np.sum(N[np.isfinite(N)])) if S is None: S = np.ones(M, dtype=int) else: S = np.asarray(S, dtype=int).ravel() if mi is None: mi = np.ones(M) else: mi = np.asarray(mi, dtype=float).ravel() # If Z is empty, use zeros if len(Z) == 0: Z = np.zeros(R) # Build load-dependent rate matrix for multi-server stations if Ntot > 0: mu = np.ones((M, Ntot)) for ist in range(M): for k in range(1, Ntot + 1): mu[ist, k - 1] = min(k, S[ist]) else: mu = np.ones((M, 1)) # Get max servers (only from finite S values) S_finite = S[np.isfinite(S)] max_S = int(np.max(S_finite)) if len(S_finite) > 0 else 1 if max_S == 1: # No multi-server nodes if np.any(np.isinf(N)): # Open or mixed model XN, QN, UN, CN, lG = pfqn_mvamx(lambda_arr, L, N, Z, mi) else: # Closed model N_int = N.astype(int) XN, CN_out, QN, UN, RN, TN, AN = pfqn_mva(L, N_int, Z, mi) # Compute log normalizing constant from .nc import pfqn_ca _, lG = pfqn_ca(L, N_int, Z) CN = CN_out # Use cycle times from MVA else: # Model has multi-server nodes if np.any(np.isinf(N)): # Open or mixed model if np.max(mi) == 1: lG = np.nan # NC not available in this case XN, QN, UN, CN, _ = pfqn_mvaldms(lambda_arr, L, N, Z, S) else: raise ValueError("Queue replicas not available in exact MVA for mixed models.") else: # Closed model with multi-server N_int = N.astype(int) XN, QN, UN, CN, lGN, _, _ = pfqn_mvald(L, N_int, Z, mu) lG = lGN[-1] if len(lGN) > 0 else np.nan return XN, QN, UN, CN, lG
__all__ = [ 'pfqn_mvald', 'pfqn_mvams', ]