# Source code for line_solver.api.qsys.mapdc

"""
MAP/D/c queue analysis using Non-Skip-Free (NSF) Markov chain methods.

Implements the Q-MAM algorithm for MAP/D/c queues with:
- Markovian Arrival Process (MAP) arrivals
- Deterministic service times
- c parallel FCFS servers

References:
    Van Houdt, B. "Q-MAM: A Toolbox for Numerical Analysis of MMAP[K]/G[K]/1
    Queues." ACM Transactions on Mathematical Software, 2011.
"""

import numpy as np
from scipy import linalg
from typing import Dict, Tuple, Optional
from dataclasses import dataclass

# Import CTMC solver from mc module
from ..mc import ctmc_solve


@dataclass
class MapDcResult:
    """Result container for MAP/D/c queue analysis."""
    mean_queue_length: float   # E[Q]: mean number of customers in the system
    mean_waiting_time: float   # E[W]: mean time spent waiting in queue
    mean_sojourn_time: float   # E[T] = E[W] + s: waiting plus service time
    utilization: float         # per-server utilization (lambda * s / c)
    queue_length_dist: np.ndarray   # P(Q = n) for n = 0, 1, ...
    waiting_time_dist: np.ndarray   # waiting-time CDF sampled at discrete points
    analyzer: str              # analyzer identifier string
    # NOTE(review): qsys_mapdc in this module returns a plain dict rather than
    # this dataclass — presumably kept for API parity elsewhere; confirm usage.


def qsys_mapdc(
    D0: np.ndarray,
    D1: np.ndarray,
    s: float,
    c: int,
    max_num_comp: int = 1000,
    num_steps: int = 1,
    verbose: int = 0,
) -> Dict:
    """
    Analyze MAP/D/c queue (MAP arrivals, deterministic service, c servers).

    Uses Non-Skip-Free (NSF) Markov chain analysis embedding at deterministic
    service intervals. Multiple arrivals can occur per interval.

    Args:
        D0: MAP hidden transition matrix (n x n).
        D1: MAP arrival transition matrix (n x n).
        s: Deterministic service time (positive scalar).
        c: Number of servers.
        max_num_comp: Maximum number of queue length components (default 1000).
        num_steps: Number of waiting time distribution points per interval (default 1).
        verbose: Verbosity level (default 0).

    Returns:
        dict: Performance metrics including:
            - mean_queue_length: Mean number of customers in system
            - mean_waiting_time: Mean waiting time in queue
            - mean_sojourn_time: Mean sojourn time (waiting + service)
            - utilization: Server utilization (per server)
            - queue_length_dist: Queue length distribution P(Q=n)
            - waiting_time_dist: Waiting time CDF at discrete points
            - analyzer: Analyzer identifier

    Raises:
        ValueError: If the offered load lambda * s / c is >= 1 (unstable system).
    """
    D0 = np.asarray(D0, dtype=np.float64)
    D1 = np.asarray(D1, dtype=np.float64)
    m = D0.shape[0]

    # Validate inputs
    assert D0.shape == (m, m) and D1.shape == (m, m), "D0 and D1 must be square matrices"
    assert s > 1e-14, "Service time s must be strictly positive"
    assert c >= 1, "Number of servers c must be at least 1"

    # Uniformization: P0/P1 are the DTMC blocks of the uniformized MAP.
    lam = np.max(np.abs(np.diag(D0)))
    P0 = D0 / lam + np.eye(m)
    P1 = D1 / lam

    # Stability test: arrival rate from the MAP stationary phase vector theta.
    Q = D0 + D1
    theta = ctmc_solve(Q)
    lambda_arr = theta @ D1 @ np.ones(m)
    epsilon = 1e-12
    load = lambda_arr * s / c
    if load >= 1 - epsilon:
        raise ValueError(f"The load {load} of the system exceeds one")

    # P(0, s) = expm(D0*s): phase evolution with zero arrivals during one slot.
    P0s = linalg.expm(D0 * s)
    Ptot = P0s.copy()

    # Poisson terms for uniformization: P(X = k), k >= 1, X ~ Poisson(lam*s).
    h = _compute_poisson_terms(lam * s, epsilon)
    Pterms = len(h)

    # Kold[j] = P0^j: phase evolution given j uniformization jumps, 0 arrivals.
    Kold = [np.eye(m) if j == 0 else np.linalg.matrix_power(P0, j) for j in range(Pterms)]

    # Compute P(k, s) for k = 1, 2, ... until the blocks cover almost all mass.
    Ps = []
    k = 1
    prob_cum = np.min(np.sum(Ptot, axis=1))
    while prob_cum < 1 - epsilon:
        K = [P1 @ Kold[0]]
        # Poisson weight for k jumps; extend beyond the precomputed table if needed.
        htemp = h[k - 1] if k <= len(h) else _poisson_pdf(k, lam * s)
        Psk = htemp * K[0]
        for j in range(1, Pterms):
            K.append(P0 @ K[j - 1] + P1 @ Kold[j])
            htemp = htemp * lam * s / (k + j)
            Psk = Psk + htemp * K[j]
        Ps.append(Psk)
        Ptot = Ptot + Psk
        Kold = K
        prob_cum = np.min(np.sum(Ptot, axis=1))
        k += 1

    # Assemble the NSF block row: A = [P(0,s) P(1,s) P(2,s) ...].
    num_blocks = 1 + len(Ps)
    A = np.zeros((m, m * num_blocks))
    A[:, :m] = P0s
    for i, Psk in enumerate(Ps):
        A[:, (i + 1) * m:(i + 2) * m] = Psk

    # Compute G (first-passage probabilities) using the NSF-GHT algorithm.
    G = _nsf_ght(A, c, max_num_it=10000, verbose=verbose)

    # Stationary distribution pi of the embedded chain.
    pi = _nsf_pi(A, G, c, max_num_comp=max_num_comp, verbose=verbose)

    # Queue length distribution: aggregate the m phases of each level.
    num_levels = pi.shape[0] // m
    ql = pi[:num_levels * m].reshape(num_levels, m).sum(axis=1)

    # Mean queue length
    mean_ql = np.sum(np.arange(len(ql)) * ql)

    # Waiting time distribution W(t) for t = {0, s, 2s, ...}: a customer waits
    # less than (i-1)*s when fewer than i*c customers are ahead of it.
    w = []
    w0 = np.sum(pi[:min(c * m, len(pi))])
    w.append(w0)
    wt_accum = w0
    i = 2
    while wt_accum < 1 - 1e-10 and i * m * c < len(pi):
        start_idx = (i - 1) * m * c
        end_idx = min(i * m * c, len(pi))
        w_level = w[-1] + np.sum(pi[start_idx:end_idx])
        w.append(w_level)
        wt_accum = w_level
        i += 1
    w = np.array(w)

    # Mean waiting time via the survival function: E[W] = sum (1 - W(t)) * dt.
    step_size = s / num_steps
    mean_wt = np.sum(1.0 - w[:-1]) * step_size if len(w) > 1 else 0.0

    # Mean sojourn time = waiting time + deterministic service time.
    mean_st = mean_wt + s

    # Per-server utilization equals the offered load computed for the
    # stability test above; no need to recompute it.
    rho = load

    return {
        'mean_queue_length': float(mean_ql),
        'mean_waiting_time': float(mean_wt),
        'mean_sojourn_time': float(mean_st),
        'utilization': float(rho),
        'queue_length_dist': ql,
        'waiting_time_dist': w,
        'analyzer': f'Q-MAM:MAP/D/{c}'
    }
