"""
Quasi-Birth-Death (QBD) Process Utilities.
Native Python implementations for QBD matrix computations including
rate matrix R computation and QBD block construction.
Key algorithms:
qbd_R: Rate matrix via successive substitutions
qbd_R_logred: Rate matrix via logarithmic reduction
qbd_rg: Compute R and G matrices
References:
Original MATLAB: matlab/src/api/mam/qbd_*.m
Latouche & Ramaswami, "Introduction to Matrix Analytic Methods
in Stochastic Modeling", 1999
"""
import numpy as np
from numpy.linalg import LinAlgError
from scipy import linalg
from typing import Tuple, Optional
from dataclasses import dataclass
[docs]
@dataclass
class QBDResult:
"""Result of QBD analysis."""
R: np.ndarray # Rate matrix R
G: Optional[np.ndarray] = None # Rate matrix G
U: Optional[np.ndarray] = None # U matrix
eta: Optional[float] = None # Caudal characteristic
[docs]
def qbd_R(B: np.ndarray, L: np.ndarray, F: np.ndarray,
iter_max: int = 100000, tol: float = 1e-12) -> np.ndarray:
"""
Compute QBD rate matrix R using successive substitutions.
Solves the matrix quadratic equation:
R^2 * A_{-1} + R * A_0 + A_1 = 0
where A_{-1} = B, A_0 = L, A_1 = F.
Args:
B: Backward transition block A_{-1}
L: Local transition block A_0
F: Forward transition block A_1
iter_max: Maximum iterations (default: 100000)
tol: Convergence tolerance (default: 1e-12)
Returns:
Rate matrix R
References:
Original MATLAB: matlab/src/api/mam/qbd_R.m
"""
B = np.asarray(B, dtype=np.float64)
L = np.asarray(L, dtype=np.float64)
F = np.asarray(F, dtype=np.float64)
try:
L_inv = linalg.inv(L)
except LinAlgError:
L_inv = linalg.pinv(L)
Fil = F @ L_inv
BiL = B @ L_inv
R = -Fil
Rprime = -Fil - R @ R @ BiL
for _ in range(iter_max):
R = Rprime
Rprime = -Fil - R @ R @ BiL
if linalg.norm(R - Rprime, 1) <= tol:
break
return Rprime
[docs]
def qbd_R_logred(B: np.ndarray, L: np.ndarray, F: np.ndarray,
iter_max: int = 1000, tol: float = 1e-14) -> np.ndarray:
"""
Compute QBD rate matrix R using logarithmic reduction.
Uses the logarithmic reduction algorithm which has quadratic
convergence compared to linear convergence of successive substitutions.
Args:
B: Backward transition block A_{-1}
L: Local transition block A_0
F: Forward transition block A_1
iter_max: Maximum iterations (default: 1000)
tol: Convergence tolerance (default: 1e-14)
Returns:
Rate matrix R
References:
Original MATLAB: matlab/src/api/mam/qbd_R_logred.m
Latouche & Ramaswami, Ch. 8
"""
B = np.asarray(B, dtype=np.float64)
L = np.asarray(L, dtype=np.float64)
F = np.asarray(F, dtype=np.float64)
n = L.shape[0]
try:
L_inv = linalg.inv(-L)
except LinAlgError:
L_inv = linalg.pinv(-L)
A = L_inv @ F # A_1
C = L_inv @ B # A_{-1}
H = A.copy()
T = C.copy()
for _ in range(iter_max):
# Compute (I - AC - CA)^{-1}
I = np.eye(n)
M = I - A @ C - C @ A
try:
M_inv = linalg.inv(M)
except LinAlgError:
M_inv = linalg.pinv(M)
# Update
A_new = M_inv @ A @ A
C_new = M_inv @ C @ C
H_new = H + T @ M_inv @ A
if linalg.norm(H_new - H, 1) <= tol:
H = H_new
break
A = A_new
C = C_new
T = T @ M_inv @ (A + C)
H = H_new
# R = H * (-L)
R = H @ (-L)
return R
[docs]
def qbd_rg(B: np.ndarray, L: np.ndarray, F: np.ndarray,
method: str = 'logred', iter_max: int = 1000,
tol: float = 1e-14) -> QBDResult:
"""
Compute both R and G matrices for a QBD process.
G is the minimal non-negative solution to:
A_1 * G^2 + A_0 * G + A_{-1} = 0
R is the minimal non-negative solution to:
R^2 * A_{-1} + R * A_0 + A_1 = 0
Args:
B: Backward transition block A_{-1}
L: Local transition block A_0
F: Forward transition block A_1
method: 'logred' or 'successive' (default: 'logred')
iter_max: Maximum iterations
tol: Convergence tolerance
Returns:
QBDResult with R, G, U, and eta (caudal characteristic)
References:
Original MATLAB: matlab/src/api/mam/qbd_rg.m
"""
B = np.asarray(B, dtype=np.float64)
L = np.asarray(L, dtype=np.float64)
F = np.asarray(F, dtype=np.float64)
n = L.shape[0]
# Compute R
if method == 'logred':
R = qbd_R_logred(B, L, F, iter_max, tol)
else:
R = qbd_R(B, L, F, iter_max, tol)
# Compute G using the relation: G = A_{-1} * (R*A_{-1} + A_0)^{-1}
# Or alternatively iterate: G_{n+1} = -(A_0 + A_1*G_n^2)^{-1} * A_{-1}
try:
L_inv = linalg.inv(-L)
except LinAlgError:
L_inv = linalg.pinv(-L)
A = L_inv @ F
C = L_inv @ B
# Use logarithmic reduction for G
G = C.copy()
T = A.copy()
for _ in range(iter_max):
I = np.eye(n)
M = I - A @ C - C @ A
try:
M_inv = linalg.inv(M)
except LinAlgError:
M_inv = linalg.pinv(M)
A_new = M_inv @ A @ A
C_new = M_inv @ C @ C
G_new = G + T @ M_inv @ C
if linalg.norm(G_new - G, 1) <= tol:
G = G_new
break
A = A_new
C = C_new
T = T @ M_inv @ (A + C)
G = G_new
G = G @ (-L)
# Compute U = A_0 + A_1 * G
U = L + F @ G
# Caudal characteristic (spectral radius of R)
try:
eigvals = linalg.eigvals(R)
eta = np.max(np.abs(eigvals))
except:
eta = None
return QBDResult(R=R, G=G, U=U, eta=eta)
[docs]
def qbd_blocks_mapmap1(D0_arr: np.ndarray, D1_arr: np.ndarray,
D0_srv: np.ndarray, D1_srv: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Construct QBD blocks for a MAP/MAP/1 queue.
Builds the backward (B), local (L), and forward (F) transition
blocks for the QBD representation of a MAP/MAP/1 queue.
Args:
D0_arr: Arrival MAP D0 matrix
D1_arr: Arrival MAP D1 matrix
D0_srv: Service MAP D0 matrix
D1_srv: Service MAP D1 matrix
Returns:
Tuple of (B, L, F) QBD blocks
References:
Original MATLAB: matlab/src/api/mam/qbd_mapmap1.m
"""
D0_arr = np.asarray(D0_arr, dtype=np.float64)
D1_arr = np.asarray(D1_arr, dtype=np.float64)
D0_srv = np.asarray(D0_srv, dtype=np.float64)
D1_srv = np.asarray(D1_srv, dtype=np.float64)
na = D0_arr.shape[0]
ns = D0_srv.shape[0]
# Forward transitions (arrivals): F = D1_arr \otimes I_ns
F = np.kron(D1_arr, np.eye(ns))
# Backward transitions (service completions): B = I_na \otimes D1_srv
B = np.kron(np.eye(na), D1_srv)
# Local transitions: L = D0_arr \otimes I_ns + I_na \otimes D0_srv
L = np.kron(D0_arr, np.eye(ns)) + np.kron(np.eye(na), D0_srv)
return B, L, F
[docs]
def qbd_bmapbmap1(
MAPa: Tuple[np.ndarray, np.ndarray],
pbatcha: np.ndarray,
MAPs: Tuple[np.ndarray, np.ndarray]
) -> Tuple[np.ndarray, np.ndarray, list, np.ndarray, list]:
"""
Compute QBD blocks for a BMAP/BMAP/1 queue.
Constructs the QBD (Quasi-Birth-Death) transition blocks for a
BMAP/BMAP/1 queue with batch arrivals.
Args:
MAPa: Arrival process MAP as (D0, D1)
pbatcha: Probability distribution of batch sizes (array of length maxbatch)
MAPs: Service process MAP as (D0, D1)
Returns:
Tuple of (A0, A_1, A1_list, B0, B1_list) where:
A0: Local transition block
A_1: Downward transition block
A1_list: List of upward transition blocks for each batch size
B0: Initial boundary local block
B1_list: List of boundary upward blocks for each batch size
References:
Original MATLAB: matlab/src/api/mam/qbd_bmapbmap1.m
"""
D0_arr, D1_arr = MAPa
D0_srv, D1_srv = MAPs
D0_arr = np.asarray(D0_arr, dtype=np.float64)
D1_arr = np.asarray(D1_arr, dtype=np.float64)
D0_srv = np.asarray(D0_srv, dtype=np.float64)
D1_srv = np.asarray(D1_srv, dtype=np.float64)
pbatcha = np.asarray(pbatcha, dtype=np.float64)
na = D0_arr.shape[0]
ns = D0_srv.shape[0]
maxbatch = len(pbatcha)
# Build upward transition blocks for each batch size
A1_list = []
for b in range(maxbatch):
A1_b = np.kron(D1_arr * pbatcha[b], np.eye(ns))
A1_list.append(A1_b)
# Local transitions: A0 = D0_arr \otimes I_ns + I_na \otimes D0_srv (Kronecker sum)
A0 = np.kron(D0_arr, np.eye(ns)) + np.kron(np.eye(na), D0_srv)
# Downward transitions: A_1 = I_na \otimes D1_srv
A_1 = np.kron(np.eye(na), D1_srv)
# Boundary blocks (for level 0)
B0 = np.kron(D0_arr, np.eye(ns))
B1_list = []
for b in range(maxbatch):
B1_b = np.kron(D1_arr * pbatcha[b], np.eye(ns))
B1_list.append(B1_b)
return A0, A_1, A1_list, B0, B1_list
[docs]
def qbd_mapmap1(
MAPa: Tuple[np.ndarray, np.ndarray],
MAPs: Tuple[np.ndarray, np.ndarray],
util: Optional[float] = None
) -> Tuple[float, float, float, np.ndarray, np.ndarray, Optional[float],
np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray,
Tuple[np.ndarray, np.ndarray]]:
"""
Analyze a MAP/MAP/1 queue using QBD methods.
Solves a MAP/MAP/1 queue using Quasi-Birth-Death process methods,
computing throughput, queue length, utilization, and other metrics.
Args:
MAPa: Arrival process MAP as (D0, D1)
MAPs: Service process MAP as (D0, D1)
util: Optional target utilization to scale service rate
Returns:
Tuple of (XN, QN, UN, pqueue, R, eta, G, A_1, A0, A1, U, MAPs_scaled)
where:
XN: System throughput
QN: Mean queue length
UN: Utilization
pqueue: Queue length distribution
R: Rate matrix R
eta: Caudal characteristic (spectral radius of R)
G: Rate matrix G
A_1: Downward transition block
A0: Local transition block
A1: Upward transition block
U: Matrix U
MAPs_scaled: Scaled service process
References:
Original MATLAB: matlab/src/api/mam/qbd_mapmap1.m
"""
from ..mam import map_scale, map_lambda
D0_arr, D1_arr = MAPa
D0_srv, D1_srv = MAPs
D0_arr = np.asarray(D0_arr, dtype=np.float64)
D1_arr = np.asarray(D1_arr, dtype=np.float64)
D0_srv = np.asarray(D0_srv, dtype=np.float64)
D1_srv = np.asarray(D1_srv, dtype=np.float64)
na = D0_arr.shape[0]
ns = D0_srv.shape[0]
# Scale service process if target utilization provided
if util is not None:
lambda_a = map_lambda(D0_arr, D1_arr)
D0_srv, D1_srv = map_scale(D0_srv, D1_srv, util / lambda_a)
lambda_a = map_lambda(D0_arr, D1_arr)
lambda_s = map_lambda(D0_srv, D1_srv)
actual_util = lambda_a / lambda_s
# Build QBD blocks
A1 = np.kron(D1_arr, np.eye(ns)) # Forward (arrivals)
A0 = np.kron(D0_arr, np.eye(ns)) + np.kron(np.eye(na), D0_srv) # Local
A_1 = np.kron(np.eye(na), D1_srv) # Backward (services)
A0bar = np.kron(D0_arr, np.eye(ns)) # Boundary local
# Compute R and G using QBD solver
result = qbd_rg(A_1, A0, A1)
R = result.R
G = result.G
U = result.U
eta = result.eta
# Compute queue length distribution using matrix-geometric method
# Solve pi_0 from: pi_0 * (A0bar + R * A_1) * e = 0, pi_0 * e = 1 - rho
# For now, use simplified approach
I = np.eye(R.shape[0])
try:
inv_I_minus_R = linalg.inv(I - R)
except LinAlgError:
inv_I_minus_R = linalg.pinv(I - R)
# Compute steady-state distribution at level 0
# Solve: pi_0 * (A0bar + R * A_1) = 0 with normalization
L0 = A0bar + R @ A_1
try:
from ..mc.ctmc import ctmc_solve
pi0 = ctmc_solve(L0.T).flatten()
except:
# Fallback to eigenvalue method
eigvals, eigvecs = linalg.eig(L0.T)
idx = np.argmin(np.abs(eigvals))
pi0 = np.real(eigvecs[:, idx]).flatten()
pi0 = np.abs(pi0) / np.sum(np.abs(pi0))
# Normalize
norm_const = np.sum(pi0) * np.sum(inv_I_minus_R, axis=1).mean()
if norm_const > 0:
pi0 = pi0 / norm_const * (1 - actual_util)
# Build queue length distribution (first few levels)
max_levels = 100
pqueue = np.zeros((max_levels, na * ns))
pqueue[0, :] = pi0
pi_n = pi0.copy()
for n in range(1, max_levels):
pi_n = pi_n @ R
pqueue[n, :] = pi_n
if np.sum(pi_n) < 1e-12:
break
# Compute performance measures
if na == 1 and ns == 1:
UN = 1 - pqueue[0, 0]
QN = np.sum(np.arange(pqueue.shape[0]) * pqueue.flatten())
else:
UN = 1 - np.sum(pqueue[0, :])
QN = np.sum(np.arange(pqueue.shape[0])[:, None] * np.sum(pqueue, axis=1, keepdims=True))
XN = lambda_a
MAPs_scaled = (D0_srv, D1_srv)
return XN, QN, UN, pqueue, R, eta, G, A_1, A0, A1, U, MAPs_scaled
[docs]
def qbd_raprap1(
RAPa: Tuple[np.ndarray, np.ndarray],
RAPs: Tuple[np.ndarray, np.ndarray],
util: Optional[float] = None
) -> Tuple[float, float, float, np.ndarray, np.ndarray, float,
np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Analyze a RAP/RAP/1 queue using QBD methods.
Solves a RAP/RAP/1 queue (Rational Arrival Process) using QBD methods,
computing throughput, queue length, utilization, and other metrics.
Note: This is a simplified implementation. For full RAP analysis,
specialized algorithms like those in SMCSolver are needed.
Args:
RAPa: Arrival process RAP as (H0, H1)
RAPs: Service process RAP as (H0, H1)
util: Optional target utilization to scale service rate
Returns:
Tuple of (XN, QN, UN, pqueue, R, eta, G, B, L, F) where:
XN: System throughput
QN: Mean queue length
UN: Utilization
pqueue: Queue length distribution
R: Rate matrix R
eta: Caudal characteristic
G: Rate matrix G
B: Backward transition block
L: Local transition block
F: Forward transition block
References:
Original MATLAB: matlab/src/api/mam/qbd_raprap1.m
"""
from ..mam import map_scale, map_lambda
H0_arr, H1_arr = RAPa
H0_srv, H1_srv = RAPs
H0_arr = np.asarray(H0_arr, dtype=np.float64)
H1_arr = np.asarray(H1_arr, dtype=np.float64)
H0_srv = np.asarray(H0_srv, dtype=np.float64)
H1_srv = np.asarray(H1_srv, dtype=np.float64)
na = H0_arr.shape[0]
ns = H0_srv.shape[0]
# Scale service process if target utilization provided
if util is not None:
lambda_a = map_lambda(H0_arr, H1_arr)
H0_srv, H1_srv = map_scale(H0_srv, H1_srv, util / lambda_a)
lambda_a = map_lambda(H0_arr, H1_arr)
lambda_s = map_lambda(H0_srv, H1_srv)
actual_util = lambda_a / lambda_s
# Build QBD blocks (same structure as MAP/MAP/1)
F = np.kron(H1_arr, np.eye(ns)) # Forward (arrivals)
L = np.kron(H0_arr, np.eye(ns)) + np.kron(np.eye(na), H0_srv) # Local
B = np.kron(np.eye(na), H1_srv) # Backward (services)
# Compute R and G
result = qbd_rg(B, L, F)
R = result.R
G = result.G
eta = result.eta if result.eta is not None else np.max(np.abs(linalg.eigvals(R)))
# Compute queue length distribution
# Solve pi_0 from boundary condition
A0bar = np.kron(H0_arr, np.eye(ns))
L0 = A0bar + R @ B
try:
from ..mc.ctmc import ctmc_solve
pi0 = ctmc_solve(L0.T).flatten()
except:
eigvals, eigvecs = linalg.eig(L0.T)
idx = np.argmin(np.abs(eigvals))
pi0 = np.real(eigvecs[:, idx]).flatten()
pi0 = np.abs(pi0) / np.sum(np.abs(pi0))
# Normalize
I = np.eye(R.shape[0])
try:
inv_I_minus_R = linalg.inv(I - R)
except LinAlgError:
inv_I_minus_R = linalg.pinv(I - R)
norm_const = np.sum(pi0 @ inv_I_minus_R)
if norm_const > 0:
pi0 = pi0 / norm_const
# Build queue length distribution
max_levels = 100
pqueue = np.zeros((max_levels, na * ns))
pqueue[0, :] = pi0
pi_n = pi0.copy()
for n in range(1, max_levels):
pi_n = pi_n @ R
pqueue[n, :] = pi_n
if np.sum(pi_n) < 1e-12:
break
# Compute performance measures
if na == 1 and ns == 1:
UN = 1 - pqueue[0, 0]
else:
UN = 1 - np.sum(pqueue[0, :])
QN = np.sum(np.arange(pqueue.shape[0])[:, None] * np.sum(pqueue, axis=1, keepdims=True))
XN = lambda_a
return XN, QN, UN, pqueue, R, eta, G, B, L, F
[docs]
def qbd_setupdelayoff(
lambda_val: float,
mu: float,
alpharate: float,
alphascv: float,
betarate: float,
betascv: float
) -> float:
"""
Analyze queue with setup delay and turn-off phases.
Performs queue-length analysis for a queueing system with setup
delay (warm-up) and turn-off periods using QBD methods.
The system operates as follows:
1. When empty and job arrives, server enters setup phase
2. After setup, server becomes active and serves jobs
3. When queue empties, server enters turn-off phase
4. After turn-off, server becomes idle
Args:
lambda_val: Arrival rate
mu: Service rate
alpharate: Rate of setup delay phase
alphascv: Squared coefficient of variation for setup delay
betarate: Rate of turn-off phase
betascv: Squared coefficient of variation for turn-off period
Returns:
Average queue length QN
References:
Original MATLAB: matlab/src/api/mam/qbd_setupdelayoff.m
"""
from ..butools.ph.baseph import AcyclicPHFromMeansAndSCVs
# Fit PH distributions for setup and turn-off phases
try:
alpha_ph = AcyclicPHFromMeansAndSCVs([1.0 / alpharate], [alphascv])
alpha_D0 = alpha_ph[1] # Subgenerator matrix
alpha_D1 = -alpha_D0 @ np.ones((alpha_D0.shape[0], 1))
except:
# Fallback to Erlang approximation
na = max(1, int(round(1.0 / alphascv)))
rate_a = na * alpharate
alpha_D0 = np.diag([-rate_a] * na)
for i in range(na - 1):
alpha_D0[i, i + 1] = rate_a
alpha_D1 = np.zeros((na, 1))
alpha_D1[-1, 0] = rate_a
try:
beta_ph = AcyclicPHFromMeansAndSCVs([1.0 / betarate], [betascv])
beta_D0 = beta_ph[1]
beta_D1 = -beta_D0 @ np.ones((beta_D0.shape[0], 1))
except:
# Fallback to Erlang approximation
nb = max(1, int(round(1.0 / betascv)))
rate_b = nb * betarate
beta_D0 = np.diag([-rate_b] * nb)
for i in range(nb - 1):
beta_D0[i, i + 1] = rate_b
beta_D1 = np.zeros((nb, 1))
beta_D1[-1, 0] = rate_b
na = alpha_D0.shape[0]
nb = beta_D0.shape[0]
n = na + nb
# Build QBD blocks
# States: [setup phases (1..na), active + turn-off phases (na+1..n)]
F = np.zeros((n, n)) # Forward transitions (arrivals)
B = np.zeros((n, n)) # Backward transitions (service)
# Arrivals in setup phase
for i in range(na):
F[i, i] = lambda_val
# Arrivals in turn-off phase (go to active state)
for i in range(nb):
F[na + i, na] = lambda_val
F[na, na] = lambda_val
# Service completions (only from active state)
B[na, na] = mu
# Local transitions
L = np.zeros((n, n))
# Setup phase transitions
for i in range(na):
L[i, i] = alpha_D0[i, i] - lambda_val
if i < na - 1:
L[i, i + 1:na] = alpha_D0[i, i + 1:na]
else:
# Transition from last setup phase to active
L[na - 1, na] = -alpha_D0[na - 1, na - 1]
# Active state
L[na, na] = -mu - lambda_val
# Turn-off phase (only reachable from level 0)
for i in range(1, nb):
L[na + i, na + i] = -lambda_val
# Boundary block L0 for level 0
L0 = np.zeros((n, n))
# Setup phase at level 0
for i in range(na):
L0[i, i] = -lambda_val
# Turn-off phase at level 0
for i in range(nb):
L0[na + i, na + i] = beta_D0[i, i] - lambda_val if i < nb else -lambda_val
if i == nb - 1:
L0[na + i, 0] = -beta_D0[i, i] # Return to setup phase
elif i < nb - 1:
L0[na + i, na + i + 1] = -beta_D0[i, i]
# Compute R matrix
R = qbd_R(B, L, F)
# Compute steady-state distribution at level 0
try:
from ..mc.dtmc import dtmc_solve
pi_boundary = dtmc_solve(L0 + R @ B)
except:
pi_boundary = np.ones(n) / n
# Normalize
I = np.eye(n)
try:
inv_I_minus_R = linalg.inv(I - R)
except LinAlgError:
inv_I_minus_R = linalg.pinv(I - R)
norm_const = np.sum(pi_boundary @ inv_I_minus_R)
if norm_const > 0:
pi_boundary = pi_boundary / norm_const
# Compute queue length distribution and mean
max_levels = 1000
QN = 0.0
pi_n = pi_boundary.copy()
for level in range(1, max_levels):
pi_n = pi_n @ R
level_prob = np.sum(pi_n)
QN += level * level_prob
if level_prob < 1e-12:
break
return QN
__all__ = [
'QBDResult',
'qbd_R',
'qbd_R_logred',
'qbd_rg',
'qbd_blocks_mapmap1',
'qbd_bmapbmap1',
'qbd_mapmap1',
'qbd_raprap1',
'qbd_setupdelayoff',
]