Source code for line_solver.api.qsys.scheduling

"""
M/G/1 Queue Scheduling Discipline Analysis.

Native Python implementations of analytical formulas for M/G/1 queues
with various scheduling disciplines including priority, SRPT, feedback, etc.

Key functions:
    qsys_mg1_prio: Non-preemptive (Head-of-Line) priority scheduling
    qsys_mg1_srpt: Shortest Remaining Processing Time scheduling
    qsys_mg1_fb: Foreground-Background (LAS) scheduling
    qsys_mg1_lrpt: Longest Remaining Processing Time scheduling
    qsys_mg1_psjf: Preemptive Shortest Job First
    qsys_mg1_setf: Shortest Expected Time First

References:
    Original MATLAB: matlab/src/api/qsys/qsys_mg1_*.m
    Wierman and Harchol-Balter, SIGMETRICS 2003
    Kleinrock, "Queueing Systems, Volume I: Theory", 1975
"""

import numpy as np
from typing import Tuple
from scipy.integrate import quad


[docs] def qsys_mg1_prio(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with non-preemptive (Head-of-Line) priorities. Matches MATLAB qsys_mg1_prio.m exactly. Args: lambda_vec: Vector of arrival rates per priority class (class 1 = highest) mu_vec: Vector of service rates per priority class cs_vec: Vector of coefficients of variation per priority class Returns: Tuple of (W, rho): W: Vector of mean response times per priority class rho: System utilization (rhohat = Q/(1+Q) format) """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec <= 0): raise ValueError("lambda, mu, and cs must all be positive") rho_i = lambda_vec / mu_vec rho_total = np.sum(rho_i) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") # B_0 = sum_i lambda_i * (1 + cs_i^2) / mu_i^2 / 2 B_0 = np.sum(lambda_vec * (1 + cs_vec ** 2) / (mu_vec ** 2)) / 2 K = len(lambda_vec) W_q = np.zeros(K) for k in range(K): rho_prev = np.sum(rho_i[:k]) if k > 0 else 0.0 rho_curr = np.sum(rho_i[:k + 1]) W_q[k] = B_0 / ((1 - rho_prev) * (1 - rho_curr)) W = W_q + 1.0 / mu_vec Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
[docs] def qsys_mg1_srpt(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Shortest Remaining Processing Time (SRPT). Matches MATLAB qsys_mg1_srpt.m exactly: - Exponential case: preemptive priority with cumulative residual E_R_k - General case: Schrage-Miller formula with numerical integration Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec < 0): raise ValueError("lambda and mu must be positive, cs must be non-negative") K = len(lambda_vec) mean_service = 1.0 / mu_vec # Sort by mean service time ascending sort_idx = np.argsort(mean_service) unsort_idx = np.argsort(sort_idx) lambda_sorted = lambda_vec[sort_idx] mu_sorted = mu_vec[sort_idx] cs_sorted = cs_vec[sort_idx] rho_i = lambda_sorted / mu_sorted rho_total = np.sum(rho_i) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") is_exponential = np.all(np.abs(cs_sorted - 1) < 1e-10) if is_exponential or K == 1: W_sorted = _srpt_exp(lambda_sorted, mu_sorted) else: W_sorted = _srpt_general(lambda_sorted, mu_sorted, cs_sorted) W = W_sorted[unsort_idx] Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
def _srpt_exp(lambda_arr, mu_arr): """SRPT for exponential service - preemptive priority formula. Matches MATLAB qsys_mg1_srpt_exp.""" K = len(lambda_arr) rho_i = lambda_arr / mu_arr W = np.zeros(K) for k in range(K): rho_prev = np.sum(rho_i[:k]) if k > 0 else 0.0 rho_curr = np.sum(rho_i[:k + 1]) # Cumulative residual service from classes 1 to k E_R_k = np.sum(lambda_arr[:k + 1] / (mu_arr[:k + 1] ** 2)) W_q = E_R_k / ((1 - rho_prev) * (1 - rho_curr)) W[k] = W_q + 1.0 / mu_arr[k] return W def _srpt_general(lambda_arr, mu_arr, cs_arr): """SRPT for general service - Schrage-Miller formula. Matches MATLAB qsys_mg1_srpt_general.""" K = len(lambda_arr) lambda_total = np.sum(lambda_arr) p = lambda_arr / lambda_total W = np.zeros(K) for k in range(K): x = 1.0 / mu_arr[k] # Truncated load from classes with smaller mean service times smaller_idx = (1.0 / mu_arr) <= x rho_x = np.sum(lambda_arr[smaller_idx] / mu_arr[smaller_idx]) if rho_x >= 1: W[k] = np.inf continue # Numerator numerator = 0.0 for i in range(K): if 1.0 / mu_arr[i] <= x: numerator += lambda_arr[i] / (mu_arr[i] ** 2) # Second integral: integral_0^x dt/(1-rho(t)) n_steps = 1000 dt = x / n_steps second_term = 0.0 for step in range(1, n_steps + 1): t = (step - 0.5) * dt rho_t = 0.0 for i in range(K): rho_t += lambda_arr[i] / mu_arr[i] * (1 - np.exp(-mu_arr[i] * t) * (1 + mu_arr[i] * t)) if rho_t < 1: second_term += dt / (1 - rho_t) W[k] = numerator / (1 - rho_x) ** 2 + second_term return W
[docs] def qsys_mg1_fb(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Foreground-Background (FB/LAS) scheduling. Matches MATLAB qsys_mg1_fb.m exactly: - Exponential case: numerical integration of E[T(x)] * f_k(x) - General case: class-based approximation Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec < 0): raise ValueError("lambda and mu must be positive, cs must be non-negative") K = len(lambda_vec) rho_total = np.sum(lambda_vec / mu_vec) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") if np.all(np.abs(cs_vec - 1) < 1e-6): W = _fb_exp(lambda_vec, mu_vec) else: W = _fb_general(lambda_vec, mu_vec, cs_vec) Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
def _compute_fb_response(x, lambda_arr, mu_arr, p, lambda_total): """Compute FB/LAS response time E[T(x)] for a job of size x. Matches MATLAB compute_fb_response.""" K = len(lambda_arr) # rho_x = lambda * integral_0^x F_bar(t) dt rho_x = 0.0 for i in range(K): mu_i = mu_arr[i] int_Fbar = (1 - np.exp(-mu_i * x)) / mu_i rho_x += p[i] * lambda_total * int_Fbar # numerator = lambda * integral_0^x t * F_bar(t) dt numerator = 0.0 for i in range(K): mu_i = mu_arr[i] int_tFbar = (1 - np.exp(-mu_i * x) * (1 + mu_i * x)) / mu_i**2 numerator += p[i] * lambda_total * int_tFbar if rho_x >= 1: return np.inf return numerator / (1 - rho_x)**2 + x / (1 - rho_x) def _fb_exp(lambda_arr, mu_arr): """FB for exponential service - numerical integration. Matches MATLAB qsys_mg1_fb_exp.""" K = len(lambda_arr) lambda_total = np.sum(lambda_arr) p = lambda_arr / lambda_total W = np.zeros(K) for k in range(K): mu_k = mu_arr[k] x_max = 20 / mu_k def integrand(x): T = _compute_fb_response(x, lambda_arr, mu_arr, p, lambda_total) f_k = mu_k * np.exp(-mu_k * x) return T * f_k W[k], _ = quad(integrand, 0, x_max, limit=200, epsrel=1e-8, epsabs=1e-10) return W def _fb_general(lambda_arr, mu_arr, cs_arr): """FB for general service - class-based approximation. Matches MATLAB qsys_mg1_fb_general.""" K = len(lambda_arr) W = np.zeros(K) for k in range(K): x = 1.0 / mu_arr[k] rho_x = 0.0 for i in range(K): if abs(cs_arr[i] - 1) < 1e-6: integral_Fbar = (1 - np.exp(-mu_arr[i] * x)) / mu_arr[i] else: integral_Fbar = min(x, 1.0 / mu_arr[i]) rho_x += lambda_arr[i] * integral_Fbar numerator = 0.0 for i in range(K): if abs(cs_arr[i] - 1) < 1e-6: mu_i = mu_arr[i] integral_tFbar = (1 - np.exp(-mu_i * x) * (1 + mu_i * x)) / mu_i**2 else: integral_tFbar = min(x**2 / 2, 1.0 / mu_arr[i]**2) numerator += lambda_arr[i] * integral_tFbar if rho_x >= 1: W[k] = np.inf else: W[k] = numerator / (1 - rho_x)**2 + x / (1 - rho_x) return W
[docs] def qsys_mg1_lrpt(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Longest Remaining Processing Time (LRPT). Matches MATLAB qsys_mg1_lrpt.m exactly: - Exponential case: numerical integration of E[T(x)] * f_k(x) - General case: preemptive priority with descending service time ordering Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec < 0): raise ValueError("lambda and mu must be positive, cs must be non-negative") K = len(lambda_vec) rho_total = np.sum(lambda_vec / mu_vec) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") if np.all(np.abs(cs_vec - 1) < 1e-6): W = _lrpt_exp(lambda_vec, mu_vec) else: W = _lrpt_general(lambda_vec, mu_vec, cs_vec) Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
def _lrpt_exp(lambda_arr, mu_arr): """LRPT for exponential service - numerical integration. Matches MATLAB qsys_mg1_lrpt_exp.""" K = len(lambda_arr) lambda_total = np.sum(lambda_arr) rho_total = np.sum(lambda_arr / mu_arr) p = lambda_arr / lambda_total # E[X^2] for the mixture = sum_i p_i * 2/mu_i^2 E_X2 = np.sum(p * 2.0 / (mu_arr**2)) W = np.zeros(K) for k in range(K): mu_k = mu_arr[k] x_max = 20 / mu_k def T_of_x(x): return x / (1 - rho_total) + lambda_total * E_X2 / (2 * (1 - rho_total)**2) def integrand(x): f_k = mu_k * np.exp(-mu_k * x) return T_of_x(x) * f_k W[k], _ = quad(integrand, 0, x_max, limit=200, epsrel=1e-8, epsabs=1e-10) return W def _lrpt_general(lambda_arr, mu_arr, cs_arr): """LRPT for general service - preemptive priority with descending ordering. Matches MATLAB qsys_mg1_lrpt_general.""" K = len(lambda_arr) mean_service = 1.0 / mu_arr # Sort by mean service time DESCENDING for LRPT priority sort_idx = np.argsort(-mean_service) unsort_idx = np.argsort(sort_idx) lambda_sorted = lambda_arr[sort_idx] mu_sorted = mu_arr[sort_idx] rho_i = lambda_sorted / mu_sorted W_sorted = np.zeros(K) for k in range(K): rho_prev = np.sum(rho_i[:k]) if k > 0 else 0.0 rho_curr = np.sum(rho_i[:k + 1]) E_R_k = np.sum(lambda_sorted[:k + 1] / (mu_sorted[:k + 1]**2)) W_q = E_R_k / ((1 - rho_prev) * (1 - rho_curr)) W_sorted[k] = W_q + 1.0 / mu_sorted[k] return W_sorted[unsort_idx]
[docs] def qsys_mg1_psjf(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Preemptive Shortest Job First (PSJF). Matches MATLAB qsys_mg1_psjf.m exactly: - Exponential case: numerical integration of E[T(x)] * f_k(x) - General case: class-based truncated moment formula Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec < 0): raise ValueError("lambda and mu must be positive, cs must be non-negative") K = len(lambda_vec) rho_total = np.sum(lambda_vec / mu_vec) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") if np.all(np.abs(cs_vec - 1) < 1e-6): W = _psjf_exp(lambda_vec, mu_vec) else: W = _psjf_general(lambda_vec, mu_vec, cs_vec) Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
def _compute_psjf_response(x, lambda_arr, mu_arr, p, lambda_total): """Compute PSJF response time E[T(x)] for a job of size x. Matches MATLAB compute_psjf_response.""" K = len(lambda_arr) # Truncated first moment: integral_0^x t * f(t) dt m1_x = 0.0 for i in range(K): mu_i = mu_arr[i] int_t = 1.0 / mu_i - (1.0 / mu_i + x) * np.exp(-mu_i * x) m1_x += p[i] * int_t rho_x = lambda_total * m1_x # Truncated second moment: integral_0^x t^2 * f(t) dt m2_x = 0.0 for i in range(K): mu_i = mu_arr[i] int_t2 = 2.0 / mu_i**2 - (2.0 / mu_i**2 + 2.0 * x / mu_i + x**2) * np.exp(-mu_i * x) m2_x += p[i] * int_t2 m2_x_scaled = lambda_total * m2_x if rho_x >= 1: return np.inf return x / (1 - rho_x) + m2_x_scaled / (2 * (1 - rho_x)**2) def _psjf_exp(lambda_arr, mu_arr): """PSJF for exponential service - numerical integration. Matches MATLAB qsys_mg1_psjf_exp.""" K = len(lambda_arr) lambda_total = np.sum(lambda_arr) p = lambda_arr / lambda_total W = np.zeros(K) for k in range(K): mu_k = mu_arr[k] x_max = 20 / mu_k def integrand(x): T = _compute_psjf_response(x, lambda_arr, mu_arr, p, lambda_total) f_k = mu_k * np.exp(-mu_k * x) return T * f_k W[k], _ = quad(integrand, 0, x_max, limit=200, epsrel=1e-8, epsabs=1e-10) return W def _psjf_general(lambda_arr, mu_arr, cs_arr): """PSJF for general service - class-based formula. Matches MATLAB qsys_mg1_psjf_general.""" K = len(lambda_arr) mean_service = 1.0 / mu_arr sort_idx = np.argsort(mean_service) unsort_idx = np.argsort(sort_idx) lambda_sorted = lambda_arr[sort_idx] mu_sorted = mu_arr[sort_idx] cs_sorted = cs_arr[sort_idx] rho_i = lambda_sorted / mu_sorted W_sorted = np.zeros(K) for k in range(K): x = 1.0 / mu_sorted[k] rho_x = np.sum(rho_i[:k + 1]) m2_x = 0.0 for i in range(k + 1): E_S2_i = (1 + cs_sorted[i]**2) / mu_sorted[i]**2 m2_x += lambda_sorted[i] * E_S2_i if rho_x >= 1: W_sorted[k] = np.inf else: waiting_term = m2_x / (2 * (1 - rho_x)**2) service_term = x / (1 - rho_x) W_sorted[k] = waiting_term + service_term return W_sorted[unsort_idx]
[docs] def qsys_mg1_setf(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Shortest Expected Time First (SETF). Matches MATLAB qsys_mg1_setf.m exactly: SETF = FB/LAS + residual service time penalty (non-preemptive). Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec < 0): raise ValueError("lambda and mu must be positive, cs must be non-negative") K = len(lambda_vec) rho_i = lambda_vec / mu_vec rho_total = np.sum(rho_i) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") # Mean residual service time for the mixture distribution lambda_total = np.sum(lambda_vec) E_R = 0.0 for i in range(K): p_i = lambda_vec[i] / lambda_total E_S_i = 1.0 / mu_vec[i] E_S2_i = (1 + cs_vec[i]**2) / mu_vec[i]**2 E_R += p_i * E_S2_i / (2 * E_S_i) W = np.zeros(K) for k in range(K): x = 1.0 / mu_vec[k] rho_x = 0.0 for i in range(K): if abs(cs_vec[i] - 1) < 1e-6: integral_Fbar = (1 - np.exp(-mu_vec[i] * x)) / mu_vec[i] else: integral_Fbar = min(x, 1.0 / mu_vec[i]) rho_x += lambda_vec[i] * integral_Fbar numerator = 0.0 for i in range(K): if abs(cs_vec[i] - 1) < 1e-6: mu_i = mu_vec[i] integral_tFbar = (1 - np.exp(-mu_i * x) * (1 + mu_i * x)) / mu_i**2 else: integral_tFbar = min(x**2 / 2, 1.0 / mu_vec[i]**2) numerator += lambda_vec[i] * integral_tFbar if rho_x >= 1: W[k] = np.inf else: fb_waiting_term = numerator / (1 - rho_x)**2 fb_service_term = x / (1 - rho_x) np_penalty = E_R / (1 - rho_x) W[k] = fb_waiting_term + fb_service_term + np_penalty Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
__all__ = [ 'qsys_mg1_prio', 'qsys_mg1_srpt', 'qsys_mg1_fb', 'qsys_mg1_lrpt', 'qsys_mg1_psjf', 'qsys_mg1_setf', ]