"""
Discrete-Time Markov Chain (DTMC) analysis algorithms.
Native Python implementations for DTMC steady-state and related methods.
Leverages the CTMC solver by converting (P - I) to a generator-like matrix.
Key algorithms:
dtmc_solve: Steady-state distribution
dtmc_makestochastic: Convert matrix to stochastic
dtmc_isfeasible: Validate transition matrix
dtmc_simulate: Sample path simulation
"""
import numpy as np
from numpy.linalg import LinAlgError
from scipy import linalg
from typing import Dict, Any, Optional, List
from .ctmc import ctmc_solve, _find_weakly_connected_components
[docs]
def dtmc_solve(P: np.ndarray) -> np.ndarray:
"""
Solve for steady-state probabilities of a DTMC.
Computes the stationary distribution π by solving π(P - I) = 0
with normalization constraint Σπ = 1.
This leverages the CTMC solver by treating (P - I) as an
infinitesimal generator.
Args:
P: Transition probability matrix (row stochastic)
Returns:
Steady-state probability distribution (1D array)
"""
P = np.asarray(P, dtype=np.float64)
# Convert to "generator" form: Q = P - I
# This has the property that πQ = π(P-I) = πP - π = 0 when πP = π
n = P.shape[0]
Q = P.copy()
for i in range(n):
Q[i, i] -= 1.0
return ctmc_solve(Q)
[docs]
def dtmc_solve_reducible(P: np.ndarray, pin: np.ndarray = None) -> np.ndarray:
"""
Solve reducible DTMCs with transient states.
Handles DTMCs with multiple recurrent classes and transient states by:
1. Decomposing into strongly connected components (SCCs)
2. Identifying recurrent vs transient SCCs
3. Computing limiting distribution considering absorption from transient states
For a reducible DTMC with a single transient SCC, this computes the limiting
distribution when starting from the transient states (e.g., class switching
networks where jobs start in a transient class).
Args:
P: Transition probability matrix (possibly reducible)
pin: Initial probability vector (optional)
Returns:
Steady-state probability vector
"""
from scipy.sparse.csgraph import connected_components
from scipy.sparse import csc_matrix
P = np.asarray(P, dtype=np.float64)
n = P.shape[0]
if n == 1:
return np.array([1.0])
# Find strongly connected components
# Use 'strong' connection to properly identify recurrent vs transient
n_components, scc_labels = connected_components(
csc_matrix(P > 1e-10), directed=True, connection='strong', return_labels=True
)
if n_components == 1:
# Irreducible chain - use standard solve
return dtmc_solve(P)
# Identify recurrent vs transient SCCs
# A SCC is recurrent if all outgoing transitions stay within the SCC
num_scc = n_components
scc_idx = [np.where(scc_labels == i)[0] for i in range(num_scc)]
is_rec = np.zeros(num_scc, dtype=bool)
for i in range(num_scc):
states_i = scc_idx[i]
# Check if all outgoing transitions from SCC i stay within SCC i
outgoing_prob = 0.0
for s in states_i:
for j in range(num_scc):
if j != i:
states_j = scc_idx[j]
outgoing_prob += np.sum(P[s, states_j])
# If no outgoing transitions to other SCCs, this is recurrent
is_rec[i] = outgoing_prob < 1e-10
# Build lumped transition matrix between SCCs
Pl = np.zeros((num_scc, num_scc))
for i in range(num_scc):
states_i = scc_idx[i]
for j in range(num_scc):
if i != j:
states_j = scc_idx[j]
Pl[i, j] = np.sum(P[np.ix_(states_i, states_j)])
Pl = dtmc_makestochastic(Pl)
# Recurrent SCCs have self-loops with probability 1
for i in range(num_scc):
if is_rec[i]:
Pl[i, :] = 0.0
Pl[i, i] = 1.0
# Compute initial distribution over SCCs
# This matches MATLAB's dtmc_solve_reducible lines 72-89
if pin is None:
# Start with uniform over all SCCs
pinl = np.ones(num_scc)
# Find states with near-zero column sums (absorbing/unreachable states)
# These states have no incoming transitions from other states
col_sums = np.sum(P, axis=0)
z_cols = np.where(col_sums < 1e-12)[0]
# Zero out SCCs that contain only absorbing states
for j in z_cols:
pinl[scc_labels[j]] = 0
# Normalize
if np.sum(pinl) > 0:
pinl = pinl / np.sum(pinl)
else:
# Fallback to uniform
pinl = np.ones(num_scc) / num_scc
else:
# Lump initial vector by SCC
pinl = np.zeros(num_scc)
for i in range(num_scc):
pinl[i] = np.sum(pin[scc_idx[i]])
# Compute limiting distribution of lumped chain via power iteration
PI = _compute_limiting_matrix_power(Pl)
# Compute stationary distribution for each starting SCC
pis = np.zeros((num_scc, n))
for i in range(num_scc):
if pinl[i] > 1e-10:
# Compute limiting distribution starting from SCC i
pi0_l = np.zeros(num_scc)
pi0_l[i] = 1.0
pil_i = pi0_l @ PI
# For each SCC, solve internal stationary distribution
for j in range(num_scc):
if pil_i[j] > 1e-10:
states_j = scc_idx[j]
if len(states_j) == 1:
pis[i, states_j[0]] = pil_i[j]
else:
# Solve internal DTMC
Pj = P[np.ix_(states_j, states_j)]
pij = dtmc_solve(Pj)
pis[i, states_j] = pil_i[j] * pij
# Combine distributions weighted by initial SCC probabilities
pi = np.zeros(n)
for i in range(num_scc):
if pinl[i] > 1e-10:
pi += pis[i, :] * pinl[i]
# Special case: single transient SCC with no explicit initial distribution
transient_sccs = np.where(~is_rec)[0]
if len(transient_sccs) == 1 and pin is None:
pi = pis[transient_sccs[0], :]
return pi
def _compute_limiting_matrix_power(P: np.ndarray, max_iter: int = 1000, tol: float = 1e-10) -> np.ndarray:
"""
Compute limiting matrix using power iteration.
For DTMCs, P^n converges to a matrix where each row gives the limiting
distribution starting from that state.
Args:
P: Stochastic transition matrix
max_iter: Maximum iterations (default: 1000)
tol: Convergence tolerance (default: 1e-10)
Returns:
Limiting matrix (each row is limiting distribution from that start state)
"""
Pk = P.copy()
for _ in range(max_iter):
Pk1 = Pk @ P
max_diff = np.max(np.abs(Pk1 - Pk))
Pk = Pk1
if max_diff < tol:
break
return Pk
[docs]
def dtmc_makestochastic(A: np.ndarray) -> np.ndarray:
"""
Convert matrix to row-stochastic transition matrix.
Normalizes each row to sum to 1. Rows with zero sum are
replaced with uniform distribution.
Args:
A: Input matrix to normalize
Returns:
Row-stochastic matrix
"""
A = np.asarray(A, dtype=np.float64)
A = np.maximum(A, 0.0) # Ensure non-negative
row_sums = A.sum(axis=1, keepdims=True)
# Handle zero rows: replace with uniform
zero_rows = (row_sums == 0).flatten()
n = A.shape[1]
A[zero_rows] = 1.0 / n
# Re-compute row sums
row_sums = A.sum(axis=1, keepdims=True)
return A / row_sums
[docs]
def dtmc_isfeasible(P: np.ndarray, tolerance: float = 1e-10) -> bool:
"""
Check if matrix is a valid DTMC transition matrix.
Validates:
- All elements are non-negative
- All row sums equal 1
Args:
P: Candidate transition matrix
tolerance: Numerical tolerance (default: 1e-10)
Returns:
True if matrix is valid DTMC transition matrix
"""
P = np.asarray(P)
if P.shape[0] != P.shape[1]:
return False
# Check non-negative
if np.any(P < -tolerance):
return False
# Check row sums = 1
row_sums = P.sum(axis=1)
if np.any(np.abs(row_sums - 1.0) > tolerance):
return False
return True
[docs]
def dtmc_simulate(
P: np.ndarray,
initial_state: int,
num_steps: int,
seed: Optional[int] = None
) -> np.ndarray:
"""
Simulate DTMC sample path.
Generates a realization of the discrete-time Markov chain
for a specified number of steps.
Args:
P: Transition probability matrix
initial_state: Starting state index
num_steps: Number of simulation steps
Returns:
Array of visited states (length num_steps + 1)
"""
if seed is not None:
np.random.seed(seed)
P = np.asarray(P, dtype=np.float64)
n = P.shape[0]
states = np.zeros(num_steps + 1, dtype=int)
states[0] = initial_state
current_state = initial_state
for step in range(num_steps):
probs = P[current_state, :]
next_state = np.random.choice(n, p=probs)
states[step + 1] = next_state
current_state = next_state
return states
[docs]
def dtmc_rand(
n: int,
density: float = 0.5
) -> np.ndarray:
"""
Generate random DTMC transition matrix.
Args:
n: Number of states
density: Sparsity density (0 to 1, default 0.5)
Returns:
Random transition probability matrix
"""
# Generate random entries
P = np.random.rand(n, n)
# Apply density mask
mask = np.random.rand(n, n) < density
P = P * mask
# Ensure at least one non-zero per row
for i in range(n):
if P[i, :].sum() == 0:
j = np.random.randint(n)
P[i, j] = np.random.rand()
# Normalize to stochastic
return dtmc_makestochastic(P)
[docs]
def dtmc_timereverse(
P: np.ndarray,
pi: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Compute time-reversed DTMC transition matrix.
The time-reversed chain has transition probabilities:
P*_{ij} = π_j * P_{ji} / π_i
Args:
P: Original transition matrix
pi: Steady-state distribution (optional, computed if None)
Returns:
Time-reversed transition matrix
"""
P = np.asarray(P, dtype=np.float64)
if pi is None:
pi = dtmc_solve(P)
else:
pi = np.asarray(pi, dtype=np.float64).flatten()
n = P.shape[0]
P_rev = np.zeros_like(P)
for i in range(n):
for j in range(n):
if pi[i] > 0:
P_rev[i, j] = pi[j] * P[j, i] / pi[i]
# Ensure stochastic
P_rev = dtmc_makestochastic(P_rev)
return P_rev
[docs]
def dtmc_stochcomp(
P: np.ndarray,
keep_states: np.ndarray,
eliminate_states: Optional[np.ndarray] = None
) -> np.ndarray:
"""
Compute stochastic complement of DTMC.
Reduces the DTMC by eliminating specified states while preserving
the steady-state distribution restricted to the kept states.
Args:
P: Transition probability matrix
keep_states: States to retain in reduced model
eliminate_states: States to eliminate (optional, inferred if None)
Returns:
Reduced transition matrix (stochastic complement)
"""
P = np.asarray(P, dtype=np.float64)
keep_states = np.asarray(keep_states, dtype=int).flatten()
n = P.shape[0]
all_states = set(range(n))
keep_set = set(keep_states)
if eliminate_states is None:
eliminate_states = np.array(sorted(all_states - keep_set), dtype=int)
else:
eliminate_states = np.asarray(eliminate_states, dtype=int).flatten()
if len(eliminate_states) == 0:
return P
# Extract submatrices
I = keep_states
Ic = eliminate_states
P11 = P[np.ix_(I, I)]
P12 = P[np.ix_(I, Ic)]
P21 = P[np.ix_(Ic, I)]
P22 = P[np.ix_(Ic, Ic)]
# Stochastic complement: S = P11 + P12 * (I - P22)^{-1} * P21
n_elim = len(eliminate_states)
I_mat = np.eye(n_elim)
try:
inv_term = linalg.inv(I_mat - P22)
except LinAlgError:
inv_term = linalg.pinv(I_mat - P22)
S = P11 + P12 @ inv_term @ P21
return S
[docs]
def dtmc_transient(
P: np.ndarray,
initial_dist: np.ndarray,
steps: int
) -> np.ndarray:
"""
Compute transient probabilities of a DTMC.
Calculates π(n) = π(0) * P^n for each step from 0 to steps.
Args:
P: Transition probability matrix
initial_dist: Initial probability distribution π(0)
steps: Number of time steps
Returns:
Array of shape (steps+1, n) with transient probabilities
"""
P = np.asarray(P, dtype=np.float64)
initial_dist = np.asarray(initial_dist, dtype=np.float64).flatten()
n = P.shape[0]
results = np.zeros((steps + 1, n))
results[0] = initial_dist
pi = initial_dist.copy()
for k in range(1, steps + 1):
pi = pi @ P
results[k] = pi
return results
[docs]
def dtmc_hitting_time(
P: np.ndarray,
target_states: np.ndarray
) -> np.ndarray:
"""
Compute mean hitting times to target states.
Calculates the expected number of steps to reach any target state
from each starting state.
Args:
P: Transition probability matrix
target_states: Array of target state indices
Returns:
Array of mean hitting times from each state
"""
P = np.asarray(P, dtype=np.float64)
target_states = np.asarray(target_states, dtype=int).flatten()
n = P.shape[0]
target_set = set(target_states)
non_target = [i for i in range(n) if i not in target_set]
if len(non_target) == 0:
return np.zeros(n)
# For non-target states: h = 1 + P_NT * h_NT
# (I - P_NT) * h_NT = 1
P_NT = P[np.ix_(non_target, non_target)]
A = np.eye(len(non_target)) - P_NT
b = np.ones(len(non_target))
try:
h_NT = linalg.solve(A, b)
except LinAlgError:
h_NT, _, _, _ = linalg.lstsq(A, b)
h = np.zeros(n)
h[non_target] = h_NT
return h
__all__ = [
'dtmc_solve',
'dtmc_solve_reducible',
'dtmc_makestochastic',
'dtmc_isfeasible',
'dtmc_simulate',
'dtmc_rand',
'dtmc_timereverse',
'dtmc_stochcomp',
'dtmc_transient',
'dtmc_hitting_time',
]