Source code for line_solver.api.trace


"""
Trace data analysis functions.

This module provides functions for analyzing empirical trace data,
including statistical measures and transformations of time series
and event traces.

These functions support trace-driven modeling and empirical analysis
of system measurements and workload characterization.
"""

import jpype
import numpy as np
from line_solver import jlineMatrixToArray, jlineMatrixFromArray


def trace_mean(trace):
    """
    Calculate the mean of a trace.

    Args:
        trace: Array of trace values

    Returns:
        float: Mean value of the trace
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_meanKt.trace_mean(java_trace)
    return float(result)

def trace_var(trace):
    """
    Calculate the variance of a trace.

    Args:
        trace: Array of trace values

    Returns:
        float: Variance of the trace
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_var(java_trace)
    return float(result)

def mtrace_mean(trace, ntypes, type_array):
    """
    Calculate the mean of multi-type trace data.

    Computes the mean for each type in a multi-type trace dataset.

    Args:
        trace: Array of trace values
        ntypes: Number of different types in the trace
        type_array: Array indicating the type of each trace element

    Returns:
        numpy.ndarray: Mean values for each type
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    java_type_array = jpype.JArray(jpype.JInt)(type_array)
    result = jpype.JPackage('jline').api.trace.Mtrace_meanKt.mtrace_mean(
        java_trace, jpype.JInt(ntypes), java_type_array
    )
    return jlineMatrixToArray(result)

def trace_scv(trace):
    """
    Calculate the squared coefficient of variation of a trace.

    Args:
        trace: Array of trace values

    Returns:
        float: Squared coefficient of variation (variance/mean²)
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_scv(java_trace)
    return float(result)

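# Illustrative sketch (not part of the original module): a minimal demo of the
# first/second-moment helpers above. It assumes line_solver has already started
# the JPype/JLINE JVM bridge; the data values are hypothetical inter-arrival times.
def _demo_basic_moments():
    iats = [0.12, 0.35, 0.08, 0.50, 0.21, 0.17]  # hypothetical inter-arrival times
    m = trace_mean(iats)    # sample mean
    v = trace_var(iats)     # sample variance
    scv = trace_scv(iats)   # squared coefficient of variation, variance / mean**2
    print(f"mean={m:.4f} var={v:.4f} scv={scv:.4f}")
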
def trace_acf(trace, lags=None):
    """
    Calculate the autocorrelation function of a trace.

    Args:
        trace: Array of trace values
        lags: List of lag values at which to compute the autocorrelation (default: [1])

    Returns:
        numpy.ndarray: Autocorrelation values at the specified lags
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    if lags is None:
        lags = [1]
    java_lags = jpype.JArray(jpype.JInt)(lags)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_acf(java_trace, java_lags)
    return np.array([float(x) for x in result])

def trace_gamma(trace, limit=1000):
    """
    Fit gamma distribution parameters to trace data.

    Args:
        trace: Array of trace values
        limit: Maximum number of iterations for fitting (default: 1000)

    Returns:
        tuple: (shape, scale, rate) parameters of the fitted gamma distribution
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_gamma(java_trace, jpype.JInt(limit))
    return (float(result[0]), float(result[1]), float(result[2]))

def trace_iat2counts(trace, scale):
    """
    Convert inter-arrival times to count data.

    Args:
        trace: Array of inter-arrival times
        scale: Time scale for binning

    Returns:
        numpy.ndarray: Count values in each time bin
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_iat2counts(java_trace, jpype.JDouble(scale))
    return np.array([int(x) for x in result])

def trace_idi(trace, kset, option=None, n=1):
    """
    Calculate the index of dispersion for intervals (IDI).

    Measures the variability of intervals in the trace data.

    Args:
        trace: Array of trace values
        kset: Array of interval lengths to analyze
        option: Analysis option (optional)
        n: Parameter for the analysis method (default: 1)

    Returns:
        tuple: (idi_values, support_values) - IDI values and their support
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    java_kset = jpype.JArray(jpype.JInt)(kset)
    if option is None:
        result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_idi(java_trace, java_kset)
    else:
        result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_idi(
            java_trace, java_kset, jpype.JString(option), jpype.JInt(n)
        )
    idi_values = np.array([float(x) for x in result.getFirst()])
    support_values = np.array([int(x) for x in result.getSecond()])
    return (idi_values, support_values)

def trace_idc(trace):
    """
    Calculate the index of dispersion for counts (IDC).

    Args:
        trace: Array of trace values

    Returns:
        float: Index of dispersion for counts
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_idc(java_trace)
    return float(result)

def trace_pmf(data):
    """
    Calculate the probability mass function of discrete data.

    Args:
        data: Array of discrete data values

    Returns:
        tuple: (pmf_values, unique_values) - PMF and the corresponding unique values
    """
    java_data = jpype.JArray(jpype.JInt)([int(x) for x in data])
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_pmf(java_data)
    pmf_values = np.array([float(x) for x in result.getFirst()])
    unique_values = np.array([int(x) for x in result.getSecond()])
    return (pmf_values, unique_values)

def trace_shuffle(trace):
    """
    Randomly shuffle trace data.

    Args:
        trace: Array of trace values

    Returns:
        numpy.ndarray: Shuffled trace data
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_shuffle(java_trace)
    return np.array([float(x) for x in result])

def trace_bicov(trace, grid):
    """
    Calculate the bicovariance of trace data.

    Args:
        trace: Array of trace values
        grid: Grid of lag values for the bicovariance calculation

    Returns:
        tuple: (bicov_values, lag_combinations) - Bicovariance values and lag pairs
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    java_grid = jpype.JArray(jpype.JInt)(grid)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_bicov(java_trace, java_grid)
    bicov_values = np.array([float(x) for x in result.getFirst()])
    lag_combinations = []
    for lag_array in result.getSecond():
        lag_combinations.append([int(x) for x in lag_array])
    return (bicov_values, lag_combinations)

def trace_iat2bins(trace, scale):
    """
    Convert inter-arrival times to histogram bins.

    Args:
        trace: Array of inter-arrival times
        scale: Time scale for binning

    Returns:
        tuple: (counts, bin_membership) - Bin counts and membership array
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_iat2bins(java_trace, jpype.JDouble(scale))
    counts = np.array([int(x) for x in result.getFirst()])
    bin_membership = np.array([int(x) for x in result.getSecond()])
    return (counts, bin_membership)

def trace_joint(trace, lag, order):
    """
    Calculate joint statistics of trace data.

    Args:
        trace: Array of trace values
        lag: Array of lag values
        order: Array of moment orders

    Returns:
        float: Joint statistic value
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    java_lag = jpype.JArray(jpype.JInt)(lag)
    java_order = jpype.JArray(jpype.JInt)(order)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_joint(java_trace, java_lag, java_order)
    return float(result)

def trace_summary(trace):
    """
    Calculate comprehensive summary statistics for trace data.

    Args:
        trace: Array of trace values

    Returns:
        dict: Dictionary containing various statistics:
            - mean, scv, mad, skew, kurt: Basic statistics
            - q25, q50, q75, p95: Quantiles and percentiles
            - min, max, iqr: Range statistics
            - acf1-acf4: Autocorrelation at lags 1-4
            - idc_scv_ratio: IDC to SCV ratio
    """
    java_trace = jpype.JArray(jpype.JDouble)(trace)
    result = jpype.JPackage('jline').api.trace.Trace_varKt.trace_summary(java_trace)
    result_array = [float(x) for x in result]
    return {
        'mean': result_array[0],
        'scv': result_array[1],
        'mad': result_array[2],
        'skew': result_array[3],
        'kurt': result_array[4],
        'q25': result_array[5],
        'q50': result_array[6],
        'q75': result_array[7],
        'p95': result_array[8],
        'min': result_array[9],
        'max': result_array[10],
        'iqr': result_array[11],
        'acf1': result_array[12],
        'acf2': result_array[13],
        'acf3': result_array[14],
        'acf4': result_array[15],
        'idc_scv_ratio': result_array[16],
    }

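# Illustrative sketch (not part of the original module): reading selected fields
# from the trace_summary() dictionary, assuming the JPype/JLINE backend has been
# initialized by line_solver. The input values are hypothetical.
def _demo_summary():
    iats = [0.12, 0.35, 0.08, 0.50, 0.21, 0.17]  # hypothetical inter-arrival times
    stats = trace_summary(iats)
    # A large acf1 together with idc_scv_ratio well above 1 suggests temporal correlation.
    print(stats['mean'], stats['scv'], stats['acf1'], stats['idc_scv_ratio'])
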
def trace_cdf(trace, x_values=None):
    """
    Calculate the empirical cumulative distribution function of trace data.

    Args:
        trace: Array of trace values
        x_values: X values at which to evaluate the CDF
            (optional, defaults to the unique sorted trace values)

    Returns:
        dict: Dictionary with 'x' and 'y' arrays for a CDF plot
    """
    trace = np.array(trace).flatten()
    if x_values is None:
        x_values = np.unique(trace)
    else:
        x_values = np.array(x_values)
    n = len(trace)
    cdf_values = np.zeros_like(x_values, dtype=float)
    for i, x in enumerate(x_values):
        cdf_values[i] = np.sum(trace <= x) / n
    return {'x': x_values, 'y': cdf_values}

def trace_pdf(trace, x_values=None, bandwidth=None):
    """
    Calculate a probability density function estimate using kernel density estimation.

    Args:
        trace: Array of trace values
        x_values: X values at which to evaluate the PDF (optional)
        bandwidth: KDE bandwidth (optional, uses the default if None)

    Returns:
        dict: Dictionary with 'x' and 'y' arrays for a PDF plot
    """
    from scipy.stats import gaussian_kde

    trace = np.array(trace).flatten()
    trace = trace[np.isfinite(trace)]
    if len(trace) == 0:
        raise ValueError("No finite values in trace data")
    if x_values is None:
        x_min, x_max = np.min(trace), np.max(trace)
        x_range = x_max - x_min
        x_values = np.linspace(x_min - 0.1 * x_range, x_max + 0.1 * x_range, 100)
    else:
        x_values = np.array(x_values)
    kde = gaussian_kde(trace)
    if bandwidth is not None:
        kde.set_bandwidth(bandwidth)
    pdf_values = kde(x_values)
    return {'x': x_values, 'y': pdf_values}

def trace_hist(trace, bins=None):
    """
    Calculate a histogram of trace data.

    Args:
        trace: Array of trace values
        bins: Number of bins or bin specification (optional, defaults to 'auto')

    Returns:
        dict: Dictionary with 'counts', 'bin_edges', and 'bin_centers'
    """
    trace = np.array(trace).flatten()
    trace = trace[np.isfinite(trace)]
    if bins is None:
        bins = 'auto'
    counts, bin_edges = np.histogram(trace, bins=bins)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    return {'counts': counts, 'bin_edges': bin_edges, 'bin_centers': bin_centers}

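# Illustrative sketch (not part of the original module): the empirical-distribution
# helpers trace_cdf, trace_pdf and trace_hist above are pure NumPy/SciPy and do not
# require the JVM bridge. The data below is synthetic.
def _demo_distributions():
    rng = np.random.default_rng(1)
    data = rng.exponential(2.0, size=500)     # synthetic exponential sample
    cdf = trace_cdf(data)                     # {'x': ..., 'y': ...}; y ends at 1.0
    pdf = trace_pdf(data)                     # Gaussian-KDE density estimate
    hist = trace_hist(data, bins=20)          # {'counts', 'bin_edges', 'bin_centers'}
    print(cdf['y'][-1], pdf['y'].max(), hist['counts'].sum())
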
def trace_moment(trace, k):
    """
    Calculate the k-th raw moment of trace data.

    Args:
        trace: Array of trace values
        k: Moment order

    Returns:
        float: k-th moment value
    """
    trace = np.array(trace).flatten()
    trace = trace[np.isfinite(trace)]
    if len(trace) == 0:
        return 0.0
    return float(np.mean(trace ** k))

def trace_skew(trace):
    """
    Calculate the skewness of trace data.

    Args:
        trace: Array of trace values

    Returns:
        float: Skewness value
    """
    from scipy.stats import skew

    trace = np.array(trace).flatten()
    trace = trace[np.isfinite(trace)]
    if len(trace) < 3:
        return 0.0
    return float(skew(trace))

def trace_kurt(trace):
    """
    Calculate the kurtosis of trace data.

    Args:
        trace: Array of trace values

    Returns:
        float: Excess kurtosis value (Fisher's definition)
    """
    from scipy.stats import kurtosis

    trace = np.array(trace).flatten()
    trace = trace[np.isfinite(trace)]
    if len(trace) < 4:
        return 0.0
    return float(kurtosis(trace, fisher=True))

def trace_fit_gamma(trace):
    """
    Fit a gamma distribution to trace data using the method of moments.

    Args:
        trace: Array of positive trace values

    Returns:
        dict: Gamma distribution parameters:
            - 'shape': Shape parameter (alpha)
            - 'scale': Scale parameter (beta)
            - 'rate': Rate parameter (1/beta)
            - 'mean': Sample mean
            - 'var': Sample variance

    Raises:
        ValueError: If there are no positive finite values or the variance is zero
    """
    trace = np.array(trace).flatten()
    trace = trace[np.isfinite(trace) & (trace > 0)]
    if len(trace) == 0:
        raise ValueError("No positive finite values in trace data")
    sample_mean = np.mean(trace)
    sample_var = np.var(trace, ddof=1)
    if sample_var <= 0:
        raise ValueError("Sample variance must be positive for gamma fitting")
    scale = sample_var / sample_mean
    shape = sample_mean / scale
    rate = 1.0 / scale
    return {
        'shape': float(shape),
        'scale': float(scale),
        'rate': float(rate),
        'mean': float(sample_mean),
        'var': float(sample_var),
    }

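# Illustrative sketch (not part of the original module): higher-moment statistics and
# method-of-moments gamma fitting with the pure-Python helpers above, on synthetic data.
def _demo_gamma_fit():
    rng = np.random.default_rng(2)
    data = rng.gamma(2.0, 1.5, size=2000)   # synthetic gamma sample (shape 2.0, scale 1.5)
    print(trace_moment(data, 2))            # second raw moment
    print(trace_skew(data), trace_kurt(data))
    fit = trace_fit_gamma(data)
    # By construction of the moment fit, shape * scale reproduces the sample mean.
    print(fit['shape'], fit['scale'], fit['shape'] * fit['scale'], fit['mean'])
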
def mtrace_corr(trace, ntypes, type_array, lags=None):
    """
    Calculate the cross-correlation matrix for multi-type trace data.

    Args:
        trace: Array of trace values
        ntypes: Number of different types
        type_array: Array indicating the type of each trace element
        lags: Array of lag values (default: [0, 1, 2, 3, 4])

    Returns:
        dict: Dictionary with:
            - 'correlations': 3D array (ntypes x ntypes x nlags)
            - 'lags': Array of lag values used

    Raises:
        ValueError: If trace and type_array have different lengths
    """
    trace = np.array(trace).flatten()
    type_array = np.array(type_array).flatten()
    if len(trace) != len(type_array):
        raise ValueError("trace and type_array must have same length")
    if lags is None:
        lags = [0, 1, 2, 3, 4]
    lags = np.array(lags)
    nlags = len(lags)

    correlations = np.zeros((ntypes, ntypes, nlags))
    for i in range(ntypes):
        for j in range(ntypes):
            for k, lag in enumerate(lags):
                if lag == 0:
                    a = trace[type_array == i]
                    b = trace[type_array == j]
                    if i == j:
                        if len(a) > 0:
                            correlations[i, j, k] = 1.0
                    else:
                        # Subsequences of different types may have different lengths;
                        # truncate both to the common length before correlating.
                        m = min(len(a), len(b))
                        if m > 1:
                            correlations[i, j, k] = np.corrcoef(a[:m], b[:m])[0, 1]
                else:
                    if len(trace) > lag:
                        a = trace[:-lag][type_array[:-lag] == i]
                        b = trace[lag:][type_array[lag:] == j]
                        m = min(len(a), len(b))
                        if m > 1:
                            correlations[i, j, k] = np.corrcoef(a[:m], b[:m])[0, 1]
    return {'correlations': correlations, 'lags': lags}

def mtrace_cov(trace, ntypes, type_array, lags=None):
    """
    Calculate the cross-covariance matrix for multi-type trace data.

    Args:
        trace: Array of trace values
        ntypes: Number of different types
        type_array: Array indicating the type of each trace element
        lags: Array of lag values (default: [0, 1, 2, 3, 4])

    Returns:
        dict: Dictionary with:
            - 'covariances': 3D array (ntypes x ntypes x nlags)
            - 'lags': Array of lag values used

    Raises:
        ValueError: If trace and type_array have different lengths
    """
    trace = np.array(trace).flatten()
    type_array = np.array(type_array).flatten()
    if len(trace) != len(type_array):
        raise ValueError("trace and type_array must have same length")
    if lags is None:
        lags = [0, 1, 2, 3, 4]
    lags = np.array(lags)
    nlags = len(lags)

    covariances = np.zeros((ntypes, ntypes, nlags))
    for i in range(ntypes):
        for j in range(ntypes):
            for k, lag in enumerate(lags):
                if lag == 0:
                    a = trace[type_array == i]
                    b = trace[type_array == j]
                    if i == j:
                        if len(a) > 1:
                            covariances[i, j, k] = np.var(a, ddof=1)
                    else:
                        # Subsequences of different types may have different lengths;
                        # truncate both to the common length before computing covariance.
                        m = min(len(a), len(b))
                        if m > 1:
                            covariances[i, j, k] = np.cov(a[:m], b[:m], ddof=1)[0, 1]
                else:
                    if len(trace) > lag:
                        a = trace[:-lag][type_array[:-lag] == i]
                        b = trace[lag:][type_array[lag:] == j]
                        m = min(len(a), len(b))
                        if m > 1:
                            covariances[i, j, k] = np.cov(a[:m], b[:m], ddof=1)[0, 1]
    return {'covariances': covariances, 'lags': lags}

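# Illustrative sketch (not part of the original module): lagged cross-correlation and
# cross-covariance between two synthetic event types; mtrace_corr and mtrace_cov above
# are pure NumPy and do not require the JVM bridge. Type labels are hypothetical.
def _demo_multitype_correlation():
    rng = np.random.default_rng(3)
    trace = rng.exponential(1.0, size=200)   # synthetic per-event values
    types = rng.integers(0, 2, size=200)     # hypothetical type labels (0 or 1)
    corr = mtrace_corr(trace, 2, types, lags=[0, 1, 2])
    cov = mtrace_cov(trace, 2, types, lags=[0, 1, 2])
    print(corr['correlations'].shape, cov['covariances'].shape)  # (2, 2, 3) each
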
# Additional MTRACE functions
def mtrace_backward_moment(traces, order):
    """
    Computes backward moments for multiple traces.

    Args:
        traces: List of trace data
        order: Moment order

    Returns:
        numpy.ndarray: Backward moments
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_backward_momentKt.mtrace_backward_moment(
        jlineMatrixFromArray(traces), int(order)
    )
    return jlineMatrixToArray(result)

def mtrace_bootstrap(traces, n_bootstrap=100, statistic='mean'):
    """
    Performs bootstrap analysis on multiple traces.

    Args:
        traces: List of trace data
        n_bootstrap: Number of bootstrap samples
        statistic: Statistic to compute ('mean', 'var', etc.)

    Returns:
        dict: Bootstrap results
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_bootstrapKt.mtrace_bootstrap(
        jlineMatrixFromArray(traces), int(n_bootstrap), jpype.JString(statistic)
    )
    return {
        'estimates': jlineMatrixToArray(result.getEstimates()),
        'confidence_interval': jlineMatrixToArray(result.getConfidenceInterval()),
        'std_error': float(result.getStdError()),
    }

def mtrace_count(traces):
    """
    Counts events in multiple traces.

    Args:
        traces: List of trace data

    Returns:
        numpy.ndarray: Event counts per trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_countKt.mtrace_count(
        jlineMatrixFromArray(traces)
    )
    return jlineMatrixToArray(result)

def mtrace_cross_moment(trace1, trace2, order1, order2):
    """
    Computes cross moments between two traces.

    Args:
        trace1: First trace data
        trace2: Second trace data
        order1: Moment order for trace1
        order2: Moment order for trace2

    Returns:
        float: Cross moment value
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_cross_momentKt.mtrace_cross_moment(
        jlineMatrixFromArray(trace1), jlineMatrixFromArray(trace2), int(order1), int(order2)
    )
    return float(result)

def mtrace_forward_moment(traces, order):
    """
    Computes forward moments for multiple traces.

    Args:
        traces: List of trace data
        order: Moment order

    Returns:
        numpy.ndarray: Forward moments
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_forward_momentKt.mtrace_forward_moment(
        jlineMatrixFromArray(traces), int(order)
    )
    return jlineMatrixToArray(result)

def mtrace_iat2counts(traces, interval_length):
    """
    Converts inter-arrival times to counts for multiple traces.

    Args:
        traces: List of inter-arrival time traces
        interval_length: Length of the counting interval

    Returns:
        dict: Count data for each trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_iat2countsKt.mtrace_iat2counts(
        jlineMatrixFromArray(traces), float(interval_length)
    )
    return {
        'counts': jlineMatrixToArray(result.getCounts()),
        'intervals': jlineMatrixToArray(result.getIntervals()),
    }

def mtrace_joint(traces, bins=None):
    """
    Computes the joint distribution for multiple traces.

    Args:
        traces: List of trace data
        bins: Number of bins for the histogram (optional)

    Returns:
        dict: Joint distribution information
    """
    if bins is not None:
        result = jpype.JPackage('jline').api.trace.Mtrace_jointKt.mtrace_joint(
            jlineMatrixFromArray(traces), int(bins)
        )
    else:
        result = jpype.JPackage('jline').api.trace.Mtrace_jointKt.mtrace_joint(
            jlineMatrixFromArray(traces)
        )
    return {
        'joint_distribution': jlineMatrixToArray(result.getJointDistribution()),
        'marginals': jlineMatrixToArray(result.getMarginals()),
        'bins': jlineMatrixToArray(result.getBins()) if hasattr(result, 'getBins') else None,
    }

def mtrace_merge(traces):
    """
    Merges multiple traces into a single trace.

    Args:
        traces: List of trace data to merge

    Returns:
        numpy.ndarray: Merged trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_mergeKt.mtrace_merge(
        jlineMatrixFromArray(traces)
    )
    return jlineMatrixToArray(result)

def mtrace_moment(traces, order):
    """
    Computes moments of the specified order for multiple traces.

    Args:
        traces: List of trace data
        order: Moment order

    Returns:
        numpy.ndarray: Moments for each trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_momentKt.mtrace_moment(
        jlineMatrixFromArray(traces), int(order)
    )
    return jlineMatrixToArray(result)

def mtrace_moment_simple(traces):
    """
    Computes simple moments (mean, variance) for multiple traces.

    Args:
        traces: List of trace data

    Returns:
        dict: Simple moments for each trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_moment_simpleKt.mtrace_moment_simple(
        jlineMatrixFromArray(traces)
    )
    return {
        'means': jlineMatrixToArray(result.getMeans()),
        'variances': jlineMatrixToArray(result.getVariances()),
    }

def mtrace_pc(traces):
    """
    Computes principal components for multiple traces.

    Args:
        traces: List of trace data

    Returns:
        dict: Principal component analysis results
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_pcKt.mtrace_pc(
        jlineMatrixFromArray(traces)
    )
    return {
        'components': jlineMatrixToArray(result.getComponents()),
        'explained_variance': jlineMatrixToArray(result.getExplainedVariance()),
        'loadings': jlineMatrixToArray(result.getLoadings()),
    }

def mtrace_sigma(traces):
    """
    Computes the sigma parameter for multiple traces.

    Args:
        traces: List of trace data

    Returns:
        numpy.ndarray: Sigma values for each trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_sigmaKt.mtrace_sigma(
        jlineMatrixFromArray(traces)
    )
    return jlineMatrixToArray(result)

def mtrace_sigma2(traces):
    """
    Computes the squared sigma parameter for multiple traces.

    Args:
        traces: List of trace data

    Returns:
        numpy.ndarray: Squared sigma values for each trace
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_sigma2Kt.mtrace_sigma2(
        jlineMatrixFromArray(traces)
    )
    return jlineMatrixToArray(result)

def mtrace_split(trace, n_parts):
    """
    Splits a trace into multiple parts.

    Args:
        trace: Trace data to split
        n_parts: Number of parts to split into

    Returns:
        list: List of trace parts
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_splitKt.mtrace_split(
        jlineMatrixFromArray(trace), int(n_parts)
    )
    return [jlineMatrixToArray(part) for part in result]

def mtrace_summary(traces):
    """
    Computes summary statistics for multiple traces.

    Args:
        traces: List of trace data

    Returns:
        dict: Summary statistics
    """
    result = jpype.JPackage('jline').api.trace.Mtrace_summaryKt.mtrace_summary(
        jlineMatrixFromArray(traces)
    )
    return {
        'count': jlineMatrixToArray(result.getCount()),
        'mean': jlineMatrixToArray(result.getMean()),
        'std': jlineMatrixToArray(result.getStd()),
        'min': jlineMatrixToArray(result.getMin()),
        'percentiles': jlineMatrixToArray(result.getPercentiles()),
        'max': jlineMatrixToArray(result.getMax()),
    }
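
# Illustrative sketch (not part of the original module): per-trace summary statistics via
# the Kotlin-backed multi-trace helpers, assuming line_solver has initialized the
# JPype/JLINE runtime and that jlineMatrixFromArray accepts a 2-D array whose rows are
# individual traces. The data below is hypothetical.
def _demo_mtrace_summary():
    traces = np.array([
        [0.12, 0.35, 0.08, 0.50, 0.21],
        [0.40, 0.15, 0.22, 0.31, 0.09],
    ])
    simple = mtrace_moment_simple(traces)   # {'means': ..., 'variances': ...}
    full = mtrace_summary(traces)           # count/mean/std/min/percentiles/max
    print(simple['means'], full['mean'])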