"""
PH/M/c queue analysis (phase-type inter-arrivals, exponential service, c servers).
Solves the GI/M/c QBD via Neuts' matrix-geometric method:
R^2 A2 + R A1 + A0 = 0
where
A0 = (-T 1) alpha, A1 = T - c mu I, A2 = c mu I.
The stationary level-n vector follows pi_n = pi_c R^{n-c} for n >= c, with
boundary states pi_0..pi_c determined by linear balance + normalization.
Returns time-average performance metrics.
"""
from typing import Dict
import numpy as np
[docs]
def qsys_phmc(alpha, T, mu: float, c: int, max_iter: int = 50000, tol: float = 1e-14) -> Dict:
"""Solve PH/M/c via matrix-geometric.
Args:
alpha: PH entry probability vector (length k).
T: PH sub-generator matrix (k x k), row sums <= 0.
mu: Exponential service rate per server (>0).
c: Number of servers (>=1).
max_iter: Maximum iterations for R fixed-point.
tol: Convergence tolerance for R.
Returns:
dict with mean_queue_length, mean_waiting_queue, mean_waiting_time,
mean_sojourn_time, utilization.
"""
alpha_row = np.asarray(alpha, dtype=float).flatten()
Tm = np.asarray(T, dtype=float)
if Tm.ndim != 2 or Tm.shape[0] != Tm.shape[1]:
raise ValueError("T must be square")
k = Tm.shape[0]
if alpha_row.size != k:
raise ValueError("alpha length must match T dimension")
if mu <= 0:
raise ValueError("Service rate mu must be positive")
if c < 1:
raise ValueError("Number of servers c must be >= 1")
ones_k = np.ones(k)
t_vec = -Tm @ ones_k
D1 = np.outer(t_vec, alpha_row)
try:
mean_ia = float(alpha_row @ np.linalg.solve(-Tm, ones_k))
except np.linalg.LinAlgError as e:
raise ValueError(f"PH sub-generator T is singular: {e}")
if mean_ia <= 0:
raise ValueError(f"Non-positive mean inter-arrival: {mean_ia}")
lam = 1.0 / mean_ia
rho = lam / (c * mu)
if rho >= 1.0 - 1e-12:
raise ValueError(f"Load rho={rho} must be strictly less than 1")
A0 = D1
A1 = Tm - c * mu * np.eye(k)
A2 = c * mu * np.eye(k)
R = np.zeros((k, k))
for _ in range(max_iter):
try:
inv_term = np.linalg.inv(A1 + R @ A2)
except np.linalg.LinAlgError:
break
R_new = -A0 @ inv_term
if np.max(np.abs(R_new - R)) < tol:
R = R_new
break
R = R_new
# Boundary linear system: variables pi_0, pi_1, ..., pi_c (each row vec dim k)
n_var = (c + 1) * k
M = np.zeros((n_var, n_var))
def block_col(n: int) -> int:
return n * k
# Level 0 balance: pi_0 T + pi_1 (mu I) = 0
for j in range(k):
for i in range(k):
M[j, block_col(0) + i] += Tm[i, j]
M[j, block_col(1) + j] += mu
# Levels 1 .. c-1 balance: pi_{n-1} D1 + pi_n (T - n mu I) + pi_{n+1} ((n+1) mu I) = 0
for n in range(1, c):
eqrow_base = n * k
A1n = Tm - n * mu * np.eye(k)
for j in range(k):
row = eqrow_base + j
for i in range(k):
M[row, block_col(n - 1) + i] += D1[i, j]
M[row, block_col(n) + i] += A1n[i, j]
M[row, block_col(n + 1) + j] += (n + 1) * mu
# Level c balance: pi_{c-1} D1 + pi_c (T - c mu I + R (c mu I)) = 0
A1c_plus_RA2 = Tm - c * mu * np.eye(k) + R * (c * mu)
eqrow_base = c * k
for j in range(k):
row = eqrow_base + j
for i in range(k):
M[row, block_col(c - 1) + i] += D1[i, j]
M[row, block_col(c) + i] += A1c_plus_RA2[i, j]
# Replace last row with normalization
IR = np.eye(k) - R
try:
sum_geom = np.linalg.solve(IR, ones_k)
except np.linalg.LinAlgError as e:
raise ValueError(f"I - R is singular (rho={rho}): {e}")
norm_row = np.zeros(n_var)
for n in range(c):
for i in range(k):
norm_row[block_col(n) + i] = 1.0
for i in range(k):
norm_row[block_col(c) + i] = sum_geom[i]
M[-1, :] = norm_row
b = np.zeros(n_var)
b[-1] = 1.0
x = np.linalg.solve(M, b)
pis = [x[n * k:(n + 1) * k] for n in range(c + 1)]
pi_c = pis[c]
IR_inv = np.linalg.inv(IR)
IR_inv2 = IR_inv @ IR_inv
Lq = float(pi_c @ R @ IR_inv2 @ ones_k)
L_bulk = float(pi_c @ (c * IR_inv + R @ IR_inv2) @ ones_k)
L = sum(n * pis[n].sum() for n in range(c)) + L_bulk
Wq = Lq / lam
W = Wq + 1.0 / mu
return {
"mean_queue_length": L,
"mean_waiting_queue": Lq,
"mean_waiting_time": Wq,
"mean_sojourn_time": W,
"utilization": rho,
}