Source code for line_solver.api.qsys.scheduling

"""
M/G/1 Queue Scheduling Discipline Analysis.

Native Python implementations of analytical formulas for M/G/1 queues
with various scheduling disciplines including priority, SRPT, feedback, etc.

Key functions:
    qsys_mg1_prio: Non-preemptive (Head-of-Line) priority scheduling
    qsys_mg1_srpt: Shortest Remaining Processing Time scheduling
    qsys_mg1_fb: Foreground-Background (LAS) scheduling
    qsys_mg1_lrpt: Longest Remaining Processing Time scheduling
    qsys_mg1_psjf: Preemptive Shortest Job First
    qsys_mg1_setf: Shortest Expected Time First

References:
    Original MATLAB: matlab/src/api/qsys/qsys_mg1_*.m
    Kleinrock, "Queueing Systems, Volume I: Theory", 1975
"""

import numpy as np
from typing import Tuple


[docs] def qsys_mg1_prio(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with non-preemptive (Head-of-Line) priorities. Computes per-class mean waiting times for multiple priority classes using the Pollaczek-Khinchine formula extended for priorities. For K priority classes (class 1 = highest priority), the waiting time for class k is: W_k = (B_0 / (1 - sum_{i=1}^{k-1} rho_i)) * (1 / (1 - sum_{i=1}^{k} rho_i)) Args: lambda_vec: Vector of arrival rates per priority class (class 1 = highest) mu_vec: Vector of service rates per priority class cs_vec: Vector of coefficients of variation per priority class Returns: Tuple of (W, rho): W: Vector of mean response times per priority class rho: System utilization (rhohat = Q/(1+Q) format) Raises: ValueError: If inputs have different lengths or are not positive ValueError: If system is unstable (utilization >= 1) References: Original MATLAB: matlab/src/api/qsys/qsys_mg1_prio.m Kleinrock, "Queueing Systems, Volume I: Theory", Section 3.5 """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() # Validate inputs if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec <= 0): raise ValueError("lambda, mu, and cs must all be positive") # Per-class utilizations rho_i = lambda_vec / mu_vec # Overall utilization rho_total = np.sum(rho_i) # Stability check if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") # Mean second moment of service time # B_0 = sum_i lambda_i * E[S_i^2] / 2 # where E[S_i^2] = (1 + cs_i^2) / mu_i^2 B_0 = np.sum(lambda_vec * (1 + cs_vec ** 2) / (mu_vec ** 2)) / 2 # Compute per-class waiting times K = len(lambda_vec) W_q = np.zeros(K) for k in range(K): # Cumulative utilization for higher priority classes rho_prev = np.sum(rho_i[:k]) if k > 0 else 0.0 # Cumulative utilization including current class rho_curr = np.sum(rho_i[:k + 1]) # Waiting time in queue for class k W_q[k] = B_0 / ((1 - rho_prev) * (1 - rho_curr)) # Response time = waiting time + service time W = W_q + 1.0 / mu_vec # Compute rhohat = Q/(1+Q) for consistency Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
[docs] def qsys_mg1_srpt(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Shortest Remaining Processing Time (SRPT). For SRPT, smaller jobs have preemptive priority over larger jobs. When all service time distributions are exponential (cs=1), SRPT reduces to preemptive priority based on service rates. Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class (sorted by original order) rho: System utilization (rhohat format) References: Original MATLAB: matlab/src/api/qsys/qsys_mg1_srpt.m Schrage and Miller, "The queue M/G/1 with the shortest remaining processing time discipline", Operations Research, 1966 """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() # Validate inputs if not (len(lambda_vec) == len(mu_vec) == len(cs_vec)): raise ValueError("lambda, mu, and cs must have the same length") if np.any(lambda_vec <= 0) or np.any(mu_vec <= 0) or np.any(cs_vec < 0): raise ValueError("lambda and mu must be positive, cs must be non-negative") K = len(lambda_vec) # Mean service times per class mean_service = 1.0 / mu_vec # Sort classes by mean service time (ascending) for SRPT priority sort_idx = np.argsort(mean_service) unsort_idx = np.argsort(sort_idx) # Reorder parameters lambda_sorted = lambda_vec[sort_idx] mu_sorted = mu_vec[sort_idx] cs_sorted = cs_vec[sort_idx] # Per-class utilizations rho_i = lambda_sorted / mu_sorted rho_total = np.sum(rho_i) # Stability check if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") # Check if all exponential (SRPT = preemptive priority for exponential) is_exponential = np.all(np.abs(cs_sorted - 1) < 1e-10) if is_exponential or K == 1: # Use preemptive priority formula W_sorted = _mg1_srpt_exp(lambda_sorted, mu_sorted) else: # General case - use approximation W_sorted = _mg1_srpt_general(lambda_sorted, mu_sorted, cs_sorted) # Restore original ordering W = W_sorted[unsort_idx] # Compute rhohat Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
def _mg1_srpt_exp(lambda_vec: np.ndarray, mu_vec: np.ndarray) -> np.ndarray: """SRPT for exponential service - uses preemptive priority formula.""" K = len(lambda_vec) rho_i = lambda_vec / mu_vec W = np.zeros(K) for k in range(K): rho_prev = np.sum(rho_i[:k]) if k > 0 else 0.0 rho_curr = np.sum(rho_i[:k + 1]) # Cumulative residual service from classes 1 to k E_R_k = np.sum(lambda_vec[:k + 1] / (mu_vec[:k + 1] ** 2)) # Waiting time (preemptive priority) W_q = E_R_k / ((1 - rho_prev) * (1 - rho_curr)) # Response time W[k] = W_q + 1.0 / mu_vec[k] return W def _mg1_srpt_general(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> np.ndarray: """SRPT for general service - uses numerical approximation.""" K = len(lambda_vec) W = np.zeros(K) for k in range(K): x = 1.0 / mu_vec[k] # Mean service time for class k # Truncated load smaller_idx = (1.0 / mu_vec) <= x rho_x = np.sum(lambda_vec[smaller_idx] / mu_vec[smaller_idx]) if rho_x >= 1: W[k] = np.inf continue # Numerator integral approximation numerator = np.sum(lambda_vec[smaller_idx] / (mu_vec[smaller_idx] ** 2)) # Second integral (numerical quadrature) n_steps = 1000 dt = x / n_steps second_term = 0.0 for step in range(1, n_steps + 1): t = (step - 0.5) * dt rho_t = 0.0 for i in range(K): # Truncated load at time t rho_t += lambda_vec[i] / mu_vec[i] * (1 - np.exp(-mu_vec[i] * t) * (1 + mu_vec[i] * t)) if rho_t < 1: second_term += dt / (1 - rho_t) # Schrage-Miller formula W[k] = numerator / (1 - rho_x) ** 2 + second_term return W
[docs] def qsys_mg1_fb(lambda_val: float, mu_val: float, cs_val: float) -> Tuple[float, float]: """ Analyze M/G/1 queue with Foreground-Background (FB/LAS) scheduling. In FB scheduling, the job with the least attained service gets priority. This is also known as Least Attained Service (LAS) scheduling. For FB, the conditional response time for a job of size x is: E[T|x] = x / (1 - rho) for exponential service E[T|x] = integral formula for general service Args: lambda_val: Arrival rate mu_val: Service rate cs_val: Coefficient of variation of service time Returns: Tuple of (W, rho): W: Mean response time rho: System utilization (rhohat format) References: Original MATLAB: matlab/src/api/qsys/qsys_mg1_fb.m Kleinrock, "Queueing Systems, Volume II" """ rho = lambda_val / mu_val if rho >= 1: raise ValueError(f"System is unstable: utilization rho = {rho:.4g} >= 1") # For exponential service (cs = 1), FB and FCFS have same mean response time # For general service, FB favors small jobs if abs(cs_val - 1.0) < 1e-10: # Exponential case W = 1.0 / (mu_val * (1 - rho)) else: # General case - approximation # E[T] = E[S] / (1 - rho) * (1 + correction term based on cs) mean_S = 1.0 / mu_val var_S = (cs_val ** 2) / (mu_val ** 2) # Use weighted approximation W = mean_S / (1 - rho) # Correction for variability (FB benefits from low variability) if cs_val < 1: W *= (1 + cs_val ** 2) / 2 else: W *= (1 + (cs_val ** 2 - 1) / (2 * (1 + rho))) Q = lambda_val * W rho_hat = Q / (1 + Q) return W, rho_hat
[docs] def qsys_mg1_lrpt(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Longest Remaining Processing Time (LRPT). LRPT gives preemptive priority to jobs with the longest remaining processing time. This is the opposite of SRPT. Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) References: Original MATLAB: matlab/src/api/qsys/qsys_mg1_lrpt.m """ lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() K = len(lambda_vec) mean_service = 1.0 / mu_vec # Sort by mean service time descending (longest first) sort_idx = np.argsort(-mean_service) unsort_idx = np.argsort(sort_idx) lambda_sorted = lambda_vec[sort_idx] mu_sorted = mu_vec[sort_idx] # Use preemptive priority formula with reversed ordering rho_i = lambda_sorted / mu_sorted rho_total = np.sum(rho_i) if rho_total >= 1: raise ValueError(f"System is unstable: utilization rho = {rho_total:.4g} >= 1") W_sorted = _mg1_srpt_exp(lambda_sorted, mu_sorted) W = W_sorted[unsort_idx] Q = np.sum(lambda_vec * W) rho_hat = Q / (1 + Q) return W, rho_hat
[docs] def qsys_mg1_psjf(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Preemptive Shortest Job First (PSJF). PSJF gives preemptive priority to the job with the smallest expected total service time (not remaining time). For exponential service, PSJF equals SRPT. Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) References: Original MATLAB: matlab/src/api/qsys/qsys_mg1_psjf.m """ # For PSJF with known job sizes, it's equivalent to SRPT for classification return qsys_mg1_srpt(lambda_vec, mu_vec, cs_vec)
[docs] def qsys_mg1_setf(lambda_vec: np.ndarray, mu_vec: np.ndarray, cs_vec: np.ndarray) -> Tuple[np.ndarray, float]: """ Analyze M/G/1 queue with Shortest Expected Time First (SETF). SETF is non-preemptive and prioritizes based on expected service time. Args: lambda_vec: Vector of arrival rates per class mu_vec: Vector of service rates per class cs_vec: Vector of coefficients of variation per class Returns: Tuple of (W, rho): W: Vector of mean response times per class rho: System utilization (rhohat format) References: Original MATLAB: matlab/src/api/qsys/qsys_mg1_setf.m """ # SETF with classes = non-preemptive priority by expected service time # Sort by mean service time and use HOL priority formula lambda_vec = np.asarray(lambda_vec, dtype=float).flatten() mu_vec = np.asarray(mu_vec, dtype=float).flatten() cs_vec = np.asarray(cs_vec, dtype=float).flatten() mean_service = 1.0 / mu_vec sort_idx = np.argsort(mean_service) unsort_idx = np.argsort(sort_idx) W_sorted, rho = qsys_mg1_prio( lambda_vec[sort_idx], mu_vec[sort_idx], cs_vec[sort_idx] ) W = W_sorted[unsort_idx] return W, rho
__all__ = [ 'qsys_mg1_prio', 'qsys_mg1_srpt', 'qsys_mg1_fb', 'qsys_mg1_lrpt', 'qsys_mg1_psjf', 'qsys_mg1_setf', ]