Source code for line_solver.api.mam.mapm1ps

"""
MAP/M/1-PS Sojourn Time Distribution.

Computes the complementary distribution function of sojourn time in a
MAP/M/1 processor-sharing queue.

References:
    Masuyama, H., & Takine, T. (2003). Sojourn time distribution in a
    MAP/M/1 processor-sharing queue. Operations Research Letters, 31(6), 406-412.
"""

import numpy as np
from scipy import linalg
from typing import Union

ArrayLike = Union[np.ndarray, list]


def _compute_stationary_distribution(C: np.ndarray, D: np.ndarray) -> np.ndarray:
    """
    Compute stationary distribution of MAP.

    Solves pi * Q = 0 with pi * e = 1.

    Args:
        C: MAP C matrix (transitions without arrivals).
        D: MAP D matrix (transitions with arrivals).

    Returns:
        Stationary distribution as row vector.
    """
    M = C.shape[0]
    Q = C + D

    # Solve pi * Q = 0 with pi * e = 1
    # Convert to Q^T * pi^T = 0, replace last equation with sum = 1
    A = Q.T.copy()

    # Replace last row with normalization constraint
    A[-1, :] = 1.0

    b = np.zeros(M)
    b[-1] = 1.0

    # Solve system
    pi_T = linalg.solve(A, b)
    return pi_T.reshape(1, -1)


def _compute_r_matrix(C: np.ndarray, D: np.ndarray, mu: float) -> np.ndarray:
    """
    Compute R matrix (minimal nonnegative solution).

    Solves: D + R(C - mu*I) + mu*R^2 = 0

    Args:
        C: MAP C matrix.
        D: MAP D matrix.
        mu: Service rate.

    Returns:
        R matrix.
    """
    M = C.shape[0]

    if M == 1:
        # Scalar case: use quadratic formula
        # mu*R^2 + (C - mu)*R + D = 0
        a = mu
        b = C[0, 0] - mu
        c = D[0, 0]

        discriminant = b * b - 4 * a * c
        if discriminant < 0:
            raise ValueError("No real solution for R matrix")

        R1 = (-b - np.sqrt(discriminant)) / (2 * a)
        R2 = (-b + np.sqrt(discriminant)) / (2 * a)

        # Choose minimal nonnegative solution
        if R1 >= 0 and R1 < 1:
            Rval = R1
        elif R2 >= 0 and R2 < 1:
            Rval = R2
        else:
            raise ValueError("No valid R in [0, 1)")

        return np.array([[Rval]])

    # Matrix case: Newton iteration
    I = np.eye(M)
    CminusMuI = C - mu * I

    # Initial guess: R = -D * inv(C - mu*I)
    R = -D @ linalg.inv(CminusMuI)

    max_iter = 1000
    tol = 1e-12

    for _ in range(max_iter):
        Rold = R.copy()

        # F(R) = D + R(C - mu*I) + mu*R^2
        F = D + R @ CminusMuI + mu * R @ R

        # F'(R) = (C - mu*I) + 2*mu*R
        Fprime = CminusMuI + 2 * mu * R

        # Newton step: R_new = R_old - F(R) * inv(F'(R))
        R = R - F @ linalg.inv(Fprime)

        # Check convergence
        if np.max(np.abs(R - Rold)) < tol:
            break

    # Ensure non-negativity
    R = np.maximum(R, 0.0)

    return R


def _determine_n_epsilon(
    pi0: np.ndarray,
    R: np.ndarray,
    D: np.ndarray,
    lam: float,
    epsilon: float
) -> int:
    """
    Determine N(epsilon) - minimum n such that truncated sum > 1 - epsilon.
    """
    M = R.shape[0]
    e = np.ones((M, 1))
    cumsum_prob = 0.0
    Rpower = np.eye(M)

    for n in range(1001):
        cumsum_prob += (1.0 / lam) * (pi0 @ Rpower @ D @ e).item()
        if cumsum_prob > 1 - epsilon:
            return n
        Rpower = Rpower @ R

    return 100  # Default fallback