def qsys_mapd1(
    D0: np.ndarray,
    D1: np.ndarray,
    s: float,
    max_num_comp: int = 1000,
    num_steps: int = 1,
) -> Dict:
    """
    Analyze MAP/D/1 queue (single-server convenience function).

    Args:
        D0: MAP hidden transition matrix.
        D1: MAP arrival transition matrix.
        s: Deterministic service time.
        max_num_comp: Maximum number of queue length components.
        num_steps: Number of waiting time points per interval.

    Returns:
        dict: Performance metrics (see qsys_mapdc).
    """
    # Delegate to the multi-server solver with c = 1.
    return qsys_mapdc(D0, D1, s, 1, max_num_comp, num_steps)
def _compute_poisson_terms(lambda_s: float, epsilon: float) -> np.ndarray: """Compute Poisson probability terms for uniformization.""" terms = [] h = np.exp(-lambda_s) * lambda_s sum_h = h + np.exp(-lambda_s) terms.append(h) while sum_h < 1 - epsilon: next_h = terms[-1] * lambda_s / (len(terms) + 1) terms.append(next_h) sum_h += next_h return np.array(terms) def _poisson_pdf(k: int, lam: float) -> float: """Compute Poisson PDF P(X=k) for parameter lambda.""" result = np.exp(-lam) for i in range(1, k + 1): result *= lam / i return result def _nsf_ght( A: np.ndarray, N: int, max_num_it: int = 10000, verbose: int = 0 ) -> np.ndarray: """ Non-Skip-Free Gail-Hantler-Taylor algorithm for computing G matrix. Computes the minimal nonnegative solution to: G = C0 + C1*G + C2*G^2 + ... + C_max*G^max Args: A: Block matrix [A0 A1 A2 ... A_max] with m rows N: Skip factor (e.g., c for MAP/D/c) max_num_it: Maximum iterations verbose: Verbosity level Returns: G matrix of dimension (m*N) x (m*N) """ m = A.shape[0] K = A.shape[1] // m - 1 # Initialize G = A[:, :m*N] G = A[:, :min(m*N, A.shape[1])].copy() if G.shape[1] < m * N: G_padded = np.zeros((m, m * N)) G_padded[:, :G.shape[1]] = G G = G_padded check = 1.0 numit = 0 while check > 1e-14 and numit < max_num_it: numit += 1 Gold = G.copy() temp = G.copy() # G = A[:, :m*N] + A[:, N*m:(N+1)*m] @ G AN = A[:, N*m:(N+1)*m] if (N+1)*m <= A.shape[1] else np.zeros((m, m)) G = A[:, :m*N] + AN @ G # For j = N+1 to K for j in range(N + 1, K + 1): # temp = [zeros(m) temp[:, :(N-1)*m]] + temp[:, (N-1)*m:] @ Gold new_temp = np.zeros((m, m * N)) new_temp[:, m:] = temp[:, :(N-1)*m] new_temp += temp[:, (N-1)*m:] @ Gold temp = new_temp # G = G + A[:, j*m:(j+1)*m] @ temp if (j+1)*m <= A.shape[1]: Aj = A[:, j*m:(j+1)*m] G = G + Aj @ temp check = np.max(np.abs(G - Gold)) if verbose > 0 and numit % verbose == 0: print(f"Check after {numit} iterations: {check}") if numit == max_num_it and check > 1e-14: print(f"Warning: NSF_GHT - Maximum iterations 
{numit} reached") # Expand to full (m*N) x (m*N) if needed if N > 1: full_G = np.zeros((m * N, m * N)) full_G[:m, :] = G # Compute remaining block rows for j in range(2, N + 1): prev_row = full_G[(j-2)*m:(j-1)*m, :] shifted = np.zeros((m, m * N)) shifted[:, m:] = prev_row[:, :(N-1)*m] mult_part = prev_row[:, (N-1)*m:] @ full_G[:m, :] full_G[(j-1)*m:j*m, :] = shifted + mult_part G = full_G return G def _nsf_pi( A: np.ndarray, G: np.ndarray, N: int, max_num_comp: int = 1000, verbose: int = 0 ) -> np.ndarray: """ Compute stationary distribution of NSF Markov chain. Args: A: Block matrix [A0 A1 ... A_max] G: G matrix from NSF-GHT N: Skip factor max_num_comp: Maximum components verbose: Verbosity level Returns: Stationary distribution vector """ m = A.shape[0] K = A.shape[1] // m - 1 # Construct reblocked C matrices extra_blocks_C = (N - 1) - ((K - 1) % N) if K > 0 else 0 C_width = m * (N - 1) + A.shape[1] + m * extra_blocks_C C = np.zeros((N * m, C_width)) # Fill C with shifted copies of A for row_block in range(N): row_start = row_block * m col_start = row_block * m for col in range(A.shape[1]): if col_start + col < C_width: C[row_start:row_start+m, col_start+col] = A[:, col].reshape(-1, 1).flatten() # Homogeneous case # Compute FirstRowSumsG first_row_sums_G = np.zeros((m, N * m)) G1 = G[:m, :] if G.shape[0] >= m else G for i in range(m): for block in range(N): if block == 0: first_row_sums_G[i, block*m:(block+1)*m] = G1[i, block*m:(block+1)*m] else: first_row_sums_G[i, block*m:(block+1)*m] = ( first_row_sums_G[i, (block-1)*m:block*m] + G1[i, block*m:(block+1)*m] ) # ghat = stat(FirstRowSumsG[:, (N-1)*m:]) last_block = first_row_sums_G[:, (N-1)*m:N*m] # Compute stationary distribution of last_block try: eigenvalues, eigenvectors = linalg.eig(last_block.T) idx = np.argmin(np.abs(eigenvalues)) ghat = np.real(eigenvectors[:, idx]) ghat = ghat / np.sum(ghat) ghat = ghat.reshape(1, -1) except: ghat = np.ones((1, m)) / m # pi0 = ghat @ G1 pi0 = ghat @ G1 # Compute 
normalization factor g = ghat @ first_row_sums_G # Simplified normalization using series summation # For M/G/1-type, pi = pi0 * (I-R)^(-1) where R is derived from G # Use iterative computation for stability # Compute R matrix: R = sum(A_i * G^i) R = A[:, :m].copy() G_pow = G1.copy() for i in range(1, K + 1): if (i+1)*m <= A.shape[1]: R = R + A[:, i*m:(i+1)*m] @ G_pow G_pow = G_pow @ G1 # Compute stationary vector using R pi_list = [pi0.flatten()] total_mass = np.sum(pi0) R_pow = R.copy() for i in range(1, max_num_comp): pi_i = pi0 @ R_pow mass = np.sum(pi_i) if mass < 1e-15: break pi_list.append(pi_i.flatten()) total_mass += mass R_pow = R_pow @ R # Normalize pi = np.concatenate(pi_list) pi = pi / np.sum(pi) return pi