Source code for line_solver.api.trace.trace_analysis

"""
Trace Analysis Functions.

Native Python implementations for statistical analysis of empirical trace data.
Provides functions for computing means, variances, correlations, and other
statistical measures from measurement traces.
"""

import numpy as np
from typing import Union, Tuple, Optional

ArrayLike = Union[np.ndarray, list]


[docs] def trace_mean(trace: ArrayLike) -> float: """ Compute the arithmetic mean of trace data. Args: trace: Array of trace values. Returns: Mean value of the trace. Example: >>> trace_mean([1.0, 2.0, 3.0, 4.0, 5.0]) 3.0 """ trace = np.asarray(trace, dtype=np.float64).ravel() return float(np.mean(trace))
[docs] def trace_var(trace: ArrayLike) -> float: """ Compute the variance of trace data. Uses population variance (ddof=0) for consistency with Kotlin. Args: trace: Array of trace values. Returns: Variance of the trace. Example: >>> trace_var([1.0, 2.0, 3.0, 4.0, 5.0]) 2.0 """ trace = np.asarray(trace, dtype=np.float64).ravel() e1 = np.mean(trace) e2 = np.mean(trace ** 2) return float(e2 - e1 * e1)
[docs] def trace_scv(trace: ArrayLike) -> float: """ Compute the squared coefficient of variation (SCV). SCV = Var(X) / E[X]^2 Args: trace: Array of trace values. Returns: Squared coefficient of variation. Example: >>> trace_scv([1.0, 2.0, 3.0]) # Var=0.667, Mean=2, SCV=0.167 """ mean = trace_mean(trace) var = trace_var(trace) return float(var / (mean * mean)) if mean != 0 else 0.0
def _autocov(X: np.ndarray) -> np.ndarray: """Compute autocovariance using direct method.""" M = len(X) acv = np.zeros(M - 1) for p in range(M - 1): acv[p] = np.mean(X[:M - p] * X[p:]) return acv
[docs] def trace_acf(trace: ArrayLike, lags: ArrayLike = None) -> np.ndarray: """ Compute the autocorrelation function at specified lags. Args: trace: Array of trace values. lags: Array of lag values (default: [1]). Returns: Array of autocorrelation values at each lag. Example: >>> trace = np.random.randn(100) >>> acf = trace_acf(trace, [1, 2, 3]) """ trace = np.asarray(trace, dtype=np.float64).ravel() if lags is None: lags = np.array([1]) lags = np.asarray(lags, dtype=np.int32).ravel() max_lag = np.max(lags) if max_lag > len(trace) - 2: # Filter out lags that are too large valid_lags = lags[(lags <= len(trace) - 2) & (lags > 0)] if len(valid_lags) == 0: return np.array([]) lags = valid_lags mean = trace_mean(trace) centered = trace - mean autocov = _autocov(centered) rho = np.zeros(len(lags)) for i, lag in enumerate(lags): if lag < len(autocov): rho[i] = autocov[lag] / autocov[0] if autocov[0] != 0 else 0.0 return rho
[docs] def trace_gamma(trace: ArrayLike, limit: int = 1000) -> np.ndarray: """ Estimate the autocorrelation decay rate of a trace. Args: trace: Array of trace values. limit: Maximum lag considered. Returns: Array containing [GAMMA, RHO0, RESIDUALS]. Example: >>> gamma, rho0, residuals = trace_gamma(trace_data) """ trace = np.asarray(trace, dtype=np.float64).ravel() M1 = trace_mean(trace) M2 = np.mean(trace ** 2) max_lag = min(limit, len(trace) - 1) lag = np.arange(1, max_lag + 1) rho = trace_acf(trace, lag) VAR = M2 - M1 * M1 SCV = VAR / (M1 * M1) if M1 != 0 else 1.0 RHO0 = 0.5 * (1.0 - 1.0 / SCV) if SCV != 0 else 0.0 # Grid search for best gamma best_gamma = 0.99 min_residuals = np.inf for gamma_int in range(990, 1000): g = gamma_int / 1000.0 expected = RHO0 * (g ** lag[:len(rho)]) residuals = np.sum((rho - expected) ** 2) if residuals < min_residuals: min_residuals = residuals best_gamma = g return np.array([best_gamma, RHO0, min_residuals])
[docs] def trace_iat2counts(trace: ArrayLike, scale: float) -> np.ndarray: """ Compute the counting process from inter-arrival times. Args: trace: Array of inter-arrival times. scale: Time scale for counting. Returns: Array of counts after `scale` units of time from each arrival. Example: >>> iat = [0.5, 0.3, 0.8, 0.2, 0.4] >>> counts = trace_iat2counts(iat, 1.0) """ S = np.asarray(trace, dtype=np.float64).ravel() n = len(S) # Cumulative sum with 0 at start CS = np.zeros(n + 1) CS[1:] = np.cumsum(S) C = [] for i in range(n - 1): cur = i while cur + 1 < n and CS[cur + 1] - CS[i] <= scale: cur += 1 C.append(cur - i) if cur == n - 1: break return np.array(C, dtype=np.int32)
[docs] def trace_idi(trace: ArrayLike, kset: ArrayLike, option: str = None, n: int = 1 ) -> Tuple[np.ndarray, np.ndarray]: """ Compute the Index of Dispersion for Intervals. Args: trace: Array of trace values. kset: Set of k values to compute IDI for. option: Aggregation option (None, 'aggregate', 'aggregate-mix'). n: Aggregation parameter. Returns: Tuple of (IDI values, support values). Example: >>> idi, support = trace_idi(trace_data, [10, 20, 50]) """ S = np.asarray(trace, dtype=np.float64).ravel() kset = np.asarray(kset, dtype=np.int32).ravel() IDIk = [] support = [] for k in kset: if option is None: support_val = len(S) - k - 1 if support_val <= 0: continue support.append(support_val) Sk = np.zeros(len(S) - k) for t in range(len(S) - k - 1): Sk[t] = np.sum(S[t:t + k]) variance = trace_var(Sk) mean = trace_mean(Sk) IDIk.append(k * variance / (mean * mean) if mean != 0 else 0.0) elif option == 'aggregate': keff = k // n if keff <= 0: continue support_val = len(S) // keff support.append(support_val) Sk = np.zeros(len(S) - keff) for t in range(len(S) - keff - 1): Sk[t] = np.sum(S[t:t + keff]) variance = trace_var(Sk) mean = trace_mean(Sk) IDIk.append(k * variance / (mean * mean) if mean != 0 else 0.0) return np.array(IDIk), np.array(support, dtype=np.int32)
[docs] def trace_idc(trace: ArrayLike) -> float: """ Compute the Index of Dispersion for Counts. Asymptotically equal to IDI. Args: trace: Array of trace values. Returns: IDC value. Example: >>> idc = trace_idc(inter_arrival_times) """ S = np.asarray(trace, dtype=np.float64).ravel() k = min(1000, len(S) // 30) if k < 1: k = 1 idi, _ = trace_idi(S, [k]) return float(idi[0]) if len(idi) > 0 else 0.0
[docs] def trace_pmf(X: ArrayLike) -> Tuple[np.ndarray, np.ndarray]: """ Compute the probability mass function of discrete data. Args: X: Array of discrete values. Returns: Tuple of (PMF values, unique values). Example: >>> pmf, values = trace_pmf([1, 2, 2, 3, 3, 3]) """ X = np.asarray(X, dtype=np.int32).ravel() unique_values, counts = np.unique(X, return_counts=True) pmf = counts / len(X) return pmf, unique_values
[docs] def trace_shuffle(trace: ArrayLike) -> np.ndarray: """ Shuffle trace data randomly. Args: trace: Array of trace values. Returns: Shuffled trace array. Example: >>> shuffled = trace_shuffle([1, 2, 3, 4, 5]) """ trace = np.asarray(trace, dtype=np.float64).ravel() result = trace.copy() np.random.shuffle(result) return result
[docs] def trace_joint(trace: ArrayLike, lag: ArrayLike, order: ArrayLike) -> float: """ Compute joint moments E[X^{k_1}_{i} * X^{k_2}_{i+j} * ...]. Args: trace: Array of trace values. lag: Cumulative lag values. order: Moment orders. Returns: Joint moment value. Example: >>> jm = trace_joint(trace, [0, 1], [1, 1]) # E[X_i * X_{i+1}] """ S = np.asarray(trace, dtype=np.float64).ravel() lag = np.asarray(lag, dtype=np.int32).ravel() order = np.asarray(order, dtype=np.int32).ravel() sorted_lag = np.sort(lag) K = len(sorted_lag) base_lag = sorted_lag[0] adjusted_lag = sorted_lag - base_lag max_lag = np.max(adjusted_lag) valid_length = len(S) - max_lag if valid_length <= 0: return 0.0 result = 0.0 for i in range(valid_length): product = 1.0 for j, ord_val in enumerate(order): idx = i + adjusted_lag[min(j, len(adjusted_lag) - 1)] if idx < len(S): product *= S[idx] ** ord_val result += product return float(result / valid_length)
[docs] def trace_iat2bins(trace: ArrayLike, scale: float) -> Tuple[np.ndarray, np.ndarray]: """ Compute counts in bins with specified timescale. Args: trace: Array of inter-arrival times. scale: Bin timescale. Returns: Tuple of (counts per bin, bin membership for each element). Example: >>> counts, bins = trace_iat2bins(iat_data, 1.0) """ S = np.asarray(trace, dtype=np.float64).ravel() n = len(S) CS = np.zeros(n + 1) CS[1:] = np.cumsum(S) num_bins = int(np.ceil((CS[n] - CS[0]) / scale)) C = np.zeros(num_bins, dtype=np.int32) bC = [] cur = 0 last = 0 for i in range(num_bins): if cur >= n - 1: break while cur < n - 1 and CS[cur + 1] <= (i + 1) * scale: cur += 1 C[i] = cur - last for _ in range(cur - last): bC.append(i) last = cur return C, np.array(bC, dtype=np.int32)
def _percentile(sorted_data: np.ndarray, p: float) -> float: """Compute percentile of sorted data.""" index = (p / 100.0) * (len(sorted_data) - 1) lower = int(np.floor(index)) upper = int(np.ceil(index)) if lower == upper: return float(sorted_data[lower]) weight = index - lower return float(sorted_data[lower] * (1 - weight) + sorted_data[upper] * weight)
[docs] def trace_summary(trace: ArrayLike) -> np.ndarray: """ Compute comprehensive summary statistics for a trace. Args: trace: Array of trace values. Returns: Array containing [MEAN, SCV, MAD, SKEW, KURT, Q25, Q50, Q75, P95, MIN, MAX, IQR, ACF1, ACF2, ACF3, ACF4, IDC_SCV_RATIO]. Example: >>> summary = trace_summary(trace_data) >>> print(f"Mean: {summary[0]}, SCV: {summary[1]}") """ m = np.asarray(trace, dtype=np.float64).ravel() mean = trace_mean(m) scv = trace_scv(m) sorted_m = np.sort(m) # Percentiles q25 = _percentile(sorted_m, 25.0) q50 = _percentile(sorted_m, 50.0) # median q75 = _percentile(sorted_m, 75.0) p95 = _percentile(sorted_m, 95.0) min_val = float(sorted_m[0]) max_val = float(sorted_m[-1]) iqr = q75 - q25 # MAD (median absolute deviation) mad = float(np.median(np.abs(m - q50))) # Skewness and kurtosis variance = trace_var(m) std = np.sqrt(variance) if variance > 0 else 1.0 z = (m - mean) / std skew = float(np.mean(z ** 3)) kurt = float(np.mean(z ** 4) - 3.0) # excess kurtosis # ACF for lags 1-4 acf = trace_acf(m, [1, 2, 3, 4]) if len(acf) < 4: acf = np.concatenate([acf, np.zeros(4 - len(acf))]) # IDC/SCV ratio idc = trace_idc(m) idc_scv_ratio = idc / scv if scv != 0 else 0.0 return np.array([ mean, scv, mad, skew, kurt, q25, q50, q75, p95, min_val, max_val, iqr, acf[0], acf[1], acf[2], acf[3], idc_scv_ratio ])
[docs] def mtrace_mean(trace: ArrayLike, ntypes: int, types: ArrayLike) -> np.ndarray: """ Compute the mean of a trace, divided by types. Args: trace: Array of trace values. ntypes: Number of different types. types: Array indicating the type of each element. Returns: Array containing the mean values for each type. Example: >>> trace = [1.0, 2.0, 3.0, 4.0] >>> types = [0, 1, 0, 1] >>> means = mtrace_mean(trace, 2, types) """ trace = np.asarray(trace, dtype=np.float64).ravel() types = np.asarray(types, dtype=np.int32).ravel() mean = np.zeros(ntypes) for c in range(ntypes): mask = types == c if np.any(mask): mean[c] = np.mean(trace[mask]) else: mean[c] = np.nan return mean
[docs] def mtrace_var(trace: ArrayLike, ntypes: int, types: ArrayLike) -> np.ndarray: """ Compute the variance of a trace, divided by types. Args: trace: Array of trace values. ntypes: Number of different types. types: Array indicating the type of each element. Returns: Array containing the variance values for each type. Example: >>> var_by_type = mtrace_var(trace, 2, types) """ trace = np.asarray(trace, dtype=np.float64).ravel() types = np.asarray(types, dtype=np.int32).ravel() var = np.zeros(ntypes) for c in range(ntypes): mask = types == c if np.sum(mask) > 1: class_trace = trace[mask] e1 = np.mean(class_trace) e2 = np.mean(class_trace ** 2) var[c] = e2 - e1 * e1 else: var[c] = np.nan return var
[docs] def mtrace_count(trace: ArrayLike, ntypes: int, types: ArrayLike) -> np.ndarray: """ Count elements per type in a trace. Args: trace: Array of trace values. ntypes: Number of different types. types: Array indicating the type of each element. Returns: Array containing counts for each type. Example: >>> counts = mtrace_count(trace, 2, types) """ types = np.asarray(types, dtype=np.int32).ravel() counts = np.zeros(ntypes, dtype=np.int32) for c in range(ntypes): counts[c] = np.sum(types == c) return counts
[docs] def mtrace_sigma(T: ArrayLike, L: ArrayLike) -> np.ndarray: """ Compute one-step class transition probabilities from a marked trace. Computes P(C_k = j | C_{k-1} = i) empirically from the trace. Args: T: Array of inter-arrival times (unused, for API consistency) L: Array of class labels Returns: C x C matrix where element (i,j) is the probability of observing class j after class i. """ L = np.asarray(L).ravel() marks = np.unique(L) C = len(marks) sigma = np.zeros((C, C)) for i in range(C): for j in range(C): count = np.sum((L[:-1] == marks[i]) & (L[1:] == marks[j])) sigma[i, j] = count / (len(L) - 1) return sigma
[docs] def mtrace_sigma2(T: ArrayLike, L: ArrayLike) -> np.ndarray: """ Compute two-step class transition probabilities from a marked trace. Computes P(C_k = h | C_{k-1} = j, C_{k-2} = i) empirically. Args: T: Array of inter-arrival times (unused, for API consistency) L: Array of class labels Returns: C x C x C 3D array of transition probabilities. """ L = np.asarray(L).ravel() marks = np.unique(L) C = len(marks) sigma = np.zeros((C, C, C)) for i in range(C): for j in range(C): for h in range(C): count = np.sum((L[:-2] == marks[i]) & (L[1:-1] == marks[j]) & (L[2:] == marks[h])) sigma[i, j, h] = count / (len(L) - 2) return sigma
[docs] def mtrace_cross_moment(T: ArrayLike, L: ArrayLike, k: int) -> np.ndarray: """ Compute the k-th order moment of inter-arrival times between class pairs. Args: T: Array of inter-arrival times L: Array of class labels k: Order of the moment Returns: C x C matrix where element (i,j) is E[T^k | C_{t-1} = i, C_t = j] """ T = np.asarray(T, dtype=np.float64).ravel() L = np.asarray(L).ravel() marks = np.unique(L) C = len(marks) MC = np.zeros((C, C)) count = np.zeros((C, C)) for t in range(1, len(T)): for i in range(C): for j in range(C): if L[t - 1] == marks[i] and L[t] == marks[j]: MC[i, j] += T[t] ** k count[i, j] += 1 # Avoid division by zero with np.errstate(divide='ignore', invalid='ignore'): MC = np.where(count > 0, MC / count, np.nan) return MC
[docs] def mtrace_forward_moment(T: ArrayLike, A: ArrayLike, orders: ArrayLike, norm: bool = True) -> np.ndarray: """ Compute forward moments of a marked trace. Forward moment for class c is E[T_{k+1}^order | C_k = c]. Args: T: Array of inter-arrival times A: Array of class labels orders: Array of moment orders to compute norm: If True, normalize by class probability Returns: Matrix of shape (C, len(orders)) containing forward moments. """ T = np.asarray(T, dtype=np.float64).ravel() A = np.asarray(A).ravel() orders = np.asarray(orders).ravel() marks = np.unique(A) C = len(marks) M = np.zeros((C, len(orders))) for j, k in enumerate(orders): for c in range(C): mask = A[:-1] == marks[c] if np.any(mask): M[c, j] = np.mean((T[1:] ** k) * mask) if norm: M[c, j] = M[c, j] * (len(T) - 1) / np.sum(mask) else: M[c, j] = np.nan return M
[docs] def mtrace_backward_moment(T: ArrayLike, A: ArrayLike, orders: ArrayLike, norm: bool = True) -> np.ndarray: """ Compute backward moments of a marked trace. Backward moment for class c is E[T_k^order | C_k = c]. Args: T: Array of inter-arrival times A: Array of class labels orders: Array of moment orders to compute norm: If True, normalize by class probability Returns: Matrix of shape (C, len(orders)) containing backward moments. """ T = np.asarray(T, dtype=np.float64).ravel() A = np.asarray(A).ravel() orders = np.asarray(orders).ravel() marks = np.unique(A) C = len(marks) M = np.zeros((C, len(orders))) for j, k in enumerate(orders): for c in range(C): mask = A == marks[c] if np.any(mask): M[c, j] = np.mean((T ** k) * mask) if norm: M[c, j] = M[c, j] * len(T) / np.sum(mask) else: M[c, j] = np.nan return M
[docs] def mtrace_cov(T: ArrayLike, A: ArrayLike) -> np.ndarray: """ Compute lag-1 covariance between classes in a marked trace. Args: T: Array of inter-arrival times A: Array of class labels Returns: C x C matrix of 2x2 covariance matrices. """ T = np.asarray(T, dtype=np.float64).ravel() A = np.asarray(A).ravel() C = int(np.max(A)) N = len(A) # Return list of covariance matrices COV = [[None for _ in range(C)] for _ in range(C)] for c1 in range(C): for c2 in range(C): X0c1v = np.zeros(N - 1) X1c2v = np.zeros(N - 1) for i in range(N - 1): if A[i] == c1 + 1: # 1-indexed classes X0c1v[i] = T[i] if A[i + 1] == c2 + 1: X1c2v[i] = T[i + 1] COV[c1][c2] = np.cov(X0c1v, X1c2v) return COV
[docs] def mtrace_pc(T: ArrayLike, L: ArrayLike) -> np.ndarray: """ Compute the probability of arrival for each class. Args: T: Array of inter-arrival times (unused, for API consistency) L: Array of class labels Returns: Array of class probabilities. """ L = np.asarray(L).ravel() labels = np.unique(L) m = len(labels) pc = np.zeros(m) for i in range(m): pc[i] = np.sum(L == labels[i]) / len(L) return pc
[docs] def mtrace_summary(T: ArrayLike, L: ArrayLike) -> dict: """ Compute comprehensive summary statistics for a marked trace. Args: T: Array of inter-arrival times L: Array of class labels Returns: Dictionary containing: - M: First 5 aggregate moments - ACF: Autocorrelation for lags 1-100 - F1, F2: Forward moments of order 1 and 2 - B1, B2: Backward moments of order 1 and 2 - C1, C2: Cross moments of order 1 and 2 - Pc: Class probabilities - Pab: One-step transition probabilities """ T = np.asarray(T, dtype=np.float64).ravel() L = np.asarray(L).ravel() summary = { 'M': np.array([np.mean(T ** k) for k in range(1, 6)]), 'ACF': trace_acf(T, np.arange(1, 101)), 'F1': mtrace_forward_moment(T, L, [1]), 'F2': mtrace_forward_moment(T, L, [2]), 'B1': mtrace_backward_moment(T, L, [1]), 'B2': mtrace_backward_moment(T, L, [2]), 'C1': mtrace_cross_moment(T, L, 1), 'C2': mtrace_cross_moment(T, L, 2), 'Pc': mtrace_pc(T, L), 'Pab': mtrace_sigma(T, L), } return summary
[docs] def mtrace_split(T: ArrayLike, L: ArrayLike) -> list: """ Split a multi-class trace into per-class traces. For each class, computes inter-arrival times between consecutive events of that class. Args: T: Array of inter-arrival times L: Array of class labels Returns: List of arrays, one per class, containing inter-arrival times. """ T = np.asarray(T, dtype=np.float64).ravel() L = np.asarray(L).ravel() labels = np.unique(L) C = len(labels) TCUM = np.cumsum(T) TL = [] for c in range(C): mask = L == labels[c] cum_times = np.concatenate([[0], TCUM[mask]]) TL.append(np.diff(cum_times)) return TL
[docs] def mtrace_merge(t1: ArrayLike, t2: ArrayLike) -> Tuple[np.ndarray, np.ndarray]: """ Merge two traces into a single marked (multi-class) trace. Args: t1: Inter-arrival times of the first trace t2: Inter-arrival times of the second trace Returns: Tuple of (T, L) where: - T: Merged inter-arrival times - L: Class labels (1 for t1, 2 for t2) """ t1 = np.asarray(t1, dtype=np.float64).ravel() t2 = np.asarray(t2, dtype=np.float64).ravel() # Compute cumulative times cum1 = np.cumsum(t1) cum2 = np.cumsum(t2) # Combine and sort all_times = np.concatenate([[0], cum1, cum2]) sorted_indices = np.argsort(all_times) sorted_times = all_times[sorted_indices] # Compute inter-arrival times T = np.diff(sorted_times) # Assign labels IDX = sorted_indices[1:] # Skip the leading 0 L = np.zeros(len(T), dtype=np.int32) # Label 1 for t1 events (indices 1 to len(t1)) # Label 2 for t2 events (indices len(t1)+1 onwards) L[(IDX >= 1) & (IDX <= len(t1))] = 1 L[(IDX > len(t1))] = 2 return T, L
[docs] def mtrace_joint(T: ArrayLike, A: ArrayLike, i: ArrayLike) -> np.ndarray: """ Compute class-dependent joint moments. Computes E[(X^(a)_j)^i[0] * (X^(a)_{j+1})^i[1]] for all classes a. Args: T: Array of inter-arrival times A: Array of class labels i: Array of exponents [i0, i1] Returns: Array of joint moments, one per class. """ T = np.asarray(T, dtype=np.float64).ravel() A = np.asarray(A).ravel() i = np.asarray(i).ravel() C = int(np.max(A)) N = len(A) JM = np.zeros(C) for a in range(1, C + 1): # Events of class a, excluding first and last Ta = T[(A[1:-1] == a)] Na = len(Ta) if Na == 0: JM[a - 1] = np.nan continue tmp = 0.0 for j in range(N - 2): if A[j + 1] == a: tmp += (T[j] ** i[0]) * (T[j + 1] ** i[1]) JM[a - 1] = tmp / Na return JM
[docs] def mtrace_moment(T: ArrayLike, A: ArrayLike, orders: ArrayLike, after: bool = False, norm: bool = False) -> np.ndarray: """ Compute class-dependent moments of a multi-class trace. Args: T: Array of inter-arrival times A: Array of class labels orders: Array of moment orders to compute after: If True, compute moments of Bucholz variables (forward) If False, compute moments of Horvath variables (backward) norm: If True, normalize by class probability Returns: Matrix of shape (C, len(orders)) containing moments per class. """ T = np.asarray(T, dtype=np.float64).ravel() A = np.asarray(A).ravel() orders = np.asarray(orders).ravel() marks = np.unique(A) C = len(marks) M = np.zeros((C, len(orders))) for j, k in enumerate(orders): for c in range(C): if after: mask = A[:-1] == marks[c] if np.any(mask): M[c, j] = np.mean((T[1:] ** k) * mask) if norm: M[c, j] = M[c, j] * (len(T) - 1) / np.sum(mask) else: mask = A == marks[c] if np.any(mask): M[c, j] = np.mean((T ** k) * mask) if norm: M[c, j] = M[c, j] * len(T) / np.sum(mask) return M
[docs] def mtrace_moment_simple(T: ArrayLike, A: ArrayLike, k: int) -> np.ndarray: """ Simple interface to compute k-th order moments per class. Args: T: Array of inter-arrival times A: Array of class labels k: Order of the moment Returns: Array of k-th order moments, one per class. """ return mtrace_moment(T, A, [k], after=False, norm=True)[:, 0]
[docs] def mtrace_bootstrap(T: ArrayLike, A: ArrayLike, n_samples: int = 100, seed: int = None) -> Tuple[np.ndarray, np.ndarray]: """ Generate bootstrap samples from a marked trace. Args: T: Array of inter-arrival times A: Array of class labels n_samples: Number of bootstrap samples to generate seed: Random seed Returns: Tuple of (T_boot, A_boot) containing bootstrap samples. """ if seed is not None: np.random.seed(seed) T = np.asarray(T, dtype=np.float64).ravel() A = np.asarray(A).ravel() n = len(T) indices = np.random.choice(n, size=n_samples, replace=True) return T[indices], A[indices]
[docs] def mtrace_iat2counts(T: ArrayLike, L: ArrayLike, scale: float) -> np.ndarray: """ Compute counting process from marked inter-arrival times. For each class, counts arrivals in windows of specified scale. Args: T: Array of inter-arrival times L: Array of class labels scale: Time scale for counting Returns: Array of counts per class per window. """ T = np.asarray(T, dtype=np.float64).ravel() L = np.asarray(L).ravel() labels = np.unique(L) C = len(labels) # Split by class and compute counts TL = mtrace_split(T, L) counts = [] for c in range(C): if len(TL[c]) > 0: counts.append(trace_iat2counts(TL[c], scale)) else: counts.append(np.array([])) return counts
[docs] def trace_bicov(trace: ArrayLike, grid: ArrayLike) -> Tuple[np.ndarray, np.ndarray]: """ Compute bicovariance of a trace. Bicovariance measures third-order correlation structure at multiple lag combinations. Args: trace: Array of trace values. grid: Array of lags to form the grid (e.g., [1, 2, 3, 4, 5]). Returns: Tuple of (bicov, bicov_lags) where: - bicov: Array of bicovariance values - bicov_lags: 2D array of lag combinations (each row is [1, i, j]) Example: >>> trace = np.random.randn(1000) >>> bicov, lags = trace_bicov(trace, [1, 2, 3, 4, 5]) """ trace = np.asarray(trace, dtype=np.float64).ravel() grid = np.asarray(grid).ravel() bicov_lags = [] for i in grid: for j in grid: bicov_lags.append([1, int(i), int(j)]) bicov_lags = np.array(bicov_lags) bicov = np.zeros(len(bicov_lags)) for k, lags in enumerate(bicov_lags): bicov[k] = trace_joint(trace, lags, [1, 1, 1]) return bicov, bicov_lags
__all__ = [ # Single trace analysis 'trace_mean', 'trace_var', 'trace_scv', 'trace_acf', 'trace_gamma', 'trace_iat2counts', 'trace_idi', 'trace_idc', 'trace_pmf', 'trace_shuffle', 'trace_joint', 'trace_iat2bins', 'trace_summary', 'trace_bicov', # Multi-class trace analysis 'mtrace_mean', 'mtrace_var', 'mtrace_count', 'mtrace_sigma', 'mtrace_sigma2', 'mtrace_cross_moment', 'mtrace_forward_moment', 'mtrace_backward_moment', 'mtrace_cov', 'mtrace_pc', 'mtrace_summary', 'mtrace_split', 'mtrace_merge', 'mtrace_joint', 'mtrace_moment', 'mtrace_moment_simple', 'mtrace_bootstrap', 'mtrace_iat2counts', ]