Source code for line_solver.api.mam.ldqbd

"""
Level-Dependent Quasi-Birth-Death (LDQBD) process solver.

Native Python implementation for analyzing LDQBD processes using matrix
continued fractions. Implements Algorithm 1 from "A Simple Algorithm for
the Rate Matrices of Level-Dependent QBD Processes" by Phung-Duc,
Masuyama, Kasahara, and Takahashi (2010), QTNA Conference.

An LDQBD has a block-tridiagonal infinitesimal generator:
    Q^(0)_1  Q^(0)_0   O        O      ...   O
    Q^(1)_2  Q^(1)_1  Q^(1)_0   O      ...   O
    O       Q^(2)_2  Q^(2)_1  Q^(2)_0 ...   O
    ...

where Q_0^(n) are upward transitions, Q_1^(n) are local transitions,
and Q_2^(n) are downward transitions.
"""

import numpy as np
from scipy import linalg
from dataclasses import dataclass
from typing import List, Tuple, Optional


[docs] @dataclass class LdqbdResult: """Result of LDQBD solver. Attributes: R: List of rate matrices R^(1), R^(2), ..., R^(N) pi: Stationary distribution vector [pi_0, pi_1, ..., pi_N] """ R: List[np.ndarray] pi: np.ndarray
[docs] @dataclass class LdqbdOptions: """Options for LDQBD solver. Attributes: epsilon: Convergence tolerance (default: 1e-10) max_iter: Maximum number of iterations (default: 1000) verbose: Print debug information (default: False) """ epsilon: float = 1e-10 max_iter: int = 1000 verbose: bool = False
[docs] def ldqbd(Q0: List[np.ndarray], Q1: List[np.ndarray], Q2: List[np.ndarray], options: Optional[LdqbdOptions] = None) -> LdqbdResult: """ Solve a level-dependent QBD process. Args: Q0: List of upward transition matrices [Q0^(0), Q0^(1), ..., Q0^(N-1)] Q0^(n) has shape (states_n, states_{n+1}) Q1: List of local transition matrices [Q1^(0), Q1^(1), ..., Q1^(N)] Q1^(n) has shape (states_n, states_n) Q2: List of downward transition matrices [Q2^(1), Q2^(2), ..., Q2^(N)] Q2^(n) has shape (states_n, states_{n-1}) options: LdqbdOptions instance (uses defaults if None) Returns: LdqbdResult containing rate matrices R and stationary distribution pi Algorithm: 1. Compute rate matrices backward using continued fraction recursion 2. Compute stationary distribution forward using R matrices """ if options is None: options = LdqbdOptions() # Determine number of levels # Q0 has entries for levels 0 to N-1 # Q1 has entries for levels 0 to N # Q2 has entries for levels 1 to N N = len(Q1) - 1 # Maximum level if options.verbose: print(f"LD-QBD Solver: N={N} levels, epsilon={options.epsilon}") # Validate input dimensions if len(Q0) != N: raise ValueError(f"Q0 must have {N} matrices (levels 0 to {N-1})") if len(Q2) != N: raise ValueError(f"Q2 must have {N} matrices (levels 1 to {N})") # Compute all R matrices using backward recursion R = _compute_all_rate_matrices(N, Q0, Q1, Q2, options) if options.verbose: for n in range(N): print(f" R^({n+1}) computed ({R[n].shape})") # Compute stationary distribution pi = _compute_stationary_dist(R, Q0, Q1, Q2, N, options) return LdqbdResult(R, pi)
def _compute_all_rate_matrices(N: int, Q0: List[np.ndarray], Q1: List[np.ndarray], Q2: List[np.ndarray], options: LdqbdOptions) -> List[np.ndarray]: """ Compute all R matrices using backward recursion. For heterogeneous LDQBD: R^(N) = Q0^(N-1) * (-Q1^(N))^{-1} R^(n) = Q0^(n-1) * (-Q1^(n) - R^(n+1) * Q2^(n+1))^{-1} for n = N-1,...,1 Args: N: Maximum level Q0, Q1, Q2: List of transition matrices options: Solver options Returns: List of R matrices [R^(1), R^(2), ..., R^(N)] """ R = [None] * N # Start at level N (boundary): R^(N) = Q0^(N-1) * (-Q1^(N))^{-1} Q0_Nminus1 = np.asarray(Q0[N - 1], dtype=np.float64) Q1_N = np.asarray(Q1[N], dtype=np.float64) U_N = -Q1_N R[N - 1] = _safe_matrix_divide(Q0_Nminus1, U_N) # Backward recursion for n = N-1 down to 1 for n in range(N - 1, 0, -1): Q0_nminus1 = np.asarray(Q0[n - 1], dtype=np.float64) Q1_n = np.asarray(Q1[n], dtype=np.float64) Q2_nplus1 = np.asarray(Q2[n], dtype=np.float64) # This is Q2^(n+1) R_nplus1 = R[n] # Compute R^(n+1) * Q2^(n+1) RQ2 = R_nplus1 @ Q2_nplus1 # Compute U = -Q1^(n) - R^(n+1) * Q2^(n+1) U = -Q1_n - RQ2 # Compute R^(n) = Q0^(n-1) * U^{-1} R[n - 1] = _safe_matrix_divide(Q0_nminus1, U) return R def _safe_matrix_divide(A: np.ndarray, U: np.ndarray) -> np.ndarray: """ Safely compute A * U^{-1}. Uses pseudo-inverse if U is singular. Args: A: Numerator matrix U: Denominator matrix (to be inverted) Returns: Product A * U^{-1} """ A = np.asarray(A, dtype=np.float64) U = np.asarray(U, dtype=np.float64) if U.shape == (1, 1): # Scalar case u_val = U[0, 0] if abs(u_val) > 1e-14: return A / u_val else: return np.zeros_like(A) else: # Matrix case try: # Try standard inversion first det = np.linalg.det(U) if abs(det) > 1e-14: U_inv = np.linalg.inv(U) return A @ U_inv else: # Use pseudo-inverse for singular matrices U_pinv = np.linalg.pinv(U) return A @ U_pinv except np.linalg.LinAlgError: # If inversion fails, use pseudo-inverse U_pinv = np.linalg.pinv(U) return A @ U_pinv def _compute_stationary_dist(R: List[np.ndarray], Q0: List[np.ndarray], Q1: List[np.ndarray], Q2: List[np.ndarray], N: int, options: LdqbdOptions) -> np.ndarray: """ Compute stationary distribution from rate matrices. Algorithm: 1. Boundary equation: pi_0 * (Q1^(0) + R^(1)*Q2^(1)) = 0 2. Forward recursion: pi_n = pi_{n-1} * R^(n) 3. Normalize: sum(pi) = 1 Args: R: List of rate matrices Q0, Q1, Q2: Transition matrices N: Maximum level options: Solver options Returns: Stationary distribution vector [pi_0, pi_1, ..., pi_N] """ # Check if all levels have same dimension (homogeneous) and scalar dims = [Q1_n.shape[0] for Q1_n in Q1] is_homogeneous = len(set(dims)) == 1 is_scalar = all(d == 1 for d in dims) if is_scalar and is_homogeneous: # Scalar case: direct computation pi = np.zeros(N + 1) # Start with pi_0 = 1 (will normalize later) pi[0] = 1.0 # Forward recursion: pi_n = pi_{n-1} * R^(n) for n in range(1, N + 1): if R[n - 1].shape == (1, 1): pi[n] = pi[n - 1] * R[n - 1][0, 0] else: pi[n] = 0.0 # Normalize total = np.sum(pi) if total > 0: pi = pi / total return pi else: # Heterogeneous or matrix case pi_cells = [np.ones((1, 1)) for _ in range(N + 1)] # Initialize pi_0 based on its dimension if Q1[0].shape == (1, 1): # Scalar level 0: start with pi_0 = 1 pi_cells[0] = np.array([[1.0]]) else: # Matrix level 0: solve boundary equation Q1_0 = np.asarray(Q1[0], dtype=np.float64) Q2_1 = np.asarray(Q2[0], dtype=np.float64) A = Q1_0 + (R[0] @ Q2_1) # Find left null space (solve pi * A = 0) pi0 = _solve_left_null_space(A) pi_cells[0] = pi0 # Forward recursion: pi_n = pi_{n-1} * R^(n) for n in range(1, N + 1): pi_cells[n] = pi_cells[n - 1] @ R[n - 1] # Normalize and convert to scalar probabilities total = sum(np.sum(pi_cells[n]) for n in range(N + 1)) pi = np.zeros(N + 1) for n in range(N + 1): if total > 0: pi[n] = np.sum(pi_cells[n]) / total else: pi[n] = 1.0 / (N + 1) return pi def _solve_left_null_space(A: np.ndarray) -> np.ndarray: """ Find left null space of matrix A (find pi such that pi * A = 0). Uses iterative power method to find the eigenvector corresponding to the smallest magnitude eigenvalue. Args: A: Matrix to find null space of Returns: Left null space vector (row vector) """ A = np.asarray(A, dtype=np.float64) n = A.shape[0] # Use SVD to find left null space U, s, Vt = np.linalg.svd(A) # Find the column of V corresponding to the smallest singular value # For A = U * S * V^T, the left null space is the row of V^T with smallest s # Which is the row of V^T with smallest singular value # In Python's SVD: A = U @ diag(s) @ Vt # V^T has shape (n_rows_A, n_rows_A), so V has shape (n_cols_A, n_cols_A) # For square A: find right null space of A^T = left null space of A # Actually, use eigendecomposition of A^T try: eigenvalues, eigenvectors = np.linalg.eig(A.T) # Find eigenvector corresponding to smallest magnitude eigenvalue min_idx = np.argmin(np.abs(eigenvalues)) pi = eigenvectors[:, min_idx].real # Make positive pi = np.abs(pi) # Normalize pi_sum = np.sum(pi) if pi_sum > 1e-14: pi = pi / pi_sum else: pi = np.ones(n) / n return pi.reshape(1, -1) except: # Fallback: return uniform distribution return np.ones((1, n)) / n