Source code for line_solver.api.mam.map_analysis

"""
Markovian Arrival Process (MAP) analysis algorithms.

Native Python implementations for analyzing MAPs, including:
- Steady-state distributions (map_piq, map_pie)
- Arrival rate and moments (map_lambda, map_mean, map_var, map_scv)
- MAP transformations and utilities

References:
    Neuts, M.F. "Matrix-Geometric Solutions in Stochastic Models:
    An Algorithmic Approach." Dover Publications, 1994.
"""

import numpy as np
from numpy.linalg import LinAlgError
from scipy import linalg
from typing import Tuple, Optional, Union

# Import CTMC solver from mc module
from ..mc import ctmc_solve


[docs] def map_infgen(D0: np.ndarray, D1: np.ndarray) -> np.ndarray: """ Compute the infinitesimal generator of a MAP. The generator Q = D0 + D1 represents the underlying CTMC. Args: D0: Hidden transition matrix (non-arrival transitions) D1: Visible transition matrix (arrival transitions) Returns: Infinitesimal generator matrix Q = D0 + D1 """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) return D0 + D1
[docs] def map_piq(D0: np.ndarray, D1: np.ndarray = None) -> np.ndarray: """ Compute steady-state distribution of the underlying CTMC of a MAP. Solves πQ = 0 where Q = D0 + D1 is the generator. Args: D0: Hidden transition matrix, or stacked [D0, D1] if D1 is None D1: Visible transition matrix (optional) Returns: Steady-state probability vector π """ if D1 is None: # D0 is actually [D0, D1] stacked D0_arr = np.asarray(D0) if D0_arr.ndim == 3 and D0_arr.shape[0] == 2: D0, D1 = D0_arr[0], D0_arr[1] else: raise ValueError("D0 must be a (2, n, n) array when D1 is not provided") D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) Q = map_infgen(D0, D1) pi = ctmc_solve(Q) return pi
[docs] def map_lambda(D0: np.ndarray, D1: np.ndarray = None) -> float: """ Compute the arrival rate (λ) of a MAP. The arrival rate is λ = π * D1 * e where π is the steady-state and e is the column vector of ones. Args: D0: Hidden transition matrix, or stacked [D0, D1] if D1 is None D1: Visible transition matrix (optional) Returns: Arrival rate λ """ if D1 is None: D0_arr = np.asarray(D0) if D0_arr.ndim == 3 and D0_arr.shape[0] == 2: D0, D1 = D0_arr[0], D0_arr[1] else: raise ValueError("D0 must be a (2, n, n) array when D1 is not provided") D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) n = D0.shape[0] e = np.ones(n) pi = map_piq(D0, D1) lambda_rate = pi @ D1 @ e return float(lambda_rate)
[docs] def map_pie(D0: np.ndarray, D1: np.ndarray = None) -> np.ndarray: """ Compute equilibrium distribution of embedded DTMC. The embedded DTMC has transition matrix P = (-D0)^{-1} * D1. Its steady-state is π_e = π * D1 / (π * D1 * e). Args: D0: Hidden transition matrix, or stacked [D0, D1] if D1 is None D1: Visible transition matrix (optional) Returns: Equilibrium distribution of embedded DTMC """ if D1 is None: D0_arr = np.asarray(D0) if D0_arr.ndim == 3 and D0_arr.shape[0] == 2: D0, D1 = D0_arr[0], D0_arr[1] else: raise ValueError("D0 must be a (2, n, n) array when D1 is not provided") D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) n = D0.shape[0] e = np.ones(n) pi = map_piq(D0, D1) A = pi @ D1 # Row vector # Normalize normalizer = A @ e if normalizer > 0: pie = A / normalizer else: pie = np.ones(n) / n return pie
[docs] def map_mean(D0: np.ndarray, D1: np.ndarray = None) -> float: """ Compute mean inter-arrival time of a MAP. The mean is 1/λ where λ is the arrival rate. Args: D0: Hidden transition matrix, or stacked [D0, D1] if D1 is None D1: Visible transition matrix (optional) Returns: Mean inter-arrival time """ lambda_rate = map_lambda(D0, D1) if lambda_rate > 0: return 1.0 / lambda_rate else: return float('inf')
[docs] def map_var(D0: np.ndarray, D1: np.ndarray = None) -> float: """ Compute variance of inter-arrival times of a MAP. Var[X] = E[X²] - E[X]² Uses map_moment for consistency with Kotlin implementation. Args: D0: Hidden transition matrix, or stacked [D0, D1] if D1 is None D1: Visible transition matrix (optional) Returns: Variance of inter-arrival times """ if D1 is None: D0_arr = np.asarray(D0) if D0_arr.ndim == 3 and D0_arr.shape[0] == 2: D0, D1 = D0_arr[0], D0_arr[1] else: raise ValueError("D0 must be a (2, n, n) array when D1 is not provided") D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) # Variance = E[X²] - E[X]² using map_moment and map_mean mean = map_mean(D0, D1) moment2 = map_moment(D0, D1, 2) variance = moment2 - mean**2 return float(max(0, variance))
[docs] def map_scv(D0: np.ndarray, D1: np.ndarray = None) -> float: """ Compute squared coefficient of variation (SCV) of a MAP. SCV = Var[X] / E[X]² = (E[X²] - E[X]²) / E[X]² Args: D0: Hidden transition matrix, or stacked [D0, D1] if D1 is None D1: Visible transition matrix (optional) Returns: Squared coefficient of variation """ if D1 is None: D0_arr = np.asarray(D0) if D0_arr.ndim == 3 and D0_arr.shape[0] == 2: D0, D1 = D0_arr[0], D0_arr[1] else: raise ValueError("D0 must be a (2, n, n) array when D1 is not provided") D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) # Match Kotlin: compute from moments directly e1 = map_moment(D0, D1, 1) e2 = map_moment(D0, D1, 2) if e1 > 0: var = e2 - e1 * e1 scv = var / (e1 * e1) return max(0, scv) else: return 1.0
[docs] def map_moment(D0: np.ndarray, D1: np.ndarray, k: int) -> float: """ Compute the k-th moment of inter-arrival time distribution. E[X^k] = k! * π_e * (-D0)^{-k} * e where π_e is the embedded DTMC steady-state. Args: D0: Hidden transition matrix D1: Visible transition matrix k: Moment order (k >= 1) Returns: k-th raw moment """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) if k < 1: raise ValueError("Moment order must be >= 1") # Check if D0 is zero matrix or singular (matching Kotlin implementation) if np.all(np.abs(D0) < 1e-14) or np.abs(linalg.det(D0)) < 1e-12: return 0.0 n = D0.shape[0] e = np.ones(n) # Use embedded DTMC steady-state (map_pie), not CTMC steady-state pie = map_pie(D0, D1) try: D0_inv = linalg.inv(-D0) except LinAlgError: D0_inv = linalg.pinv(-D0) # Compute k! * (-D0)^{-k} incrementally as in Kotlin # Start with (-D0)^{-1}, then multiply by (-D0)^{-1} * i for factorial D0_inv_k = D0_inv.copy() for i in range(2, k + 1): D0_inv_k = D0_inv_k @ D0_inv D0_inv_k *= i # Incorporate factorial incrementally # E[X^k] = k! * π_e * (-D0)^{-k} * e (factorial already incorporated) moment = pie @ D0_inv_k @ e return float(moment)
[docs] def map_scale(D0: np.ndarray, D1: np.ndarray, factor: float ) -> Tuple[np.ndarray, np.ndarray]: """ Scale a MAP by a given factor. Scaling changes the time scale: D0' = factor * D0, D1' = factor * D1. Args: D0: Hidden transition matrix D1: Visible transition matrix factor: Scaling factor (> 0) Returns: Tuple of scaled (D0', D1') """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) if factor <= 0: raise ValueError("Scaling factor must be positive") return factor * D0, factor * D1
[docs] def map_normalize(D0: np.ndarray, D1: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Normalize a MAP to have unit mean inter-arrival time. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Tuple of normalized (D0', D1') """ mean = map_mean(D0, D1) if mean > 0 and np.isfinite(mean): return map_scale(D0, D1, mean) else: return D0.copy(), D1.copy()
[docs] def map_isfeasible(D0: np.ndarray, D1: np.ndarray, tolerance: float = 1e-10) -> bool: """ Check if (D0, D1) form a valid MAP. A valid MAP requires: - D0 has non-positive diagonal and non-negative off-diagonal - D1 has non-negative elements - D0 + D1 is a valid generator (row sums = 0) Args: D0: Hidden transition matrix D1: Visible transition matrix tolerance: Numerical tolerance Returns: True if valid MAP """ D0 = np.asarray(D0) D1 = np.asarray(D1) n = D0.shape[0] if D0.shape != (n, n) or D1.shape != (n, n): return False # Check D0 diagonal non-positive if np.any(np.diag(D0) > tolerance): return False # Check D0 off-diagonal non-negative D0_offdiag = D0.copy() np.fill_diagonal(D0_offdiag, 0) if np.any(D0_offdiag < -tolerance): return False # Check D1 non-negative if np.any(D1 < -tolerance): return False # Check row sums of Q = D0 + D1 are zero Q = D0 + D1 row_sums = Q.sum(axis=1) if np.any(np.abs(row_sums) > tolerance): return False return True
[docs] def exp_map(lambda_rate: float) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP representation of an exponential distribution. Args: lambda_rate: Rate parameter (λ > 0) Returns: Tuple of (D0, D1) matrices representing Exp(λ) """ if lambda_rate <= 0: raise ValueError("Rate must be positive") D0 = np.array([[-lambda_rate]]) D1 = np.array([[lambda_rate]]) return D0, D1
[docs] def erlang_map(k: int, lambda_rate: float) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP representation of an Erlang-k distribution. Args: k: Number of phases (k >= 1) lambda_rate: Overall rate parameter Returns: Tuple of (D0, D1) matrices representing Erlang(k, k*λ) """ if k < 1: raise ValueError("Number of phases must be >= 1") if lambda_rate <= 0: raise ValueError("Rate must be positive") # Phase rate = k * lambda to get mean = 1/lambda mu = k * lambda_rate # Build D0 and D1 D0 = np.zeros((k, k)) D1 = np.zeros((k, k)) for i in range(k): D0[i, i] = -mu if i < k - 1: D0[i, i + 1] = mu # Arrival from last phase D1[k - 1, 0] = mu return D0, D1
[docs] def hyperexp_map(probs: np.ndarray, rates: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP representation of a hyperexponential distribution. Args: probs: Probability vector for choosing each phase rates: Rate parameters for each phase Returns: Tuple of (D0, D1) matrices representing hyperexponential """ probs = np.asarray(probs, dtype=np.float64) rates = np.asarray(rates, dtype=np.float64) if len(probs) != len(rates): raise ValueError("probs and rates must have same length") k = len(probs) # D0: diagonal with -rates D0 = np.diag(-rates) # D1: arrivals return to initial state with probability probs D1 = np.zeros((k, k)) for i in range(k): D1[i, :] = rates[i] * probs return D0, D1
# MATLAB-style wrapper functions for compatibility
[docs] def map_exponential(mean: float) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP representation of an exponential distribution (MATLAB-style). This is a MATLAB-compatible wrapper that takes mean instead of rate. Args: mean: Mean inter-arrival time (= 1/λ) Returns: Tuple of (D0, D1) matrices representing Exp(1/mean) Examples: >>> D0, D1 = map_exponential(2) # Poisson process with rate λ=0.5 """ if mean <= 0: raise ValueError("Mean must be positive") mu = 1.0 / mean D0 = np.array([[-mu]]) D1 = np.array([[mu]]) return D0, D1
[docs] def map_erlang(mean: float, k: int) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP representation of an Erlang-k distribution (MATLAB-style). This is a MATLAB-compatible wrapper that takes mean as first argument. Args: mean: Mean inter-arrival time k: Number of phases Returns: Tuple of (D0, D1) matrices representing Erlang-k with given mean Examples: >>> D0, D1 = map_erlang(2, 3) # Erlang-3 with mean 2 """ if mean <= 0: raise ValueError("Mean must be positive") if k < 1: raise ValueError("Number of phases must be >= 1") mu = k / mean # Build D0 and D1 D0 = np.zeros((k, k)) D1 = np.zeros((k, k)) # D0: transitions between phases for i in range(k - 1): D0[i, i + 1] = mu # D1: arrival from last phase D1[k - 1, 0] = mu # Normalize D0 diagonal to make it a valid generator D0, D1 = _map_normalize_generator(D0, D1) return D0, D1
def _map_normalize_generator(D0: np.ndarray, D1: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Normalize MAP to have proper generator structure. Sets D0 diagonal so that (D0 + D1) has zero row sums. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Tuple of normalized (D0, D1) """ D0 = np.asarray(D0, dtype=np.float64).copy() D1 = np.asarray(D1, dtype=np.float64).copy() n = D0.shape[0] # Set non-negative off-diagonal elements for i in range(n): for j in range(n): D0[i, j] = np.real(D0[i, j]) D1[i, j] = np.real(D1[i, j]) if i != j and D0[i, j] < 0: D0[i, j] = 0 if D1[i, j] < 0: D1[i, j] = 0 # Normalize diagonal for i in range(n): D0[i, i] = 0 row_sum = np.sum(D0[i, :]) + np.sum(D1[i, :]) D0[i, i] = -row_sum return D0, D1
[docs] def map_sumind(maps: list) -> Tuple[np.ndarray, np.ndarray]: """ Compute the sum of independent MAPs. Creates a MAP representing the sum (concatenation) of independent random variables represented by the input MAPs. Args: maps: List of MAPs, each as (D0, D1) tuple Returns: Tuple of (D0, D1) matrices representing the sum Examples: >>> # Sum of exponential and Erlang-2 >>> MAP1 = map_exponential(1.0) >>> MAP2 = map_erlang(1.0, 2) >>> D0, D1 = map_sumind([MAP1, MAP2]) """ n = len(maps) if n == 0: raise ValueError("At least one MAP required") if n == 1: return maps[0] # Get orders of each MAP orders = [map[0].shape[0] for map in maps] total_order = sum(orders) D0 = np.zeros((total_order, total_order)) D1 = np.zeros((total_order, total_order)) curpos = 0 for i in range(n): D0_i, D1_i = maps[i][0], maps[i][1] order_i = orders[i] # Set diagonal block from D0_i D0[curpos:curpos+order_i, curpos:curpos+order_i] = D0_i if i < n - 1: # Transition to next MAP order_next = orders[i + 1] pie_next = map_pie(maps[i + 1][0], maps[i + 1][1]) e_i = np.ones((order_i, 1)) # D0 transition: D1_i * e * pie_next D0[curpos:curpos+order_i, curpos+order_i:curpos+order_i+order_next] = \ D1_i @ e_i @ pie_next.reshape(1, -1) else: # Last MAP: transition back to first order_first = orders[0] pie_first = map_pie(maps[0][0], maps[0][1]) e_i = np.ones((order_i, 1)) # D1 transition back to first MAP D1[curpos:curpos+order_i, 0:order_first] = \ D1_i @ e_i @ pie_first.reshape(1, -1) curpos += order_i return D0, D1
[docs] def map_cdf(D0: np.ndarray, D1: np.ndarray, points: np.ndarray) -> np.ndarray: """ Compute cumulative distribution function of inter-arrival times. F(t) = 1 - π_e * exp(D0*t) * e Args: D0: Hidden transition matrix D1: Visible transition matrix points: Time points at which to evaluate CDF Returns: CDF values at specified points Examples: >>> map_cdf(D0, D1, 1.0) # Returns P(T <= 1) >>> map_cdf(D0, D1, [1.0, 5.0]) # Returns [P(T<=1), P(T<=5)] """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) points = np.asarray(points, dtype=np.float64).ravel() n = D0.shape[0] e = np.ones(n) pie = map_pie(D0, D1) cdf_vals = np.zeros(len(points)) for i, t in enumerate(points): if t <= 0: cdf_vals[i] = 0.0 else: exp_D0t = linalg.expm(D0 * t) cdf_vals[i] = 1.0 - pie @ exp_D0t @ e return cdf_vals
[docs] def map_pdf(D0: np.ndarray, D1: np.ndarray, points: np.ndarray) -> np.ndarray: """ Compute probability density function of inter-arrival times. f(t) = π_e * exp(D0*t) * (-D0) * e Args: D0: Hidden transition matrix D1: Visible transition matrix points: Time points at which to evaluate PDF Returns: PDF values at specified points Examples: >>> map_pdf(D0, D1, [0.5, 1.0, 2.0]) """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) points = np.asarray(points, dtype=np.float64).ravel() n = D0.shape[0] e = np.ones(n) pie = map_pie(D0, D1) neg_D0 = -D0 pdf_vals = np.zeros(len(points)) for i, t in enumerate(points): if t < 0: pdf_vals[i] = 0.0 else: exp_D0t = linalg.expm(D0 * t) pdf_vals[i] = pie @ exp_D0t @ neg_D0 @ e return pdf_vals
[docs] def map_hyperexp(probs: np.ndarray, means: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP representation of a hyperexponential distribution (MATLAB-style). This is a MATLAB-compatible wrapper that takes means instead of rates. Args: probs: Probability vector for choosing each phase means: Mean values for each phase (1/rates) Returns: Tuple of (D0, D1) matrices representing hyperexponential """ probs = np.asarray(probs, dtype=np.float64) means = np.asarray(means, dtype=np.float64) rates = 1.0 / means return hyperexp_map(probs, rates)
[docs] def map_gamma(mean: float, n: int) -> Tuple[np.ndarray, np.ndarray]: """ Create a MAP approximation of a Gamma distribution. Uses Erlang-n with matching mean as an approximation. Args: mean: Mean of the distribution n: Number of phases (shape parameter) Returns: Tuple of (D0, D1) matrices """ return map_erlang(mean, n)
[docs] def map_sample(D0: np.ndarray, D1: np.ndarray, n_samples: int, rng: Optional[np.random.Generator] = None) -> np.ndarray: """ Generate random samples from a MAP distribution. Args: D0: Hidden transition matrix D1: Visible transition matrix n_samples: Number of samples to generate rng: Optional random number generator Returns: Array of inter-arrival times """ if rng is None: rng = np.random.default_rng() D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) n = D0.shape[0] # Get initial state distribution pie = map_pie(D0, D1) samples = np.zeros(n_samples) for i in range(n_samples): # Choose initial state state = rng.choice(n, p=pie) t = 0.0 while True: # Total rate out of current state rate_out = -D0[state, state] if rate_out <= 0: break # Sample sojourn time t += rng.exponential(1.0 / rate_out) # Transition probabilities trans_rates = np.zeros(2 * n) # Hidden transitions for j in range(n): if j != state: trans_rates[j] = D0[state, j] # Visible transitions for j in range(n): trans_rates[n + j] = D1[state, j] trans_probs = trans_rates / np.sum(trans_rates) # Choose next transition next_trans = rng.choice(2 * n, p=trans_probs) if next_trans >= n: # Visible transition (arrival) samples[i] = t break else: # Hidden transition state = next_trans return samples
# ============================================================================ # Statistics Functions # ============================================================================
[docs] def map_skew(D0: np.ndarray, D1: np.ndarray) -> float: """ Compute skewness of inter-arrival times. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Skewness of inter-arrival times """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) m1 = map_moment(D0, D1, 1) m2 = map_moment(D0, D1, 2) m3 = map_moment(D0, D1, 3) # M3 = E[X^3] - 3*E[X^2]*E[X] + 2*E[X]^3 (third central moment) M3 = m3 - 3 * m2 * m1 + 2 * m1 ** 3 scv = map_scv(D0, D1) if scv > 0: skewness = M3 / (np.sqrt(scv) * m1) ** 3 else: skewness = 0.0 return float(skewness)
[docs] def map_kurt(D0: np.ndarray, D1: np.ndarray) -> float: """ Compute kurtosis of inter-arrival times. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Kurtosis of inter-arrival times """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) m1 = map_moment(D0, D1, 1) m2 = map_moment(D0, D1, 2) m3 = map_moment(D0, D1, 3) m4 = map_moment(D0, D1, 4) # Fourth central moment / variance^2 var = map_var(D0, D1) if var > 0: kurt = (m4 - 4 * m3 * m1 + 6 * m2 * m1 ** 2 - 3 * m1 ** 4) / var ** 2 else: kurt = 0.0 return float(kurt)
[docs] def map_acf(D0: np.ndarray, D1: np.ndarray, lags: Union[int, np.ndarray] = 1) -> np.ndarray: """ Compute autocorrelation coefficients of inter-arrival times. Args: D0: Hidden transition matrix D1: Visible transition matrix lags: Lag(s) at which to compute autocorrelation (default: 1) Returns: Array of autocorrelation coefficients at specified lags Examples: >>> map_acf(D0, D1) # lag-1 autocorrelation >>> map_acf(D0, D1, np.arange(1, 11)) # first 10 autocorrelations """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) if isinstance(lags, (int, np.integer)): lags = np.array([lags]) else: lags = np.asarray(lags, dtype=int) n = D0.shape[0] e = np.ones(n) P = map_embedded(D0, D1) lam = map_lambda(D0, D1) piq = map_piq(D0, D1) x = lam * piq try: invD0 = linalg.inv(-D0) except LinAlgError: invD0 = linalg.pinv(-D0) y = invD0 @ e acf_coeffs = np.zeros(len(lags)) for i, lag in enumerate(lags): P_lag = np.linalg.matrix_power(P, lag) acf_coeffs[i] = x @ P_lag @ y scv = map_scv(D0, D1) if scv > 0: acf_coeffs = (acf_coeffs - 1) / scv else: acf_coeffs = np.zeros(len(lags)) return acf_coeffs
[docs] def map_acfc(D0: np.ndarray, D1: np.ndarray, kset: np.ndarray, u: float) -> np.ndarray: """ Compute autocorrelation of counting process at given lags. Args: D0: Hidden transition matrix D1: Visible transition matrix kset: Set of lags u: Length of timeslot (time scale) Returns: Autocorrelation coefficients at specified lags """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) kset = np.asarray(kset, dtype=int) n = D0.shape[0] Q = map_infgen(D0, D1) I = np.eye(n) e = np.ones(n) piq = map_piq(D0, D1) exp_Qu = linalg.expm(Q * u) I_minus_expQu = I - exp_Qu # PRE = piq * D1 * (I - exp(Q*u)) PRE = piq @ D1 @ I_minus_expQu # (e * piq - Q)^(-1) try: inv_term = linalg.inv(np.outer(e, piq) - Q) except LinAlgError: inv_term = linalg.pinv(np.outer(e, piq) - Q) # POST = (I - exp(Q*u)) * (e*piq - Q)^(-2) * D1 * e inv_term_sq = inv_term @ inv_term POST = I_minus_expQu @ inv_term_sq @ D1 @ e vart = map_varcount(D0, D1, np.array([u]))[0] acfc = np.zeros(len(kset)) for j, k in enumerate(kset): exp_Qku = linalg.expm(Q * (k - 1) * u) acfc[j] = PRE @ exp_Qku @ POST / vart if vart > 0 else 0.0 return acfc
[docs] def map_idc(D0: np.ndarray, D1: np.ndarray) -> float: """ Compute the asymptotic index of dispersion. I = SCV * (1 + 2 * sum_{k=1}^{inf} rho_k) where SCV is the squared coefficient of variation and rho_k is the lag-k autocorrelation coefficient. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Asymptotic index of dispersion """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) n = D0.shape[0] e = np.ones(n) lam = map_lambda(D0, D1) pie = map_pie(D0, D1) piq = map_piq(D0, D1) Q = map_infgen(D0, D1) try: inv_term = linalg.inv(Q + np.outer(e, piq)) except LinAlgError: inv_term = linalg.pinv(Q + np.outer(e, piq)) I = 1 + 2 * (lam - pie @ inv_term @ D1 @ e) return float(I)
# ============================================================================ # Count Process Functions # ============================================================================
[docs] def map_count_mean(D0: np.ndarray, D1: np.ndarray, t: np.ndarray) -> np.ndarray: """ Compute mean of counting process at resolution t. Args: D0: Hidden transition matrix D1: Visible transition matrix t: Time period(s) for counting Returns: Mean arrivals in (0, t] """ t = np.asarray(t, dtype=np.float64).ravel() lam = map_lambda(D0, D1) return lam * t
[docs] def map_count_var(D0: np.ndarray, D1: np.ndarray, t: np.ndarray) -> np.ndarray: """ Compute variance of counting process at resolution t. Args: D0: Hidden transition matrix D1: Visible transition matrix t: Time period(s) for counting Returns: Variance of arrivals in (0, t] Reference: He and Neuts, "Markov chains with marked transitions", 1998 """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) t = np.asarray(t, dtype=np.float64).ravel() n = D0.shape[0] D = D0 + D1 I = np.eye(n) e = np.ones(n) theta = map_piq(D0, D1) try: tmp = linalg.inv(np.outer(e, theta) - D) except LinAlgError: tmp = linalg.pinv(np.outer(e, theta) - D) lam = theta @ D1 @ e c = theta @ D1 @ tmp d = tmp @ D1 @ e ll = theta @ D1 @ e v = np.zeros(len(t)) for i, ti in enumerate(t): exp_Dt = linalg.expm(D * ti) v[i] = (ll - 2 * lam ** 2 + 2 * c @ D1 @ e) * ti - 2 * c @ (I - exp_Dt) @ d return v
[docs] def map_varcount(D0: np.ndarray, D1: np.ndarray, tset: np.ndarray) -> np.ndarray: """ Compute variance of counting process (alternative implementation). Args: D0: Hidden transition matrix D1: Visible transition matrix tset: Set of time points Returns: Variance at each time point """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) tset = np.asarray(tset, dtype=np.float64).ravel() n = D0.shape[0] Q = map_infgen(D0, D1) I = np.eye(n) e = np.ones(n) piq = map_piq(D0, D1) lam = 1.0 / map_mean(D0, D1) try: inv_term = linalg.inv(np.outer(e, piq) - Q) except LinAlgError: inv_term = linalg.pinv(np.outer(e, piq) - Q) PRE = lam - 2 * lam ** 2 + 2 * piq @ D1 @ inv_term @ D1 @ e inv_term_sq = inv_term @ inv_term varc = np.zeros(len(tset)) for j, t in enumerate(tset): exp_Qt = linalg.expm(Q * t) POST = -2 * piq @ D1 @ (I - exp_Qt) @ inv_term_sq @ D1 @ e varc[j] = PRE * t + POST return varc
[docs] def map_count_moment(D0: np.ndarray, D1: np.ndarray, t: float, orders: np.ndarray) -> np.ndarray: """ Compute power moments of counts at resolution t. Uses numerical differentiation of the moment generating function. Args: D0: Hidden transition matrix D1: Visible transition matrix t: Resolution (time period) orders: Orders of moments to compute Returns: Power moments of counts """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) orders = np.asarray(orders, dtype=int).ravel() n = D0.shape[0] e = np.ones(n) theta = map_piq(D0, D1) def mgfunc(z): return theta @ linalg.expm(D0 * t + D1 * np.exp(z) * t) @ e # Numerical differentiation from scipy.misc import derivative M = np.zeros(len(orders)) for i, order in enumerate(orders): M[i] = derivative(mgfunc, 0, n=order, dx=1e-6) return M
# ============================================================================ # Constructor Functions # ============================================================================
[docs] def map_mmpp2(mean: float, scv: float, skew: float = -1, acf1: float = -1) -> Tuple[np.ndarray, np.ndarray]: """ Fit an MMPP(2) as a MAP. Args: mean: Mean inter-arrival time scv: Squared coefficient of variation skew: Skewness (-1 for automatic minimization) acf1: Lag-1 autocorrelation (-1 for maximum feasible) Returns: Tuple of (D0, D1) matrices Examples: >>> D0, D1 = map_mmpp2(1, 2, -1, 0.2) # Minimal skewness, ACF=0.2 >>> D0, D1 = map_mmpp2(1, 2, -1, -1) # Minimal skewness, max ACF """ E1 = mean E2 = (1 + scv) * E1 ** 2 E3 = -(2 * E1 ** 3 - 3 * E1 * E2 - skew * (E2 - E1 ** 2) ** (3 / 2)) FEASTOL = map_feastol() if acf1 == -1: G2 = 1 - 10 * 10 ** (-FEASTOL) else: G2 = acf1 / (1 - 1 / scv) / 0.5 if scv != 1 else 0 if skew == -1 and scv > 1: E3 = (3 / 2 + 0.001) * E2 ** 2 / E1 scv_recomputed = (E2 - E1 ** 2) / E1 ** 2 # Simplified MMPP(2) construction if G2 < 1e-6: # Renewal case (no autocorrelation) denom = 6 * E1 ** 3 * scv_recomputed + 3 * E1 ** 3 * scv_recomputed ** 2 + 3 * E1 ** 3 - 2 * E3 if abs(denom) < 1e-14: denom = 1e-14 mu00 = 2 * (6 * E1 ** 3 * scv_recomputed - E3) / E1 / denom mu11 = 0 q01 = 9 * E1 ** 5 * (scv_recomputed - 1) * (scv_recomputed ** 2 - 2 * scv_recomputed + 1) / max(1e-14, (6 * E1 ** 3 * scv_recomputed - E3)) / denom q10 = -3 * (scv_recomputed - 1) * E1 ** 2 / max(1e-14, (6 * E1 ** 3 * scv_recomputed - E3)) else: # Use hyperexponential for general case if scv_recomputed >= 1: # H2 approximation mu1 = E1 - 0.5 * np.sqrt(max(0, -4 * E1 ** 2 + 2 * E2)) mu2 = E1 + 0.5 * np.sqrt(max(0, -4 * E1 ** 2 + 2 * E2)) p = 0.5 - 0.5 * G2 mu1 = max(1e-10, np.real(mu1)) mu2 = max(1e-10, np.real(mu2)) mu00 = 1 / mu1 mu11 = 1 / mu2 q01 = mu00 * p q10 = mu11 * (1 - p) else: # Erlang approximation for SCV < 1 mu00 = 2 / E1 mu11 = 2 / E1 q01 = mu00 * 0.5 q10 = mu11 * 0.5 D0 = np.array([[-mu00 - q01, q01], [q10, -mu11 - q10]]) D1 = np.array([[mu00, 0], [0, mu11]]) # Normalize to ensure valid generator D0, D1 = _map_normalize_generator(D0, D1) return D0, D1
[docs] def map_gamma2(D0: np.ndarray, D1: np.ndarray) -> float: """ Compute the second largest eigenvalue of embedded DTMC. This is the autocorrelation decay rate. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Second largest eigenvalue (gamma_2) """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) P = map_embedded(D0, D1) eigvals = linalg.eigvals(P) # Sort by absolute value descending sorted_idx = np.argsort(np.abs(eigvals))[::-1] sorted_eigvals = eigvals[sorted_idx] # Return second largest if len(sorted_eigvals) > 1: return float(np.real(sorted_eigvals[1])) else: return 0.0
[docs] def map_rand(k: int = 2) -> Tuple[np.ndarray, np.ndarray]: """ Generate a random MAP of order k. Args: k: Order of the MAP (default: 2) Returns: Tuple of (D0, D1) matrices """ D0 = np.random.rand(k, k) D1 = np.random.rand(k, k) return _map_normalize_generator(D0, D1)
[docs] def map_randn(k: int, mu: Tuple[float, float] = (1.0, 1.0), sigma: Tuple[float, float] = (0.5, 0.5) ) -> Tuple[np.ndarray, np.ndarray]: """ Generate a random MAP with normally distributed elements. Args: k: Order of the MAP mu: Mean for (D0, D1) elements sigma: Standard deviation for (D0, D1) elements Returns: Tuple of (D0, D1) matrices """ D0 = np.abs(np.random.normal(mu[0], sigma[0], (k, k))) D1 = np.abs(np.random.normal(mu[1], sigma[1], (k, k))) return _map_normalize_generator(D0, D1)
[docs] def map_renewal(D0: np.ndarray, D1: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Remove all correlations from a MAP, creating a renewal process. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Renewal MAP with same CDF but no correlations """ D0 = np.asarray(D0, dtype=np.float64).copy() D1 = np.asarray(D1, dtype=np.float64).copy() n = D0.shape[0] e = np.ones((n, 1)) pie = map_pie(D0, D1).reshape(1, -1) # D1_new = D1 * e * pie D1_new = D1 @ e @ pie return D0, D1_new
# ============================================================================ # Operation Functions # ============================================================================
[docs] def map_embedded(D0: np.ndarray, D1: np.ndarray) -> np.ndarray: """ Compute embedded discrete-time transition matrix. P = (-D0)^{-1} * D1 Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Transition matrix of embedded DTMC """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) try: P = linalg.inv(-D0) @ D1 except LinAlgError: P = linalg.pinv(-D0) @ D1 return P
[docs] def map_sum(D0: np.ndarray, D1: np.ndarray, n: int ) -> Tuple[np.ndarray, np.ndarray]: """ Create MAP for sum of n IID random variables. Args: D0: Hidden transition matrix D1: Visible transition matrix n: Number of variables to sum Returns: Tuple of (D0_new, D1_new) for the sum """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) order = D0.shape[0] D0_new = np.zeros((n * order, n * order)) D1_new = np.zeros((n * order, n * order)) curpos = 0 for i in range(n): D0_new[curpos:curpos + order, curpos:curpos + order] = D0 if i < n - 1: D0_new[curpos:curpos + order, curpos + order:curpos + 2 * order] = D1 else: D1_new[curpos:curpos + order, 0:order] = D1 curpos += order return D0_new, D1_new
def _krons(A: np.ndarray, B: np.ndarray) -> np.ndarray: """Kronecker sum: A ⊕ B = A \otimes I + I \otimes B.""" return np.kron(A, np.eye(B.shape[0])) + np.kron(np.eye(A.shape[0]), B)
[docs] def map_super(D0_a: np.ndarray, D1_a: np.ndarray, D0_b: np.ndarray, D1_b: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Create superposition of two MAPs. Args: D0_a, D1_a: First MAP D0_b, D1_b: Second MAP Returns: Superposed MAP (D0, D1) """ D0_new = _krons(D0_a, D0_b) D1_new = _krons(D1_a, D1_b) return _map_normalize_generator(D0_new, D1_new)
[docs] def map_mixture(alpha: np.ndarray, maps: list ) -> Tuple[np.ndarray, np.ndarray]: """ Create probabilistic mixture of MAPs. Args: alpha: Probability vector for choosing each MAP maps: List of MAPs as (D0, D1) tuples Returns: Mixture MAP (D0, D1) """ alpha = np.asarray(alpha, dtype=np.float64) n_maps = len(maps) if len(alpha) != n_maps: raise ValueError("Alpha length must match number of MAPs") # Build block diagonal D0 D0_blocks = [maps[i][0] for i in range(n_maps)] D0 = linalg.block_diag(*D0_blocks) # Build D1 with mixture transitions orders = [maps[i][0].shape[0] for i in range(n_maps)] total_order = sum(orders) D1 = np.zeros((total_order, total_order)) row_offset = 0 for i in range(n_maps): D1_i = maps[i][1] e_i = np.ones((orders[i], 1)) col_offset = 0 for j in range(n_maps): pie_j = map_pie(maps[j][0], maps[j][1]).reshape(1, -1) D1[row_offset:row_offset + orders[i], col_offset:col_offset + orders[j]] = \ D1_i @ e_i * alpha[j] @ pie_j col_offset += orders[j] row_offset += orders[i] return _map_normalize_generator(D0, D1)
[docs] def map_max(D0_a: np.ndarray, D1_a: np.ndarray, D0_b: np.ndarray, D1_b: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Create MAP for max(X, Y) where X ~ MAP_A and Y ~ MAP_B. Args: D0_a, D1_a: First MAP D0_b, D1_b: Second MAP Returns: MAP for the maximum (D0, D1) """ D0_a = np.asarray(D0_a, dtype=np.float64) D1_a = np.asarray(D1_a, dtype=np.float64) D0_b = np.asarray(D0_b, dtype=np.float64) D1_b = np.asarray(D1_b, dtype=np.float64) na = D0_a.shape[0] nb = D0_b.shape[0] a = -D0_a @ np.ones((na, 1)) b = -D0_b @ np.ones((nb, 1)) # Build D0 D0 = np.zeros((na * nb + na + nb, na * nb + na + nb)) # Block (1,1): krons(A, B) D0[:na * nb, :na * nb] = _krons(D0_a, D0_b) # Block (1,2): kron(a, I_na) D0[:na * nb, na * nb:na * nb + na] = np.kron(a, np.eye(na)) # Block (1,3): kron(b, I_nb) D0[:na * nb, na * nb + na:] = np.kron(b, np.eye(nb)) # Block (2,2): D0_b D0[na * nb:na * nb + nb, na * nb:na * nb + nb] = D0_b # Block (3,3): D0_a D0[na * nb + nb:, na * nb + nb:] = D0_a # Build D1 pie_a = map_pie(D0_a, D1_a) pie_b = map_pie(D0_b, D1_b) pie = np.concatenate([np.kron(pie_a, pie_b), np.zeros(na), np.zeros(nb)]) D1 = np.outer(pie, np.ones(D0.shape[0])) return D0, D1
[docs] def map_timereverse(D0: np.ndarray, D1: np.ndarray ) -> Tuple[np.ndarray, np.ndarray]: """ Compute time-reversed MAP. Args: D0: Hidden transition matrix D1: Visible transition matrix Returns: Time-reversed MAP (D0_r, D1_r) """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) piq = map_piq(D0, D1) D = np.diag(piq) try: D_inv = linalg.inv(D) except LinAlgError: D_inv = linalg.pinv(D) D0_r = D_inv @ D0.T @ D D1_r = D_inv @ D1.T @ D return D0_r, D1_r
[docs] def map_mark(D0: np.ndarray, D1: np.ndarray, prob: np.ndarray ) -> list: """ Mark arrivals from a MAP according to given probabilities. Args: D0: Hidden transition matrix D1: Visible transition matrix prob: Marking probabilities (prob[k] = P(mark type k)) Returns: MMAP as list [D0, D1_class1, D1_class2, ...] """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) prob = np.asarray(prob, dtype=np.float64) if abs(np.sum(prob) - 1.0) > 1e-6: prob = prob / np.sum(prob) mmap = [D0] for k in range(len(prob)): mmap.append(prob[k] * D1) return mmap
[docs] def map_stochcomp(D0: np.ndarray, D1: np.ndarray, retain_idx: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ Stochastic complementation to reduce MAP order. Args: D0: Hidden transition matrix D1: Visible transition matrix retain_idx: Indices of states to retain (0-indexed) Returns: Reduced MAP (D0_new, D1_new) """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) retain_idx = np.asarray(retain_idx, dtype=int) Q = D0 + D1 n = Q.shape[0] eliminated_idx = np.setdiff1d(np.arange(n), retain_idx) Q_RE = Q[np.ix_(retain_idx, eliminated_idx)] Q_EE = Q[np.ix_(eliminated_idx, eliminated_idx)] Q_RR = Q[np.ix_(retain_idx, retain_idx)] Q_ER = Q[np.ix_(eliminated_idx, retain_idx)] try: Q_EE_inv = linalg.inv(-Q_EE) except LinAlgError: Q_EE_inv = linalg.pinv(-Q_EE) Q_new = Q_RR + Q_RE @ Q_EE_inv @ Q_ER D1_RR = D1[np.ix_(retain_idx, retain_idx)] D1_ER = D1[np.ix_(eliminated_idx, retain_idx)] D0_new = Q_new - D1_RR D1_new = D1_RR + Q_RE @ Q_EE_inv @ D1_ER return _map_normalize_generator(D0_new, D1_new)
# ============================================================================ # Fitting Functions # ============================================================================
[docs] def map_kpc(maps: list) -> Tuple[np.ndarray, np.ndarray]: """ Kronecker product composition of MAPs. Args: maps: List of MAPs as (D0, D1) tuples Returns: Composed MAP (D0, D1) """ if len(maps) == 0: raise ValueError("At least one MAP required") if len(maps) == 1: return maps[0] D0, D1 = maps[0] for i in range(1, len(maps)): D0_i, D1_i = maps[i] D0 = -np.kron(D0, D0_i) D1 = np.kron(D1, D1_i) return D0, D1
[docs] def map_bernstein(f, n: int = 20) -> Tuple[np.ndarray, np.ndarray]: """ Convert distribution to MAP via Bernstein approximation. Args: f: PDF function handle n: Number of phases (default: 20) Returns: MAP representation (D0, D1) """ # Bernstein approximation c = 0.0 for i in range(1, n + 1): xi = -np.log(i / n) fi = f(xi) if np.isfinite(fi) and fi > 0: c += fi / i if c <= 0 or not np.isfinite(c): return map_erlang(1.0, n) # Build subgenerator T T = np.diag(-np.arange(1, n + 1)) + np.diag(np.arange(1, n), 1) # Build initial probability vector alpha alpha = np.zeros(n) for i in range(1, n + 1): xi = -np.log(i / n) fi = f(xi) if np.isfinite(fi) and fi > 0: alpha[i - 1] = fi / (i * c) if np.sum(alpha) > 0: alpha = alpha / np.sum(alpha) else: alpha[0] = 1 # Convert to MAP P = np.tile(alpha, (n, 1)) D0 = T D1 = -T @ P return D0, D1
[docs] def map_pntiter(D0: np.ndarray, D1: np.ndarray, na: int, t: float, M: Optional[int] = None) -> np.ndarray: """ Compute probability of na arrivals in interval [0, t]. Uses iterative bisection method (Neuts and Li). Args: D0: Hidden transition matrix D1: Visible transition matrix na: Number of arrivals t: Time interval length M: Bisection depth (auto-computed if None) Returns: Matrix of probabilities """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) if M is None: mean = map_mean(D0, D1) M = max(0, int(np.ceil(np.log2(t * 100 / mean)))) if mean > 0 else 0 def br(tau, t, r): """Poisson probability.""" return np.exp(-tau * t) * (tau * t) ** r / np.math.factorial(r) def pnt_bisect(na, t): """Base case computation.""" n_states = D0.shape[0] tau = np.max(-np.diag(D0)) I = np.eye(n_states) K = D0 / tau + I K1 = D1 / tau # Find N for convergence epsilon = np.finfo(float).eps Nmax = 100 N = 1 for N in range(1, Nmax): S = sum(br(tau, t, nn) for nn in range(N + 1, Nmax)) if S < epsilon: break # Initialize V = [[np.zeros_like(I) for _ in range(N + 1)] for _ in range(na + 1)] P = [np.zeros_like(I) for _ in range(na + 1)] V[0][0] = I P[0] = V[0][0] * br(tau, t, 0) for n in range(1, na + 1): for k in range(1, N + 1): V[n][k] = V[n][k - 1] @ K + V[n - 1][k - 1] @ K1 P[n] = P[n] + V[n][k] * br(tau, t, n) return P if M <= 0: P = pnt_bisect(na, t) else: P = pnt_bisect(na, t / (2 ** M)) for _ in range(M): Pold = P P = [np.zeros_like(Pold[0]) for _ in range(na + 1)] for n in range(na + 1): for j in range(n + 1): P[n] = P[n] + Pold[j] @ Pold[n - j] return P[na]
[docs] def map_pntquad(D0: np.ndarray, D1: np.ndarray, na: int, t: float ) -> np.ndarray: """ Compute probability of na arrivals using ODE solver. Args: D0: Hidden transition matrix D1: Visible transition matrix na: Number of arrivals t: Time interval length Returns: Matrix P_n(t) """ from scipy.integrate import solve_ivp D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) Ki = D0.shape[0] def pnt_ode(t_val, P): dP = np.zeros_like(P) # n = 0 case P0 = P[:Ki ** 2].reshape(Ki, Ki) dP[:Ki ** 2] = (P0 @ D0).flatten() # n >= 1 cases for n in range(1, na + 1): Pn_1 = P[((n - 1) * Ki ** 2):(n * Ki ** 2)].reshape(Ki, Ki) Pn = P[(n * Ki ** 2):((n + 1) * Ki ** 2)].reshape(Ki, Ki) dP[(n * Ki ** 2):((n + 1) * Ki ** 2)] = (Pn @ D0 + Pn_1 @ D1).flatten() return dP # Initial condition P0 = np.zeros((na + 1) * Ki ** 2) P0[:Ki ** 2] = np.eye(Ki).flatten() sol = solve_ivp(pnt_ode, [0, t], P0, method='RK45', rtol=1e-10, atol=1e-10) P_final = sol.y[:, -1] Pnt = P_final[(na * Ki ** 2):((na + 1) * Ki ** 2)].reshape(Ki, Ki) return Pnt
# ============================================================================ # Utility Functions # ============================================================================
[docs] def map_joint(D0: np.ndarray, D1: np.ndarray, a: np.ndarray, i: np.ndarray) -> float: """ Compute joint moments of a MAP. E[(X_{a1})^{i1} * (X_{a1+a2})^{i2} * ...] Args: D0: Hidden transition matrix D1: Visible transition matrix a: Vector of inter-arrival lags i: Vector of powers Returns: Joint moment """ D0 = np.asarray(D0, dtype=np.float64) D1 = np.asarray(D1, dtype=np.float64) a = np.asarray(a, dtype=int) i = np.asarray(i, dtype=int) a_cum = np.cumsum(a) P = map_embedded(D0, D1) try: invD0 = linalg.inv(-D0) except LinAlgError: invD0 = linalg.pinv(-D0) K = len(a) JM = np.eye(D0.shape[0]) for k in range(K - 1): P_power = np.linalg.matrix_power(P, a_cum[k + 1] - a_cum[k]) invD0_power = np.linalg.matrix_power(invD0, i[k]) JM = JM @ (np.math.factorial(i[k]) * invD0_power) @ P_power # Final term pie = map_pie(D0, D1) invD0_power_final = np.linalg.matrix_power(invD0, i[K - 1]) e = np.ones(D0.shape[0]) result = pie @ JM @ (np.math.factorial(i[K - 1]) * invD0_power_final) @ e return float(result)
[docs] def map_issym(D0: np.ndarray, D1: np.ndarray = None) -> bool: """ Check if MAP contains symbolic elements. In Python, this always returns False as we use numpy arrays. Args: D0: Hidden transition matrix D1: Visible transition matrix (optional) Returns: False (Python arrays are always numeric) """ return False
[docs] def map_feastol() -> int: """ Get tolerance for feasibility checks. Returns: Tolerance exponent (10^(-feastol)) """ return 8
[docs] def map_feasblock(E1: float, E2: float, E3: float, G2: float, opt: str = '') -> Tuple[np.ndarray, np.ndarray]: """ Fit the most similar feasible MAP(2). Args: E1: First moment (mean) E2: Second moment (or SCV if opt='scv') E3: Third moment G2: Autocorrelation decay rate opt: 'scv' if E2 is SCV instead of second moment Returns: Feasible MAP (D0, D1) """ if opt.lower() == 'scv': E2 = (1 + E2) * E1 ** 2 # Exponential case if abs(E2 - 2 * E1 ** 2) < 1e-10: D0 = np.array([[-1.0, 0], [0, -1.0]]) D1 = np.array([[0.5, 0.5], [0.5, 0.5]]) return map_scale(D0, D1, 1.0 / E1) KPC_TOL = 1e-10 if E2 <= 2 * E1 ** 2: E2 = (1 + KPC_TOL) * E1 ** 2 if E3 <= (3 / 2) * E2 ** 2 / E1: E3 = (3 / 2 + KPC_TOL) * E2 ** 2 / E1 return map_block(E1, E2, E3, G2)
[docs] def map_block(E1: float, E2: float, E3: float, G2: float, opt: str = '') -> Tuple[np.ndarray, np.ndarray]: """ Construct a MAP(2) from moments and autocorrelation. Args: E1: First moment E2: Second moment (or SCV if opt='scv') E3: Third moment G2: Autocorrelation decay rate opt: 'scv' if E2 is SCV Returns: MAP (D0, D1) """ if opt.lower() == 'scv': E2 = (1 + E2) * E1 ** 2 SCV = (E2 - E1 ** 2) / E1 ** 2 if SCV >= 1: # Hyperexponential case mu1 = E1 - 0.5 * np.sqrt(max(0, 2 * E2 - 4 * E1 ** 2)) mu2 = E1 + 0.5 * np.sqrt(max(0, 2 * E2 - 4 * E1 ** 2)) mu1 = max(1e-10, np.real(mu1)) mu2 = max(1e-10, np.real(mu2)) p = 0.5 - 0.5 * G2 D0 = np.array([[-1 / mu1, 0], [0, -1 / mu2]]) D1 = np.array([[(1 - p) / mu1, p / mu1], [p / mu2, (1 - p) / mu2]]) else: # Erlang-like case for SCV < 1 mu = 2 / E1 D0 = np.array([[-mu, mu * 0.5], [0, -mu]]) D1 = np.array([[0, 0], [mu, 0]]) return _map_normalize_generator(D0, D1)
[docs] def map_largemap() -> int: """ Get threshold for "large" MAP where exact computation is expensive. Returns: Order threshold (default: 100) """ return 100
[docs] def map2_fit(e1: float, e2: float, e3: float = -1.0, g2: float = 0.0 ) -> Tuple[Optional[Tuple[np.ndarray, np.ndarray]], int]: """ Fit a MAP(2) distribution to moments and autocorrelation. Based on: A. Heindl, G. Horvath, K. Gross "Explicit inverse characterization of acyclic MAPs of second order" Args: e1: First moment E[X] e2: Second moment E[X^2] e3: Third moment E[X^3], or -1 for automatic selection, -2 for minimum, -3 for maximum, or negative fraction for interpolation g2: Autocorrelation decay rate (gamma_2) Returns: Tuple of (MAP, error_code) where: - MAP: Tuple (D0, D1) if successful, None if failed - error_code: 0 for success, >0 for various errors Error codes: 0: Success 10: Mean out of bounds 20: Correlated exponential 30: h2 out of bounds 40: h3 out of bounds 51-54: g2 out of bounds """ TOL = 1e-10 r1 = e1 r2 = e2 / 2 h2 = (r2 - r1 ** 2) / r1 ** 2 # Handle special cases for e3 scv = (e2 - e1 ** 2) / e1 ** 2 if e3 == -1: # Select e3 that maximizes the range of correlations if 1 <= scv < 3: if g2 < 0: h3 = h2 - h2 ** 2 e3 = 12 * e1 ** 3 * h2 + 6 * e1 ** 3 * h3 + 6 * e1 ** 3 * (1 + h2 ** 2) else: e3 = (3 / 2 + 1e-3) * e2 ** 2 / e1 elif scv >= 3: e3 = (3 / 2 + 1e-3) * e2 ** 2 / e1 elif 0 < scv < 1: e3 = (1 + TOL) * (12 * e1 ** 3 * h2 + 6 * e1 ** 3 * ( h2 * (1 - h2 - 2 * np.sqrt(-h2))) + 6 * e1 ** 3 * (1 + h2 ** 2)) elif e3 == -2: # Select minimum e3 if scv >= 1: e3 = (3 / 2 + 1e-6) * e2 ** 2 / e1 elif 0 < scv < 1: h3 = h2 * (1 - h2 - 2 * np.sqrt(-h2)) e3 = 6 * e1 ** 3 * (h2 ** 2 + h3) elif e3 == -3: # Select maximum e3 if scv >= 1: e3 = 1e6 elif 0 < scv < 1: h3 = (-h2) ** 2 e3 = 6 * e1 ** 3 * (h2 ** 2 + h3) elif e3 == -4: # Select random e3 r = np.random.rand() if scv >= 1: e3 = r * (3 / 2 + 1e-6) * e2 ** 2 / e1 + (1 - r) * 1e6 elif 0 < scv < 1: h3 = r * (-h2) ** 2 + (1 - r) * h2 * (1 - h2 - 2 * np.sqrt(-h2)) e3 = 6 * e1 ** 3 * (h2 ** 2 + h3) elif -1 < e3 < 0: # Use a custom random e3 r = abs(e3) if scv >= 1: e3 = r * (3 / 2 + 1e-6) * e2 ** 2 / e1 + (1 - r) * 1e6 elif 0 < scv < 1: h3 = r * h2 * (1 - h2 - 2 * np.sqrt(-h2)) + (1 - r) * (-h2) ** 2 e3 = 6 * e1 ** 3 * (h2 ** 2 + h3) r3 = e3 / 6 h3 = (r3 * r1 - r2 ** 2) / r1 ** 4 b = h3 + h2 ** 2 - h2 c = np.sqrt(b ** 2 + 4 * h2 ** 3 + 0j) # Complex sqrt if r1 <= 0: return None, 10 # Mean out of bounds if h2 == 0: if h3 == 0 and g2 == 0: return map_exponential(e1), 0 else: return None, 20 # Correlated exponential if h2 > 0 and h3 > 0: # Hyperexponential case if np.real(b) >= 0: bc_ratio = np.real(b / c) if np.abs(c) > TOL else 0 lower_bound = (b - c) / (b + c) if np.abs(b + c) > TOL else -1 if np.real(lower_bound) <= g2 < 1: D0 = (1 / (2 * r1 * h3)) * np.array([ [-(2 * h2 + b - c), 0], [0, -(2 * h2 + b + c)] ]) D1 = (1 / (4 * r1 * h3)) * np.array([ [(2 * h2 + b - c) * (1 - bc_ratio + g2 * (1 + bc_ratio)), (2 * h2 + b - c) * (1 + bc_ratio) * (1 - g2)], [(2 * h2 + b + c) * (1 - bc_ratio) * (1 - g2), (2 * h2 + b + c) * (1 + bc_ratio + g2 * (1 - bc_ratio))] ]) D0 = np.real(D0) D1 = np.real(D1) return (D0, D1), 0 else: return None, 51 # g2 out of bounds else: # b < 0 if 0 <= g2 < 1: bc_ratio = np.real(b / c) if np.abs(c) > TOL else 0 D0 = (1 / (2 * r1 * h3)) * np.array([ [-(2 * h2 + b - c), 0], [0, -(2 * h2 + b + c)] ]) D1 = (1 / (4 * r1 * h3)) * np.array([ [(2 * h2 + b - c) * (1 - bc_ratio + g2 * (1 + bc_ratio)), (2 * h2 + b - c) * (1 + bc_ratio) * (1 - g2)], [(2 * h2 + b + c) * (1 - bc_ratio) * (1 - g2), (2 * h2 + b + c) * (1 + bc_ratio + g2 * (1 - bc_ratio))] ]) D0 = np.real(D0) D1 = np.real(D1) return (D0, D1), 0 elif -(h3 + h2 ** 2) / h2 <= g2 < 0: a = (h3 + h2 ** 2) / h2 d1 = ((1 - a) * (2 * h2 * g2 + b - c) + g2 * (b + c) - (b - c)) / ( (1 - a) * (2 * h2 + b - c) + 2 * c) d2 = ((g2 - 1) * (b - c)) / ((1 - a) * (2 * h2 + b - c) + 2 * c) D0 = (1 / (2 * r1 * h3)) * np.array([ [-(2 * h2 + b - c), (2 * h2 + b - c) * (1 - a)], [0, -(2 * h2 + b + c)] ]) D1 = (1 / (2 * r1 * h3)) * np.array([ [(2 * h2 + b - c) * d1, (2 * h2 + b - c) * (a - d1)], [(2 * h2 + b + c) * d2, (2 * h2 + b + c) * (1 - d2)] ]) D0 = np.real(D0) D1 = np.real(D1) return (D0, D1), 0 else: return None, 52 # g2 out of bounds elif -1 / 4 <= h2 < 0 and h2 * (1 - h2 - 2 * np.sqrt(-h2)) <= h3 <= -h2 ** 2: # Hypoexponential case c = -c # Flip sign for hypo case if g2 >= 0: upper_bound = -(h2 + np.sqrt(-h3)) ** 2 / h2 if h2 != 0 else np.inf if g2 <= np.real(upper_bound): a = (2 * h2 + b - c) * (h2 + np.sqrt(-h3)) / (2 * h2 * np.sqrt(-h3)) d1 = ((1 - a) * (2 * h2 * g2 + b - c) + g2 * (b + c) - (b - c)) / ( (1 - a) * (2 * h2 + b - c) + 2 * c) d2 = ((g2 - 1) * (b - c)) / ((1 - a) * (2 * h2 + b - c) + 2 * c) D0 = (1 / (2 * r1 * h3)) * np.array([ [-(2 * h2 + b - c), (2 * h2 + b - c) * (1 - a)], [0, -(2 * h2 + b + c)] ]) D1 = (1 / (2 * r1 * h3)) * np.array([ [(2 * h2 + b - c) * d1, (2 * h2 + b - c) * (a - d1)], [(2 * h2 + b + c) * d2, (2 * h2 + b + c) * (1 - d2)] ]) D0 = np.real(D0) D1 = np.real(D1) return (D0, D1), 0 else: return None, 53 # g2 out of bounds else: # g2 < 0 lower_bound = -(h3 + h2 ** 2) / h2 if h2 != 0 else -np.inf if g2 >= np.real(lower_bound): a = (h3 + h2 ** 2) / h2 d1 = ((1 - a) * (2 * h2 * g2 + b - c) + g2 * (b + c) - (b - c)) / ( (1 - a) * (2 * h2 + b - c) + 2 * c) d2 = ((g2 - 1) * (b - c)) / ((1 - a) * (2 * h2 + b - c) + 2 * c) D0 = (1 / (2 * r1 * h3)) * np.array([ [-(2 * h2 + b - c), (2 * h2 + b - c) * (1 - a)], [0, -(2 * h2 + b + c)] ]) D1 = (1 / (2 * r1 * h3)) * np.array([ [(2 * h2 + b - c) * d1, (2 * h2 + b - c) * (a - d1)], [(2 * h2 + b + c) * d2, (2 * h2 + b + c) * (1 - d2)] ]) D0 = np.real(D0) D1 = np.real(D1) return (D0, D1), 0 else: return None, 54 # g2 out of bounds else: if not (-1 / 4 <= h2 < 0): return None, 30 # h2 out of bounds else: return None, 40 # h3 out of bounds
__all__ = [ 'map_infgen', 'map_piq', 'map_pie', 'map_lambda', 'map_mean', 'map_var', 'map_scv', 'map_moment', 'map_scale', 'map_normalize', 'map_isfeasible', # MAP constructors (rate-based API) 'exp_map', 'erlang_map', 'hyperexp_map', # MAP constructors (MATLAB-style mean-based API) 'map_exponential', 'map_erlang', 'map_hyperexp', 'map_gamma', # MAP operations 'map_sumind', 'map_cdf', 'map_pdf', 'map_sample', # Statistics functions 'map_skew', 'map_kurt', 'map_acf', 'map_acfc', 'map_idc', # Count process functions 'map_count_mean', 'map_count_var', 'map_varcount', 'map_count_moment', # Constructor functions 'map_mmpp2', 'map_gamma2', 'map_rand', 'map_randn', 'map_renewal', # Operation functions 'map_embedded', 'map_sum', 'map_super', 'map_mixture', 'map_max', 'map_timereverse', 'map_mark', 'map_stochcomp', # Fitting functions 'map_kpc', 'map_bernstein', 'map_pntiter', 'map_pntquad', 'map2_fit', # Utility functions 'map_joint', 'map_issym', 'map_feastol', 'map_feasblock', 'map_block', 'map_largemap', ]