def _poisson_pmf(k: int, lam: float) -> float:
    """
    Compute Poisson PMF using log-space computation to avoid overflow.
    """
    if lam == 0.0:
        return 1.0 if k == 0 else 0.0

    # P(k; lambda) = exp(k*log(lambda) - lambda - log(k!))
    log_prob = k * np.log(lam) - lam

    # Subtract log(k!)
    for i in range(1, k + 1):
        log_prob -= np.log(float(i))

    return np.exp(log_prob)


def _compute_h_recursive(
    C: np.ndarray,
    D: np.ndarray,
    mu: float,
    N: int,
    K: int,
    theta: float
) -> list:
    """
    Compute h_{n,k} recursively using Algorithm from Theorem 1.

    h_{n,0} = e for all n
    h_{n,k+1} = 1/(theta+mu) * [n*mu/(n+1) h_{n-1,k} + (theta*I + C)h_{n,k} + D h_{n+1,k}]

    Returns:
        2D list h[n][k] of (M,1) arrays.
    """
    M = C.shape[0]
    I = np.eye(M)
    e = np.ones((M, 1))

    theta_plus_mu = theta + mu
    thetaI_plus_C = theta * I + C

    # Initialize h array: h[n][k] where n in [0,N], k in [0,K]
    h = [[np.zeros((M, 1)) for _ in range(K + 1)] for _ in range(N + 1)]

    # Base case: h_{n,0} = e for all n
    for n in range(N + 1):
        h[n][0] = e.copy()

    # Recursive computation: fill column by column (increasing k)
    for k in range(K):
        for n in range(N + 1):
            term1 = np.zeros((M, 1))
            term2 = thetaI_plus_C @ h[n][k]
            term3 = np.zeros((M, 1))

            # First term: n*mu/(n+1) * h_{n-1,k}
            if n > 0:
                term1 = (n * mu / (n + 1)) * h[n - 1][k]

            # Third term: D * h_{n+1,k}
            if n < N:
                term3 = D @ h[n + 1][k]

            h[n][k + 1] = (term1 + term2 + term3) / theta_plus_mu

    return h


[docs] def map_m1ps_cdf_respt( C: ArrayLike, D: ArrayLike, mu: float, x: ArrayLike, epsilon: float = 1e-11, epsilon_prime: float = 1e-10 ) -> np.ndarray: """ Compute complementary sojourn time CDF for MAP/M/1-PS queue. Based on: Masuyama, H., & Takine, T. (2003). Sojourn time distribution in a MAP/M/1 processor-sharing queue. Args: C: MAP C matrix (transitions without arrivals). D: MAP D matrix (transitions with arrivals). mu: Service rate (must be > 0). x: Time points for CDF evaluation. epsilon: Queue length truncation parameter. epsilon_prime: Uniformization truncation parameter. Returns: Complementary CDF values W_bar(x) = Pr[W > x] at each point in x. Example: >>> # Exponential arrivals (Poisson process with rate lambda=0.5) >>> C = np.array([[-0.5]]) >>> D = np.array([[0.5]]) >>> mu = 1.0 >>> x = np.array([0.0, 0.5, 1.0, 2.0, 5.0]) >>> cdf = map_m1ps_cdf_respt(C, D, mu, x) """ C = np.asarray(C, dtype=np.float64) D = np.asarray(D, dtype=np.float64) x = np.asarray(x, dtype=np.float64).ravel() M = C.shape[0] if C.shape != (M, M): raise ValueError("C must be square") if D.shape != (M, M): raise ValueError("C and D must have same dimensions") if mu <= 0: raise ValueError("Service rate mu must be positive") # Compute stationary distribution pi of underlying Markov chain pi = _compute_stationary_distribution(C, D) # Compute mean arrival rate lambda e = np.ones((M, 1)) lam = (pi @ D @ e).item() # Check stability condition rho = lam / mu if rho >= 1.0: raise ValueError(f"System is unstable (rho = {rho} >= 1)") # Compute R matrix R = _compute_r_matrix(C, D, mu) # Compute pi_0 = pi * (I - R) I = np.eye(M) pi0 = pi @ (I - R) # Determine N(epsilon) - queue length truncation N_epsilon = _determine_n_epsilon(pi0, R, D, lam, epsilon) # Compute theta (uniformization parameter) theta = np.max(np.abs(np.diag(C))) # Allocate result array result = np.zeros(len(x)) # Main computation loop for each x value for idx, x_val in enumerate(x): # Determine K_max for uniformization theta_plus_mu = theta + mu mean_val = theta_plus_mu * x_val if mean_val > 0: L = max(0, int(np.floor(mean_val - 10 * np.sqrt(mean_val)))) K_max = int(np.ceil(mean_val + 10 * np.sqrt(mean_val))) else: L = 0 K_max = 0 # Compute h_{n,k} recursively h = _compute_h_recursive(C, D, mu, N_epsilon, K_max, theta) # Compute W_bar(x) = (1/lambda) * sum over n of pi0 * R^n * D * sum over k of Poisson * h_{n,k} W_bar = 0.0 Rpower = np.eye(M) for n in range(N_epsilon + 1): # Compute weight = pi0 * R^n * D weight = pi0 @ Rpower @ D # Compute sum over k sum_k = np.zeros((M, 1)) for k in range(L, K_max + 1): poisson_term = _poisson_pmf(k, mean_val) if k <= len(h[n]) - 1: sum_k = sum_k + poisson_term * h[n][k] W_bar += (1.0 / lam) * (weight @ sum_k).item() Rpower = Rpower @ R result[idx] = W_bar return result
[docs] def map_compute_R(C: ArrayLike, D: ArrayLike, mu: float) -> np.ndarray: """ Compute rate matrix R for MAP/M/1 queue. Computes the minimal nonnegative solution of the matrix equation: D + R(C - mu*I) + mu*R^2 = 0 This matrix is used in the analysis of MAP/M/1 queues based on quasi-birth-death processes. Args: C: M x M matrix governing MAP transitions without arrivals D: M x M matrix governing MAP transitions with arrivals mu: Service rate (scalar) Returns: M x M rate matrix (minimal nonnegative solution) References: Original MATLAB: matlab/src/api/map/map_compute_R.m Masuyama, H., & Takine, T. (2003). Sojourn time distribution in a MAP/M/1 processor-sharing queue. """ C = np.asarray(C, dtype=np.float64) D = np.asarray(D, dtype=np.float64) return _compute_r_matrix(C, D, mu)
[docs] def map_m1ps_h_recursive(C: ArrayLike, D: ArrayLike, mu: float, N: int, K: int) -> list: """ Recursive computation of h_{n,k} coefficients for MAP/M/1-PS. The h_{n,k} vectors satisfy the recursion (Theorem 1 in paper): h_{n,0} = e (vector of ones), for n = 0, 1, ... h_{n,k+1} = 1/(theta+mu) * [n*mu/(n+1) * h_{n-1,k} + (theta*I + C) * h_{n,k} + D * h_{n+1,k}] where h_{-1,k} = 0 for all k Args: C: M x M matrix governing MAP transitions without arrivals D: M x M matrix governing MAP transitions with arrivals mu: Service rate (scalar) N: Maximum value of n to compute (determines rows) K: Maximum value of k to compute (determines columns) Returns: 2D list h[n][k] of (M,1) arrays containing h_{n,k} References: Original MATLAB: matlab/src/api/map/map_m1ps_h_recursive.m Masuyama, H., & Takine, T. (2003). """ C = np.asarray(C, dtype=np.float64) D = np.asarray(D, dtype=np.float64) theta = np.max(np.abs(np.diag(C))) return _compute_h_recursive(C, D, mu, N, K, theta)
[docs] def map_m1ps_sojourn(C: ArrayLike, D: ArrayLike, mu: float, x: ArrayLike, epsilon: float = 1e-11, epsilon_prime: float = 1e-10) -> np.ndarray: """ Compute sojourn time distribution in MAP/M/1-PS queue. Alias for map_m1ps_cdf_respt. Args: C: MAP C matrix (transitions without arrivals) D: MAP D matrix (transitions with arrivals) mu: Service rate (must be > 0) x: Time points for CDF evaluation epsilon: Queue length truncation parameter epsilon_prime: Uniformization truncation parameter Returns: Complementary CDF values W_bar(x) = Pr[W > x] References: Original MATLAB: matlab/src/api/map/map_m1ps_sojourn.m """ return map_m1ps_cdf_respt(C, D, mu, x, epsilon, epsilon_prime)
__all__ = [ 'map_m1ps_cdf_respt', 'map_compute_R', 'map_m1ps_h_recursive', 'map_m1ps_sojourn', ]