"""
Matrix-Analytic Methods (MAM) for MAP/PH distributions.
This module provides functions for analyzing Markovian Arrival Processes (MAPs),
Phase-Type (PH) distributions, and related matrix-analytic methods. It includes
fitting algorithms, moment calculations, and various transformations.
Key function categories:
- MAP analysis: map_pie, map_mean, map_var, map_scv, map_skew
- MAP fitting: map2_fit, mmpp2_fit, aph_fit, aph2_fit
- PH distributions: Phase-type analysis and fitting
- Transformations: map_scale, map_normalize, map_timereverse
- QBD methods: qbd_R, qbd_mapmap1, qbd_raprap1
- Compression: compress_adaptive, compress_spectral
These functions support advanced stochastic modeling with correlated
arrivals and non-exponential service times.
"""
import jpype
import numpy as np
from line_solver import jlineMatrixToArray, jlineMatrixFromArray
[docs]
def map_pie(D0, D1=None):
"""
Compute equilibrium distribution of the embedded discrete-time process.
Calculates the stationary distribution of the discrete-time Markov chain
embedded at departure instants for a Markovian Arrival Process (MAP).
The embedded process has transition matrix P = (-D0)^(-1) * D1.
Args:
D0: Generator matrix for non-arrival transitions, or MAP as container {D0,D1}.
D1: Generator matrix for arrival transitions (optional if D0 is container).
Returns:
numpy.ndarray: Equilibrium distribution of the discrete-time Markov chain
embedded at departure instants.
"""
if hasattr(D0, 'get') and hasattr(D0, 'length'):
if D0.length() == 2:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_pieKt.map_pie(D0)
)
else:
raise ValueError("D0 container must have exactly 2 elements")
elif D1 is None:
if isinstance(D0, (list, np.ndarray)):
D0_array = np.array(D0)
if D0_array.ndim == 3 and D0_array.shape[0] == 2:
D0_mat = jlineMatrixFromArray(D0_array[0])
D1_mat = jlineMatrixFromArray(D0_array[1])
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_pieKt.map_pie(D0_mat, D1_mat)
)
else:
raise ValueError("When D1 is None, D0 must be a 3D array with shape (2, n, n)")
else:
raise ValueError("Invalid input type for D0 when D1 is None")
else:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_pieKt.map_pie(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
)
[docs]
def map_mean(D0, D1=None):
"""
Calculate mean inter-arrival time of a MAP.
Computes the expected inter-arrival time for a Markovian Arrival Process.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Mean inter-arrival time
"""
if hasattr(D0, 'get') and hasattr(D0, 'length'):
if D0.length() == 2:
return jpype.JPackage('jline').api.mam.Map_meanKt.map_mean(D0)
else:
raise ValueError("D0 container must have exactly 2 elements")
elif D1 is None:
if isinstance(D0, (list, np.ndarray)):
D0_array = np.array(D0)
if D0_array.ndim == 3 and D0_array.shape[0] == 2:
D0_mat = jlineMatrixFromArray(D0_array[0])
D1_mat = jlineMatrixFromArray(D0_array[1])
return jpype.JPackage('jline').api.mam.Map_meanKt.map_mean(D0_mat, D1_mat)
else:
raise ValueError("When D1 is None, D0 must be a 3D array with shape (2, n, n)")
else:
raise ValueError("Invalid input type for D0 when D1 is None")
else:
return jpype.JPackage('jline').api.mam.Map_meanKt.map_mean(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_var(D0, D1=None):
"""
Calculate variance of inter-arrival times of a MAP.
Computes the variance of inter-arrival times for a Markovian Arrival Process.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Variance of inter-arrival times
"""
if D1 is None:
return jpype.JPackage('jline').api.mam.Map_varKt.map_var(D0)
else:
return jpype.JPackage('jline').api.mam.Map_varKt.map_var(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_scv(D0, D1=None):
"""
Calculate squared coefficient of variation of a MAP.
Computes the SCV (variance/mean²) of inter-arrival times.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Squared coefficient of variation
"""
if D1 is None:
return jpype.JPackage('jline').api.mam.Map_scvKt.map_scv(D0)
else:
return jpype.JPackage('jline').api.mam.Map_scvKt.map_scv(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_skew(D0, D1=None):
"""
Calculate skewness of inter-arrival times of a MAP.
Computes the third central moment normalized by variance^(3/2).
Args:
D0: Generator matrix for non-arrival transitions, or MAP container
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Skewness of inter-arrival times
"""
if D1 is None:
return jpype.JPackage('jline').api.mam.Map_skewKt.map_skew(D0)
else:
return jpype.JPackage('jline').api.mam.Map_skewKt.map_skew(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_moment(D0, D1, order):
"""
Compute (power) moments of interarrival times of the specified order.
Calculates the k-th moment E[X^k] where X is the interarrival time
random variable of the MAP.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
order: Moment order to calculate (1=>E[X], 2=>E[X^2], ...)
Returns:
float: The k-th power moment of interarrival times
Examples:
- map_moment(D0, D1, 1) returns the first moment E[X]
- map_moment(D0, D1, 2) returns the second moment E[X^2]
"""
return jpype.JPackage('jline').api.mam.Map_momentKt.map_moment(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JInt(order)
)
[docs]
def map_lambda(D0, D1=None):
"""
Calculate arrival rate of a MAP.
Computes the long-run arrival rate (λ) of the Markovian Arrival Process.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Arrival rate λ
"""
if D1 is None:
return jpype.JPackage('jline').api.mam.Map_lambdaKt.map_lambda(D0)
else:
return jpype.JPackage('jline').api.mam.Map_lambdaKt.map_lambda(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_acf(D0, D1, lags):
"""
Compute autocorrelation coefficients of interarrival times.
Calculates the lag-k autocorrelation coefficients of the interarrival
times for a Markovian Arrival Process. The autocorrelation coefficient
at lag k measures the correlation between interarrival times that are
k arrivals apart.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
lags: Array of positive integers specifying the lags of the
autocorrelation coefficients to compute
Returns:
numpy.ndarray: Autocorrelation coefficients returned in the same
order as the lags vector
Examples:
- map_acf(D0, D1, [1]) returns the lag-1 autocorrelation coefficient
- map_acf(D0, D1, [1, 2, 3, 4, 5]) returns the first five autocorrelation coefficients
- map_acf(D0, D1, np.logspace(0, 4, 5)) returns five logarithmically spaced
autocorrelation coefficients in [1e0, 1e4]
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_acfKt.map_acf(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jlineMatrixFromArray(lags)
)
)
[docs]
def map_acfc(D0, D1, lags, u):
"""
Compute autocorrelation of a MAP's counting process at specified lags.
Calculates the autocorrelation function of the counting process N(t)
representing the number of arrivals in time interval [0,t], evaluated
at discrete time points separated by timeslot length u.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
lags: Array of lag values (positive integers)
u: Length of timeslot (timescale parameter)
Returns:
numpy.ndarray: Autocorrelation values of the counting process
at the specified lags
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_acfcKt.map_acfc(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jlineMatrixFromArray(lags),
jpype.JDouble(u)
)
)
[docs]
def map_idc(D0, D1=None):
"""
Compute the asymptotic index of dispersion.
Calculates I = SCV(1 + 2*sum_{k=1}^∞ ρ_k) where SCV is the squared
coefficient of variation and ρ_k is the lag-k autocorrelation coefficient
of inter-arrival times. I is also the limiting value of the index of
dispersion for counts and of the index of dispersion for intervals.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container {D0,D1}
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Asymptotic index of dispersion
Examples:
- For a renewal process: map_idc(map_renewal(MAP)) equals the SCV of the MAP
"""
if D1 is None:
return jpype.JPackage('jline').api.mam.Map_idcKt.map_idc(D0)
else:
return jpype.JPackage('jline').api.mam.Map_idcKt.map_idc(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_gamma(D0, D1=None):
"""
Estimate the auto-correlation decay rate of a MAP.
For MAPs of order higher than 2, performs an approximation of the
autocorrelation function (ACF) curve using non-linear least squares fitting.
For second-order MAPs, returns the geometric ACF decay rate. For Poisson
processes (order 1), returns 0.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container {D0,D1}
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
float: Autocorrelation decay rate (GAMMA parameter)
"""
if D1 is None:
return jpype.JPackage('jline').api.mam.Map_gammaKt.map_gamma(D0)
else:
return jpype.JPackage('jline').api.mam.Map_gammaKt.map_gamma(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
[docs]
def map_gamma2(MAP):
"""
Calculate the second largest eigenvalue of the embedded transition matrix.
Computes the second-order gamma parameter (γ₂) which is the second largest
eigenvalue (in absolute value) of the embedded discrete-time transition matrix
P = (-D0)^(-1) * D1. This parameter characterizes the autocorrelation structure
of the MAP beyond the dominant eigenvalue.
Args:
MAP: MAP structure or container {D0, D1}
Returns:
complex: Second largest eigenvalue of the embedded transition matrix
(complex number, but typically real for feasible MAPs)
Note:
The eigenvalues are sorted by descending absolute value, where the first
eigenvalue is 1 (for irreducible MAPs) and γ₂ is the second eigenvalue.
This parameter is important for analyzing higher-order correlation properties.
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_gamma2Kt.map_gamma2(MAP)
)
[docs]
def map_cdf(D0, D1, points):
"""
Calculate cumulative distribution function of a MAP.
Evaluates the CDF of inter-arrival times at specified points.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
points: Array of points to evaluate CDF at
Returns:
numpy.ndarray: CDF values at specified points
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_cdfKt.map_cdf(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jlineMatrixFromArray(points)
)
)
[docs]
def map_piq(D0, D1=None):
"""
Compute the probability in queue (piq) for a MAP.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
numpy.ndarray: Probability in queue vector
"""
if D1 is None:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_piqKt.map_piq(D0)
)
else:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_piqKt.map_piq(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
)
[docs]
def map_embedded(D0, D1=None):
"""
Compute the embedded discrete-time process transition matrix.
Returns the transition matrix P of the discrete-time Markov chain embedded
at arrival instants. If the MAP is feasible, then P must be an irreducible
stochastic matrix. P(i,j) gives the probability that the MAP restarts in
phase j if the last arrival occurred in phase i.
Args:
D0: Generator matrix for non-arrival transitions, or MAP container {D0,D1}
D1: Generator matrix for arrival transitions (optional if D0 is container)
Returns:
numpy.ndarray: Transition matrix P = (-D0)^(-1) * D1 of the embedded process
"""
if D1 is None:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_embeddedKt.map_embedded(D0)
)
else:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_embeddedKt.map_embedded(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
)
[docs]
def map_count_mean(MAP, t):
"""
Compute the mean of the counting process.
Calculates the expected number of arrivals in time interval (0, t] for
a Markovian Arrival Process.
Args:
MAP: Markovian Arrival Process as container {D0, D1}
t: The time period considered for each sample of the counting process
Returns:
float: Mean number of arrivals in (0, t]
"""
return jpype.JPackage('jline').api.mam.Map_count_meanKt.map_count_mean(
MAP, jpype.JDouble(t)
)
[docs]
def map_count_var(MAP, t):
"""
Compute the variance of the counting process.
Calculates the variance of the number of arrivals in time interval (0, t]
for a Markovian Arrival Process.
Args:
MAP: Markovian Arrival Process as container {D0, D1}
t: The time period considered for each sample of the counting process
Returns:
float: Variance of number of arrivals in (0, t]
Reference:
He and Neuts, "Markov chains with marked transitions", 1998
Verified special case of MMPP(2) with Andresen and Nielsen, 1998
"""
return jpype.JPackage('jline').api.mam.Map_count_varKt.map_count_var(
MAP, jpype.JDouble(t)
)
[docs]
def map_varcount(D0, D1, t):
"""
Compute variance of the counting process for a MAP.
Calculates the variance of the number of arrivals in time interval [0,t]
for a Markovian Arrival Process. This is an alternative interface for
map_count_var with direct matrix input.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
t: Time interval length
Returns:
float: Variance of number of arrivals in time interval [0,t]
Note:
This function provides the same computation as map_count_var but
accepts the D0, D1 matrices directly rather than a MAP container.
"""
return jpype.JPackage('jline').api.mam.Map_varcountKt.map_varcount(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JDouble(t)
)
[docs]
def map2_fit(e1, e2, e3, g2):
"""
Fit a second-order MAP from moments and autocorrelation.
Constructs a Markovian Arrival Process of order 2 from given statistical
moments and lag-1 autocorrelation coefficient using the explicit inverse
characterization method for acyclic MAPs of second order.
Args:
e1: First moment E[X] (mean inter-arrival time)
e2: Second moment E[X²]
e3: Third moment E[X³] (if not provided, automatically selected)
g2: Lag-1 autocorrelation coefficient
Returns:
tuple: (D0, D1) - Generator matrices for the fitted MAP or None if infeasible
Reference:
A. Heindl, G. Horvath, K. Gross "Explicit inverse characterization of
acyclic MAPs of second order"
Note:
The method automatically handles hyperexponential (h2>0) and hypoexponential
(h2<0) cases, where h2 = (r2-r1²)/r1² with r1=e1, r2=e2/2.
"""
result = jpype.JPackage('jline').api.mam.Map2_fitKt.map2_fit(
jpype.JDouble(e1),
jpype.JDouble(e2),
jpype.JDouble(e3),
jpype.JDouble(g2)
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def aph_fit(e1, e2, e3, nmax=10):
"""
Fit an Acyclic Phase-Type distribution to moments.
Fits an APH distribution to match the first three moments using
the EC (Expectation-Conditional) algorithm.
Args:
e1: First moment (mean)
e2: Second moment
e3: Third moment
nmax: Maximum number of phases (default: 10)
Returns:
tuple: (alpha, T, feasible) where:
- alpha: Initial probability vector
- T: Sub-generator matrix
- feasible: Whether the fit was feasible
"""
result = jpype.JPackage('jline').api.mam.Aph_fitKt.aph_fit(
jpype.JDouble(e1),
jpype.JDouble(e2),
jpype.JDouble(e3),
jpype.JInt(nmax)
)
alpha = jlineMatrixToArray(result.alpha)
T = jlineMatrixToArray(result.T)
feasible = result.feasible
return alpha, T, feasible
[docs]
def aph2_fit(M1, M2, M3):
"""
Fit a 2-phase APH distribution to moments.
Fits a 2-phase Acyclic Phase-Type distribution to match
the first three moments.
Args:
M1: First moment (mean)
M2: Second moment
M3: Third moment
Returns:
tuple: (alpha, T, feasible) where:
- alpha: Initial probability vector
- T: Sub-generator matrix
- feasible: Whether the fit was feasible
"""
result = jpype.JPackage('jline').api.mam.Aph2_fitKt.aph2_fit(
jpype.JDouble(M1),
jpype.JDouble(M2),
jpype.JDouble(M3)
)
alpha = jlineMatrixToArray(result.alpha)
T = jlineMatrixToArray(result.T)
feasible = result.feasible
return alpha, T, feasible
[docs]
def aph2_fitall(M1, M2, M3):
"""
Fit a 2-phase APH distribution with all possible structures.
Tries all possible 2-phase APH structures to fit the given moments.
Args:
M1: First moment (mean)
M2: Second moment
M3: Third moment
Returns:
Results for all feasible APH structures
"""
result = jpype.JPackage('jline').api.mam.Aph2_fitallKt.aph2_fitall(
jpype.JDouble(M1),
jpype.JDouble(M2),
jpype.JDouble(M3)
)
fits = []
for i in range(result.size()):
fit = result.get(i)
alpha = jlineMatrixToArray(fit.alpha)
T = jlineMatrixToArray(fit.T)
fits.append((alpha, T))
return fits
[docs]
def aph2_adjust(M1, M2, M3, method="default"):
"""
Adjust moments to ensure feasible APH fitting.
Adjusts the input moments to be in the feasible region for
2-phase APH distribution fitting.
Args:
M1: First moment (mean)
M2: Second moment
M3: Third moment
method: Adjustment method (default: "default")
Returns:
tuple: (M1, M2, M3) - Adjusted moments
"""
result = jpype.JPackage('jline').api.mam.Aph2_adjustKt.aph2_adjust(
jpype.JDouble(M1),
jpype.JDouble(M2),
jpype.JDouble(M3),
jpype.JString(method)
)
return result.M1, result.M2, result.M3
[docs]
def mmpp2_fit(E1, E2, E3, G2):
"""
Fit a 2-state MMPP (Markov Modulated Poisson Process) to moments.
Fits a 2-state MMPP to match the first three moments and
lag-1 autocorrelation of inter-arrival times.
Args:
E1: First moment (mean)
E2: Second moment
E3: Third moment
G2: Lag-1 autocorrelation coefficient
Returns:
tuple: (D0, D1, feasible) where:
- D0: Generator matrix for non-arrival transitions
- D1: Generator matrix for arrival transitions
- feasible: Whether the fit was successful
"""
result = jpype.JPackage('jline').api.mam.Mmpp2_fitKt.mmpp2_fit(
jpype.JDouble(E1),
jpype.JDouble(E2),
jpype.JDouble(E3),
jpype.JDouble(G2)
)
D0 = jlineMatrixToArray(result.D0)
D1 = jlineMatrixToArray(result.D1)
feasible = result.feasible
return D0, D1, feasible
[docs]
def mmpp2_fit1(mean, scv, skew, idc):
"""
Fit a 2-state MMPP using alternative parameterization.
Fits a 2-state MMPP using mean, SCV, skewness, and index of dispersion.
Args:
mean: Mean inter-arrival time
scv: Squared coefficient of variation
skew: Skewness
idc: Index of dispersion for counts
Returns:
tuple: (D0, D1, feasible) where:
- D0: Generator matrix for non-arrival transitions
- D1: Generator matrix for arrival transitions
- feasible: Whether the fit was successful
"""
result = jpype.JPackage('jline').api.mam.Mmpp2_fit1Kt.mmpp2_fit1(
jpype.JDouble(mean),
jpype.JDouble(scv),
jpype.JDouble(skew),
jpype.JDouble(idc)
)
D0 = jlineMatrixToArray(result.D0)
D1 = jlineMatrixToArray(result.D1)
feasible = result.feasible
return D0, D1, feasible
[docs]
def mmap_mixture_fit(P2, M1, M2, M3):
"""
Fit a mixture of MMAPs to given moments.
Args:
P2: Mixing probabilities matrix
M1: First moment matrix
M2: Second moment matrix
M3: Third moment matrix
Returns:
Fitted MMAP mixture parameters
"""
result = jpype.JPackage('jline').api.mam.Mmap_mixture_fitKt.mmap_mixture_fit(
jlineMatrixFromArray(P2),
jlineMatrixFromArray(M1),
jlineMatrixFromArray(M2),
jlineMatrixFromArray(M3)
)
D0 = jlineMatrixToArray(result.D0)
D1 = jlineMatrixToArray(result.D1)
feasible = result.feasible
return D0, D1, feasible
[docs]
def mmap_mixture_fit_mmap(mmap):
"""
Fit mixture of MMAPs from MMAP representation.
Args:
mmap: MMAP object or structure to fit mixture from
Returns:
tuple: (D0, D1, components) where:
- D0: Generator matrix for non-arrival transitions
- D1: Generator matrix for arrival transitions
- components: List of mixture components
"""
result = jpype.JPackage('jline').api.mam.Mmap_mixture_fit_mmapKt.mmap_mixture_fit_mmap(mmap)
D0 = jlineMatrixToArray(result.D0)
D1 = jlineMatrixToArray(result.D1)
components = []
for i in range(result.components.size()):
component = result.components.get(i)
components.append(jlineMatrixToArray(component))
return D0, D1, components
[docs]
def mamap2m_fit_gamma_fb_mmap(mmap):
"""
Fit 2-moment MAMAP using gamma forward-backward algorithm from MMAP.
Args:
mmap: MMAP structure to fit
Returns:
Fitted MAMAP parameters
"""
result = jpype.JPackage('jline').api.mam.Mamap2m_fit_gamma_fb_mmapKt.mamap2m_fit_gamma_fb_mmap(mmap)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
D_classes = []
for i in range(2, result.length()):
D_classes.append(jlineMatrixToArray(result.get(i)))
return D0, D1, D_classes
[docs]
def mamap2m_fit_gamma_fb(M1, M2, M3, GAMMA, P, F, B):
"""
Fit 2-moment MAMAP using gamma forward-backward algorithm.
Args:
M1: First moment
M2: Second moment
M3: Third moment
GAMMA: Gamma parameter
P: Probability parameters
F: Forward parameters
B: Backward parameters
Returns:
tuple: (D0, D1, D_classes) - Fitted MAMAP matrices and class matrices
"""
import numpy as np
P_java = jpype.JArray(jpype.JDouble)(P)
F_java = jpype.JArray(jpype.JDouble)(F)
B_java = jpype.JArray(jpype.JDouble)(B)
result = jpype.JPackage('jline').api.mam.Mamap2m_fit_gamma_fb_mmapKt.mamap2m_fit_gamma_fb(
jpype.JDouble(M1), jpype.JDouble(M2), jpype.JDouble(M3), jpype.JDouble(GAMMA),
P_java, F_java, B_java
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
D_classes = []
for i in range(2, result.length()):
D_classes.append(jlineMatrixToArray(result.get(i)))
return D0, D1, D_classes
[docs]
def map_exponential(mean):
"""
Fit a Poisson process as a MAP.
Creates a Markovian Arrival Process representing a Poisson process
(exponential inter-arrival times) with the specified mean.
Args:
mean: Mean inter-arrival time of the process
Returns:
tuple: (D0, D1) - MAP matrices representing the Poisson process
Examples:
- map_exponential(2) returns a Poisson process with rate λ=0.5
"""
result = jpype.JPackage('jline').api.mam.Map_exponentialKt.map_exponential(
jpype.JDouble(mean)
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def map_erlang(mean, k):
"""
Fit an Erlang-k process as a MAP.
Creates a Markovian Arrival Process representing an Erlang-k distribution
with the specified mean and number of phases.
Args:
mean: Mean inter-arrival time of the process
k: Number of phases in the Erlang distribution
Returns:
tuple: (D0, D1) - MAP matrices representing the Erlang-k process
Examples:
- map_erlang(2, 3) creates an Erlang-3 process with mean 2
"""
result = jpype.JPackage('jline').api.mam.Map_erlangKt.map_erlang(
jpype.JDouble(mean),
jpype.JInt(k)
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def map_hyperexp(mean, scv, p):
"""
Fit a two-phase Hyperexponential process as a MAP.
Creates a Markovian Arrival Process representing a two-phase hyperexponential
distribution with specified mean, squared coefficient of variation, and phase
selection probability.
Args:
mean: Mean inter-arrival time of the process
scv: Squared coefficient of variation of inter-arrival times
p: Probability of being served in phase 1 (default: p=0.99)
Returns:
tuple: (D0, D1) - MAP matrices representing the hyperexponential process
Examples:
- map_hyperexp(1, 2, 0.99) creates a two-phase hyperexponential process
where phase 1 is selected with probability 0.99
- map_hyperexp(1, 2, 0.2) creates a two-phase hyperexponential process
where phase 1 is selected with probability 0.2
Note:
For some parameter combinations, a feasible solution may not exist.
Use map_isfeasible() to check if the returned MAP is valid.
"""
result = jpype.JPackage('jline').api.mam.Map_hyperexpKt.map_hyperexp(
jpype.JDouble(mean),
jpype.JDouble(scv),
jlineMatrixFromArray(p)
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def map_scale(D0, D1, newMean):
"""
Rescale mean inter-arrival time of a MAP.
Returns a MAP with the same normalized moments and correlations as the
input MAP, except for the mean inter-arrival time which is set to the
specified new value.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
newMean: New mean inter-arrival time
Returns:
tuple: (D0_scaled, D1_scaled) - Scaled and normalized MAP matrices
Examples:
- map_mean(map_scale(map_exponential(1), 2)) has mean 2
"""
result = jpype.JPackage('jline').api.mam.Map_scaleKt.map_scale(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JDouble(newMean)
)
D0_scaled = jlineMatrixToArray(result.get(0))
D1_scaled = jlineMatrixToArray(result.get(1))
return D0_scaled, D1_scaled
[docs]
def map_normalize(D0, D1):
"""
Try to make a MAP feasible through normalization.
Normalizes the MAP matrices by: (1) ensuring D0+D1 is an infinitesimal
generator, (2) setting all negative off-diagonal entries to zero, and
(3) taking the real part of any complex values.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
Returns:
tuple: (D0_norm, D1_norm) - Normalized MAP matrices that form a valid MAP
Examples:
- map_normalize([[0,0],[0,0]], [[1,2],[3,4]]) produces a valid MAP
"""
result = jpype.JPackage('jline').api.mam.Map_normalizeKt.map_normalize(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
D0_norm = jlineMatrixToArray(result.get(0))
D1_norm = jlineMatrixToArray(result.get(1))
return D0_norm, D1_norm
[docs]
def map_timereverse(map_input):
"""
Compute time-reversed MAP.
Args:
map_input: Either tuple (D0, D1) or MAP container
Returns:
tuple: (D0_rev, D1_rev) - Time-reversed MAP matrices
"""
if isinstance(map_input, tuple):
D0, D1 = map_input
result = jpype.JPackage('jline').api.mam.Map_timereverseKt.map_timereverse(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
else:
result = jpype.JPackage('jline').api.mam.Map_timereverseKt.map_timereverse(map_input)
D0_rev = jlineMatrixToArray(result.get(0))
D1_rev = jlineMatrixToArray(result.get(1))
return D0_rev, D1_rev
[docs]
def map_mark(MAP, prob):
"""
Mark arrivals from a MAP according to given probabilities.
Takes a MAP with a single arrival type and returns a MMAP (Marked MAP)
with the same unmarked inter-arrival process but marked arrivals
specified by prob. prob[k] is the probability that an arrival
will be marked with type k.
Args:
MAP: Either tuple (D0, D1) or MAP container representing the input MAP
prob: Probability vector where prob[k] is the probability that arrivals
are marked with type k (probabilities are normalized if they don't sum to 1)
Returns:
Marked MAP (MMAP) with multiple arrival types
Note:
Input marking probabilities are automatically normalized if they don't sum to 1.
"""
if isinstance(MAP, tuple):
D0, D1 = MAP
result = jpype.JPackage('jline').api.mam.Map_markKt.map_mark(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jlineMatrixFromArray(prob)
)
else:
result = jpype.JPackage('jline').api.mam.Map_markKt.map_mark(
MAP,
jlineMatrixFromArray(prob)
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def map_infgen(D0, D1):
"""
Compute infinitesimal generator matrix of MAP.
Args:
D0: Generator matrix for non-arrival transitions
D1: Generator matrix for arrival transitions
Returns:
numpy.ndarray: Infinitesimal generator matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_infgenKt.map_infgen(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1)
)
)
[docs]
def map_super(MAPa, MAPb):
"""
Compute superposition of two independent MAPs.
Creates a MAP that represents the superposition (sum) of two
independent Markovian Arrival Processes using Kronecker sum operations.
The resulting MAP has arrivals from both input MAPs combined.
Args:
MAPa: First MAP as tuple (D0_a, D1_a) or container {D0_a, D1_a}
MAPb: Second MAP as tuple (D0_b, D1_b) or container {D0_b, D1_b}
Returns:
tuple: (D0_super, D1_super) - Superposition MAP matrices where:
D0_super = kron(D0_a, I_b) + kron(I_a, D0_b)
D1_super = kron(D1_a, I_b) + kron(I_a, D1_b)
Note:
The output is automatically normalized to ensure a valid MAP.
"""
if isinstance(MAPa, tuple) and isinstance(MAPb, tuple):
D0a, D1a = MAPa
D0b, D1b = MAPb
result = jpype.JPackage('jline').api.mam.Map_superKt.map_super(
jlineMatrixFromArray(D0a),
jlineMatrixFromArray(D1a),
jlineMatrixFromArray(D0b),
jlineMatrixFromArray(D1b)
)
else:
result = jpype.JPackage('jline').api.mam.Map_superKt.map_super(MAPa, MAPb)
D0_super = jlineMatrixToArray(result.get(0))
D1_super = jlineMatrixToArray(result.get(1))
return D0_super, D1_super
[docs]
def map_sum(MAP, n):
"""
Compute sum of n identical independent MAPs.
Args:
MAP: MAP as tuple (D0, D1) or container
n: Number of identical MAPs to sum
Returns:
tuple: (D0_sum, D1_sum) - Sum MAP matrices
"""
if isinstance(MAP, tuple):
D0, D1 = MAP
result = jpype.JPackage('jline').api.mam.Map_sumKt.map_sum(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JInt(n)
)
else:
result = jpype.JPackage('jline').api.mam.Map_sumKt.map_sum(MAP, jpype.JInt(n))
D0_sum = jlineMatrixToArray(result.get(0))
D1_sum = jlineMatrixToArray(result.get(1))
return D0_sum, D1_sum
[docs]
def map_sumind(MAPs):
"""
Compute sum of multiple independent MAPs.
Args:
MAPs: List of MAPs, each as tuple (D0, D1) or container
Returns:
tuple: (D0_sum, D1_sum) - Sum MAP matrices
"""
java_maps = jpype.java.util.ArrayList()
for MAP in MAPs:
if isinstance(MAP, tuple):
D0, D1 = MAP
map_matrices = jpype.java.util.ArrayList()
map_matrices.add(jlineMatrixFromArray(D0))
map_matrices.add(jlineMatrixFromArray(D1))
java_maps.add(map_matrices)
else:
java_maps.add(MAP)
result = jpype.JPackage('jline').api.mam.Map_sumindKt.map_sumind(java_maps)
D0_sum = jlineMatrixToArray(result.get(0))
D1_sum = jlineMatrixToArray(result.get(1))
return D0_sum, D1_sum
[docs]
def map_checkfeasible(MAP, TOL=1e-10):
"""
Check if MAP satisfies feasibility conditions and return diagnostics.
Args:
MAP: MAP as tuple (D0, D1) or container
TOL: Numerical tolerance (default: 1e-10)
Returns:
Feasibility check results with diagnostic information
"""
if isinstance(MAP, tuple):
D0, D1 = MAP
return jpype.JPackage('jline').api.mam.Map_checkfeasibleKt.map_checkfeasible(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JDouble(TOL)
)
else:
return jpype.JPackage('jline').api.mam.Map_checkfeasibleKt.map_checkfeasible(
MAP, jpype.JDouble(TOL)
)
[docs]
def map_isfeasible(MAP, TOL=1e-10):
"""
Evaluate feasibility of a MAP process.
Checks if a Markovian Arrival Process satisfies all feasibility conditions
including proper matrix structure, non-negativity constraints, stochasticity,
and irreducibility requirements.
Args:
MAP: MAP as tuple (D0, D1) or container {D0, D1}
TOL: Numerical tolerance for feasibility checks (default: 1e-10)
Returns:
bool: True if MAP is feasible, False if infeasible
Examples:
- map_isfeasible([[0,0],[0,0]], [[1,2],[3,4]]) returns False (infeasible MAP)
Note:
Numerical tolerance is based on the standard toolbox value in map_feastol().
"""
if isinstance(MAP, tuple):
D0, D1 = MAP
return jpype.JPackage('jline').api.mam.Map_isfeasibleKt.map_isfeasible(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JDouble(TOL)
)
else:
return jpype.JPackage('jline').api.mam.Map_isfeasibleKt.map_isfeasible(
MAP, jpype.JDouble(TOL)
)
[docs]
def map_feastol():
"""
Get default feasibility tolerance for MAP validation.
Returns:
float: Default tolerance value
"""
return jpype.JPackage('jline').api.mam.Map_feastolKt.map_feastol()
[docs]
def map_largemap():
"""
Get threshold for determining if a MAP is considered large.
Returns:
int: Size threshold for large MAPs
"""
return jpype.JPackage('jline').api.mam.Map_largemapKt.map_largemap()
[docs]
def aph2_assemble(l1, l2, p1):
"""
Assemble a 2-phase APH distribution from parameters.
Args:
l1: Rate parameter for first phase
l2: Rate parameter for second phase
p1: Initial probability for first phase
Returns:
tuple: (alpha, T) - Initial vector and generator matrix
"""
result = jpype.JPackage('jline').api.mam.Aph2_assembleKt.aph2_assemble(
jpype.JDouble(l1),
jpype.JDouble(l2),
jpype.JDouble(p1)
)
alpha = jlineMatrixToArray(result.alpha)
T = jlineMatrixToArray(result.T)
return alpha, T
[docs]
def ph_reindex(PHs, stationToClass):
"""
Reindex phase-type distributions according to station-to-class mapping.
Args:
PHs: List of phase-type distributions
stationToClass: Mapping from stations to classes
Returns:
Reindexed phase-type distributions
"""
java_phs = jpype.java.util.ArrayList()
for ph in PHs:
java_phs.add(ph)
result = jpype.JPackage('jline').api.mam.Ph_reindexKt.ph_reindex(
java_phs,
jlineMatrixFromArray(stationToClass)
)
reindexed_phs = []
for i in range(result.size()):
reindexed_phs.append(result.get(i))
return reindexed_phs
[docs]
def map_rand(K):
"""
Generate a random MAP with K states.
Args:
K: Number of states in the MAP
Returns:
tuple: (D0, D1) - Randomly generated MAP matrices
"""
result = jpype.JPackage('jline').api.mam.Map_randKt.map_rand(jpype.JInt(K))
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def map_randn(K, mu, sigma):
"""
Generate a random MAP with normally distributed matrix elements.
Creates a random Markovian Arrival Process with K states where the matrix
elements are drawn from normal distributions with specified mean and standard
deviation parameters. The resulting matrices are normalized to ensure feasibility.
Args:
K: Number of states in the MAP
mu: Mean parameter for the normal distribution of matrix elements
sigma: Standard deviation parameter for the normal distribution
Returns:
tuple: (D0, D1) - Random MAP matrices with normalized feasible structure
Note:
The generated matrices undergo normalization via map_normalize() to ensure
they form a valid MAP structure with proper generator properties.
"""
result = jpype.JPackage('jline').api.mam.Map_randnKt.map_randn(
jpype.JInt(K),
jpype.JDouble(mu),
jpype.JDouble(sigma)
)
D0 = jlineMatrixToArray(result.get(0))
D1 = jlineMatrixToArray(result.get(1))
return D0, D1
[docs]
def mmap_lambda(MMAP):
"""
Calculate arrival rate for each class in a Marked MAP.
Computes the arrival rate (lambda) for each job class in a Marked
Markovian Arrival Process (MMAP). For a MMAP with C classes, returns
a vector of C arrival rates.
Args:
MMAP: Marked MAP structure as container {D0, D1, D2, ..., DC+1}
where D0 is the background generator and D1, D2, ..., DC+1
are the arrival generators for each class
Returns:
numpy.ndarray: Vector of arrival rates for each job class
Note:
This function is equivalent to mmap_count_lambda(MMAP).
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_lambdaKt.mmap_lambda(MMAP)
)
[docs]
def mmap_count_mean(MMAP, t):
"""
Compute the mean of the counting process for each class in a Marked MAP.
Calculates the expected number of arrivals in time interval (0, t] for
each job class in a Marked Markovian Arrival Process (MMAP).
Args:
MMAP: Marked MAP structure as container {D0, D1, D2, ..., DC+1}
t: The time period considered for each sample of the counting process
Returns:
numpy.ndarray: Vector with the mean number of arrivals in (0, t] for each job class
Note:
For a MMAP with C classes, returns a C-dimensional vector where
mk[k] is the mean arrival count for class k in interval (0, t].
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_count_meanKt.mmap_count_mean(
MMAP, jpype.JDouble(t)
)
)
[docs]
def mmap_count_var(MMAP, t):
"""
Compute variance of number of arrivals per class in time interval t for an MMAP.
Args:
MMAP: Markovian Arrival Process with Multiple types
t: Time interval
Returns:
numpy.ndarray: Variance of number of arrivals per class in time t
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_count_varKt.mmap_count_var(
MMAP, jpype.JDouble(t)
)
)
[docs]
def mmap_count_idc(MMAP, t):
"""
Compute index of dispersion for counts per class in time interval t for an MMAP.
Args:
MMAP: Markovian Arrival Process with Multiple types
t: Time interval
Returns:
numpy.ndarray: Index of dispersion for counts per class
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_count_idcKt.mmap_count_idc(
MMAP, jpype.JDouble(t)
)
)
[docs]
def mmap_idc(MMAP):
"""
Compute index of dispersion for counts of an MMAP.
Args:
MMAP: Markovian Arrival Process with Multiple types
Returns:
float: Index of dispersion for counts
"""
return jpype.JPackage('jline').api.mam.Mmap_idcKt.mmap_idc(MMAP)
[docs]
def mmap_sigma2(mmap):
"""
Compute second-order statistics (variance) for an MMAP.
Args:
mmap: Markovian Arrival Process with Multiple types
Returns:
numpy.ndarray: Second-order statistics matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_sigma2Kt.mmap_sigma2(mmap)
)
[docs]
def mmap_exponential(lambda_rates, n):
"""
Create an exponential MMAP with given arrival rates.
Args:
lambda_rates: Array of arrival rates for each class
n: Number of states
Returns:
MMAP: Exponential MMAP with specified rates
"""
return jpype.JPackage('jline').api.mam.Mmap_exponentialKt.mmap_exponential(
jlineMatrixFromArray(lambda_rates),
jpype.JInt(n)
)
[docs]
def mmap_mixture(alpha, MAPs):
"""
Create a mixture of MMAPs with given mixing probabilities.
Args:
alpha: Mixing probability vector
MAPs: List of MMAP components
Returns:
MMAP: Mixture MMAP
"""
java_maps = jpype.java.util.ArrayList()
for MAP in MAPs:
java_maps.add(MAP)
return jpype.JPackage('jline').api.mam.Mmap_mixtureKt.mmap_mixture(
jlineMatrixFromArray(alpha),
java_maps
)
[docs]
def mmap_super(MMAPa, MMAPb, opt="default"):
"""
Compute superposition of two MMAPs.
Args:
MMAPa: First MMAP
MMAPb: Second MMAP
opt: Superposition method (default: "default")
Returns:
MMAP: Superposed MMAP
"""
return jpype.JPackage('jline').api.mam.Mmap_superKt.mmap_super(
MMAPa, MMAPb, jpype.JString(opt)
)
[docs]
def mmap_super_safe(MMAPS, maxorder=10, method="default"):
"""
Safe superposition of multiple MMAPs with error checking.
Performs superposition of multiple Marked Markovian Arrival Processes
with additional validation and error handling.
Args:
MMAPS: List of MMAP matrices to superpose
maxorder: Maximum order for approximation
method: Superposition method
Returns:
Superposed MMAP matrices
"""
java_mmaps = jpype.java.util.ArrayList()
for MMAP in MMAPS:
java_mmaps.add(MMAP)
return jpype.JPackage('jline').api.mam.Mmap_super_safeKt.mmap_super_safe(
java_mmaps,
jpype.JInt(maxorder),
jpype.JString(method)
)
[docs]
def mmap_compress(MMAP, config="default"):
"""
Compress an MMAP using specified configuration.
Reduces the number of states in an MMAP while preserving
key statistical properties using the specified configuration.
Args:
MMAP: MMAP matrices to compress
config: Compression configuration
Returns:
Compressed MMAP matrices
"""
return jpype.JPackage('jline').api.mam.Mmap_compressKt.mmap_compress(
MMAP, jpype.JString(config)
)
[docs]
def mmap_normalize(MMAP):
"""
Fix MMAP feasibility by normalization.
Normalizes a Marked MAP to ensure feasibility by setting negative values
to zero and enforcing the proper conditions for a valid MMAP structure.
This includes ensuring the background matrix D0 has negative diagonal
elements and the arrival matrices are non-negative.
Args:
MMAP: Marked MAP structure as container {D0, D1, D2, ..., DC+1}
Returns:
Normalized MMAP structure that satisfies feasibility conditions
Note:
The normalization process:
1. Sets negative off-diagonal elements in D0 to zero
2. Sets negative elements in arrival matrices to zero
3. Adjusts diagonal elements of D0 to maintain generator property
"""
return jpype.JPackage('jline').api.mam.Mmap_normalizeKt.mmap_normalize(MMAP)
[docs]
def mmap_scale(MMAP, M, maxIter=100):
"""
Scale an MMAP using iterative algorithm.
Adjusts the MMAP parameters using matrix M through
an iterative scaling procedure.
Args:
MMAP: MMAP matrices to scale
M: Scaling matrix
maxIter: Maximum number of iterations
Returns:
Scaled MMAP matrices
"""
return jpype.JPackage('jline').api.mam.Mmap_scaleKt.mmap_scale(
MMAP, jlineMatrixFromArray(M), jpype.JInt(maxIter)
)
[docs]
def mmap_timereverse(mmap):
"""
Compute time-reversed MMAP.
Creates the time-reversed version of a Marked Markovian
Arrival Process, reversing the direction of time.
Args:
mmap: MMAP matrices to time-reverse
Returns:
Time-reversed MMAP matrices
"""
return jpype.JPackage('jline').api.mam.Mmap_timereverseKt.mmap_timereverse(mmap)
[docs]
def mmap_hide(MMAP, types):
"""
Hide specific arrival types from an MMAP.
Removes specified arrival types from the MMAP,
effectively filtering out those arrival classes.
Args:
MMAP: MMAP matrices
types: Matrix specifying types to hide
Returns:
MMAP with specified types hidden
"""
return jpype.JPackage('jline').api.mam.Mmap_hideKt.mmap_hide(
MMAP, jlineMatrixFromArray(types)
)
[docs]
def mmap_shorten(mmap):
"""
Shorten an MMAP by removing redundant elements.
Reduces the size of the MMAP representation by
eliminating unnecessary components.
Args:
mmap: MMAP matrices to shorten
Returns:
Shortened MMAP matrices
"""
return jpype.JPackage('jline').api.mam.Mmap_shortenKt.mmap_shorten(mmap)
[docs]
def mmap_maps(MMAP):
"""
Extract individual MAP components from an MMAP.
Decomposes a Marked Markovian Arrival Process into
its constituent MAP components.
Args:
MMAP: MMAP matrices to decompose
Returns:
list: List of individual MAP matrices
"""
result = jpype.JPackage('jline').api.mam.Mmap_mapsKt.mmap_maps(MMAP)
maps = []
for i in range(result.size()):
maps.append(result.get(i))
return maps
[docs]
def mmap_pc(MMAP):
"""
Compute probability matrix for MMAP.
Calculates the probability matrix associated with
the Marked Markovian Arrival Process.
Args:
MMAP: MMAP matrices
Returns:
numpy.ndarray: Probability matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_pcKt.mmap_pc(MMAP)
)
[docs]
def mmap_forward_moment(MMAP, ORDERS, NORM=True):
"""
Compute forward moments for MMAP.
Calculates forward moments of the inter-arrival times
for a Marked Markovian Arrival Process.
Args:
MMAP: MMAP matrices
ORDERS: Array of moment orders to compute
NORM: Whether to normalize the moments
Returns:
numpy.ndarray: Forward moment matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_forward_momentKt.mmap_forward_moment(
MMAP, jlineMatrixFromArray(ORDERS), jpype.JBoolean(NORM)
)
)
[docs]
def mmap_backward_moment(MMAP, ORDERS, NORM=True):
"""
Compute backward moments for MMAP.
Calculates backward moments of the inter-arrival times
for a Marked Markovian Arrival Process.
Args:
MMAP: MMAP matrices
ORDERS: Array of moment orders to compute
NORM: Whether to normalize the moments
Returns:
numpy.ndarray: Backward moment matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_backward_momentKt.mmap_backward_moment(
MMAP, jlineMatrixFromArray(ORDERS), jpype.JBoolean(NORM)
)
)
[docs]
def mmap_cross_moment(mmap, k):
"""
Compute cross moments for MMAP.
Calculates cross moments between different arrival types
in a Marked Markovian Arrival Process.
Args:
mmap: MMAP matrices
k: Cross moment order
Returns:
numpy.ndarray: Cross moment matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Mmap_cross_momentKt.mmap_cross_moment(
mmap, jpype.JInt(k)
)
)
[docs]
def mmap_sample(MMAP, n, random=None):
"""
Generate random samples from an MMAP.
Produces random arrival times and classes according to
the specified Marked Markovian Arrival Process.
Args:
MMAP: MMAP matrices to sample from
n: Number of samples to generate
random: Random number generator (optional)
Returns:
tuple: (times, classes) - Arrays of arrival times and class indices
"""
if random is None:
result = jpype.JPackage('jline').api.mam.Mmap_sampleKt.mmap_sample(
MMAP, jpype.JInt(n)
)
else:
result = jpype.JPackage('jline').api.mam.Mmap_sampleKt.mmap_sample(
MMAP, jpype.JInt(n), random
)
times = jlineMatrixToArray(result.times)
classes = jlineMatrixToArray(result.classes)
return times, classes
[docs]
def mmap_rand(order, classes):
"""
Generate a random MMAP with specified parameters.
Creates a random Marked Markovian Arrival Process with
the given order (number of states) and number of classes.
Args:
order: Number of states in the MMAP
classes: Number of arrival classes
Returns:
Random MMAP matrices
"""
return jpype.JPackage('jline').api.mam.Mmap_randKt.mmap_rand(
jpype.JInt(order), jpype.JInt(classes)
)
[docs]
def map_sample(MAP, n, random=None):
"""
Generate a random sample of inter-arrival times.
Produces random inter-arrival time samples according to the specified
Markovian Arrival Process using Monte Carlo simulation.
Args:
MAP: MAP as tuple (D0, D1) or MAP container {D0, D1}
n: Number of samples to be generated
random: Random number generator or seed (optional)
Returns:
numpy.ndarray: Set of n inter-arrival time samples
Examples:
- map_sample(MAP, 10) generates a sample of 10 inter-arrival times
- map_sample(MAP, 1000, seed=42) generates 1000 samples with fixed seed
Warning:
The function can be memory consuming and quite slow for sample sizes
greater than n=10000.
"""
if isinstance(MAP, tuple):
D0, D1 = MAP
if random is None:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_sampleKt.map_sample(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JInt(n)
)
)
else:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_sampleKt.map_sample(
jlineMatrixFromArray(D0),
jlineMatrixFromArray(D1),
jpype.JInt(n),
random
)
)
else:
if random is None:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_sampleKt.map_sample(
MAP, jpype.JInt(n)
)
)
else:
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_sampleKt.map_sample(
MAP, jpype.JInt(n), random
)
)
[docs]
def mmap_count_lambda(mmap):
"""
Compute arrival rate lambda for MMAP counting process.
Calculates the lambda parameters for the counting process
associated with the Marked Markovian Arrival Process.
Args:
mmap: MMAP matrices or MMAP object
Returns:
numpy.ndarray: Lambda parameters for counting process
"""
if isinstance(mmap, list):
java_mmap = jpype.JPackage('jline').util.matrix.MatrixCell(len(mmap))
for matrix in mmap:
java_mmap.add(jlineMatrixFromArray(matrix))
else:
java_mmap = mmap
result = jpype.JPackage('jline').api.mam.Mmap_count_lambdaKt.mmap_count_lambda(java_mmap)
return jlineMatrixToArray(result)
[docs]
def mmap_isfeasible(mmap):
"""
Check if an MMAP is feasible (valid).
Determines whether the given Marked Markovian Arrival Process
satisfies all necessary conditions for validity.
Args:
mmap: MMAP matrices or MMAP object
Returns:
bool: True if MMAP is feasible, False otherwise
"""
if isinstance(mmap, list):
java_mmap = jpype.JPackage('jline').util.matrix.MatrixCell(len(mmap))
for matrix in mmap:
java_mmap.add(jlineMatrixFromArray(matrix))
else:
java_mmap = mmap
return jpype.JPackage('jline').api.mam.Mmap_isfeasibleKt.mmap_isfeasible(java_mmap)
[docs]
def mmap_mark(mmap, prob):
"""
Mark an MMAP with specified probabilities.
Applies probability markings to a Markovian Arrival Process
to create a Marked MAP with specified marking probabilities.
Args:
mmap: MMAP matrices or MMAP object
prob: Marking probabilities matrix
Returns:
Marked MMAP with applied probabilities
"""
if isinstance(mmap, list):
java_mmap = jpype.JPackage('jline').util.matrix.MatrixCell(len(mmap))
for matrix in mmap:
java_mmap.add(jlineMatrixFromArray(matrix))
else:
java_mmap = mmap
result = jpype.JPackage('jline').api.mam.Mmap_markKt.mmap_mark(
java_mmap,
jlineMatrixFromArray(prob)
)
new_mmap = []
for i in range(result.length()):
new_mmap.append(jlineMatrixToArray(result.get(i)))
return new_mmap
[docs]
def aph_bernstein(f, order):
"""
Compute Bernstein approximation of function using Acyclic PH.
Approximates a function using Bernstein polynomials with
an Acyclic Phase-type distribution representation.
Args:
f: Function to approximate
order: Order of Bernstein approximation
Returns:
Acyclic PH approximation result
"""
class FunctionWrapper:
def __init__(self, python_func):
self.python_func = python_func
def apply(self, x):
return float(self.python_func(float(x)))
function_wrapper = FunctionWrapper(f)
result = jpype.JPackage('jline').api.mam.Aph_bernsteinKt.aph_bernstein(
function_wrapper.apply,
jpype.JInt(order)
)
D0 = jlineMatrixToArray(result.getFirst())
D1 = jlineMatrixToArray(result.getSecond())
return D0, D1
[docs]
def map_jointpdf_derivative(map_matrices, iset):
"""
Compute joint PDF derivative for MAP.
Calculates the derivative of the joint probability density function
for a Markovian Arrival Process with respect to specified indices.
Args:
map_matrices: List of MAP matrices [D0, D1]
iset: Set of indices for derivative computation
Returns:
Joint PDF derivative result
"""
java_map = jpype.JPackage('jline').util.matrix.MatrixCell(
jpype.JArray(jpype.JPackage('jline').util.matrix.Matrix)([
jlineMatrixFromArray(map_matrices[0]),
jlineMatrixFromArray(map_matrices[1])
])
)
java_iset = jpype.JArray(jpype.JInt)(len(iset))
for i, val in enumerate(iset):
java_iset[i] = int(val)
return jpype.JPackage('jline').api.mam.Map_jointpdf_derivativeKt.map_jointpdf_derivative(
java_map, java_iset
)
[docs]
def map_ccdf_derivative(map_matrices, i):
"""
Compute complementary CDF derivative for MAP.
Calculates the derivative of the complementary cumulative
distribution function for a Markovian Arrival Process.
Args:
map_matrices: List of MAP matrices [D0, D1]
i: Index parameter for derivative computation
Returns:
Complementary CDF derivative result
"""
java_map = jpype.JPackage('jline').util.matrix.MatrixCell(
jpype.JArray(jpype.JPackage('jline').util.matrix.Matrix)([
jlineMatrixFromArray(map_matrices[0]),
jlineMatrixFromArray(map_matrices[1])
])
)
return jpype.JPackage('jline').api.mam.Map_ccdf_derivativeKt.map_ccdf_derivative(
java_map, jpype.JInt(i)
)
[docs]
def qbd_R(B, L, F, iter_max=100000):
"""
Compute the R matrix for a Quasi-Birth-Death process.
Solves the matrix equation R = F + R*L*R + R^2*B using iterative methods.
The R matrix is fundamental in QBD analysis.
Args:
B: Backward transition matrix
L: Local transition matrix
F: Forward transition matrix
iter_max: Maximum number of iterations (default: 100000)
Returns:
numpy.ndarray: The R matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Qbd_RKt.qbd_R(
jlineMatrixFromArray(B),
jlineMatrixFromArray(L),
jlineMatrixFromArray(F),
jpype.JInt(iter_max)
)
)
[docs]
def qbd_R_logred(B, L, F, iter_max=100000):
"""
Compute the R matrix using logarithmic reduction algorithm.
Alternative method for computing the R matrix that can be more
numerically stable for certain QBD problems.
Args:
B: Backward transition matrix
L: Local transition matrix
F: Forward transition matrix
iter_max: Maximum number of iterations (default: 100000)
Returns:
numpy.ndarray: The R matrix
"""
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Qbd_R_logredKt.qbd_R_logred(
jlineMatrixFromArray(B),
jlineMatrixFromArray(L),
jlineMatrixFromArray(F),
jpype.JInt(iter_max)
)
)
[docs]
def qbd_rg(map_a, map_s, util=None):
"""
Analyze a MAP/MAP/1 queue using QBD methods.
Analyzes a single-server queue with MAP arrivals and MAP service times
using Quasi-Birth-Death process techniques.
Args:
map_a: Arrival MAP (list/array with D0, D1 matrices)
map_s: Service MAP (list/array with D0, D1 matrices)
util: Utilization constraint (optional)
Returns:
QBD analysis results including performance measures
"""
java_map_a = jpype.JPackage('jline').util.matrix.MatrixCell(
jpype.JArray(jpype.JPackage('jline').util.matrix.Matrix)([
jlineMatrixFromArray(map_a[0]),
jlineMatrixFromArray(map_a[1])
])
)
java_map_s = jpype.JPackage('jline').util.matrix.MatrixCell(
jpype.JArray(jpype.JPackage('jline').util.matrix.Matrix)([
jlineMatrixFromArray(map_s[0]),
jlineMatrixFromArray(map_s[1])
])
)
if util is not None:
result = jpype.JPackage('jline').api.mam.Qbd_rgKt.qbd_rg(
java_map_a, java_map_s, jpype.JDouble(util)
)
else:
result = jpype.JPackage('jline').api.mam.Qbd_rgKt.qbd_rg(
java_map_a, java_map_s, None
)
return {
'R': jlineMatrixToArray(result.getR()),
'G': jlineMatrixToArray(result.getG()),
'B': jlineMatrixToArray(result.getB()),
'L': jlineMatrixToArray(result.getL()),
'F': jlineMatrixToArray(result.getF()),
'U': jlineMatrixToArray(result.getU())
}
[docs]
def map_pdf(MAP, points):
"""
Compute probability density function of interarrival times.
Evaluates the probability density function (PDF) of the interarrival time
distribution for a Markovian Arrival Process at the specified time points.
Args:
MAP: MAP as tuple (D0, D1) representing the Markovian Arrival Process
points: Array of time points to evaluate the PDF at
Returns:
numpy.ndarray: PDF values at the specified points, returned in the
same order as the points vector
Examples:
- map_pdf(MAP, [0.5, 1.0, 2.0]) returns PDF values at t=0.5, 1.0, 2.0
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_pdfKt.map_pdf(
map_cell, jlineMatrixFromArray(points)
)
)
[docs]
def map_prob(MAP, t):
"""
Compute probability for MAP at time t.
Calculates the probability measure associated with
the Markovian Arrival Process at the specified time.
Args:
MAP: MAP as tuple (D0, D1) matrices
t: Time point for probability computation
Returns:
float: Probability value at time t
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
return jlineMatrixToArray(
jpype.JPackage('jline').api.mam.Map_probKt.map_prob(
map_cell, jpype.JDouble(t)
)
)
[docs]
def map_joint(MAP1, MAP2):
"""
Compute joint distribution of two MAPs.
Creates the joint distribution representation of two
independent Markovian Arrival Processes.
Args:
MAP1: First MAP as tuple (D0, D1)
MAP2: Second MAP as tuple (D0, D1)
Returns:
Joint MAP representation
"""
if isinstance(MAP1, (list, tuple)) and len(MAP1) == 2:
D0_1, D1_1 = MAP1
map_cell_1 = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_1), jlineMatrixFromArray(D1_1)
)
else:
raise ValueError("MAP1 must be a tuple/list of (D0, D1) matrices")
if isinstance(MAP2, (list, tuple)) and len(MAP2) == 2:
D0_2, D1_2 = MAP2
map_cell_2 = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_2), jlineMatrixFromArray(D1_2)
)
else:
raise ValueError("MAP2 must be a tuple/list of (D0, D1) matrices")
result = jpype.JPackage('jline').api.mam.Map_jointKt.map_joint(
map_cell_1, map_cell_2
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def map_mixture(alpha, MAPs):
"""
Create mixture of multiple MAPs.
Constructs a mixture distribution from multiple Markovian
Arrival Processes with given mixing probabilities.
Args:
alpha: Mixing probabilities (weights) for each MAP
MAPs: List of MAP matrices to mix
Returns:
Mixture MAP representation
"""
java_alpha = jpype.JArray(jpype.JDouble)(list(alpha))
java_maps = jpype.JArray(jpype.JClass("jline.lang.MatrixCell"))(len(MAPs))
for i, MAP in enumerate(MAPs):
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
java_maps[i] = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError(f"MAP {i} must be a tuple/list of (D0, D1) matrices")
result = jpype.JPackage('jline').api.mam.Map_mixtureKt.map_mixture(
java_alpha, java_maps
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def map_max(MAP1, MAP2):
"""
Compute maximum of two independent MAPs.
Args:
MAP1: First MAP as tuple (D0, D1)
MAP2: Second MAP as tuple (D0, D1)
Returns:
MAP representing max(X,Y) where X~MAP1, Y~MAP2
"""
if isinstance(MAP1, (list, tuple)) and len(MAP1) == 2:
D0_1, D1_1 = MAP1
map_cell_1 = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_1), jlineMatrixFromArray(D1_1)
)
else:
raise ValueError("MAP1 must be a tuple/list of (D0, D1) matrices")
if isinstance(MAP2, (list, tuple)) and len(MAP2) == 2:
D0_2, D1_2 = MAP2
map_cell_2 = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_2), jlineMatrixFromArray(D1_2)
)
else:
raise ValueError("MAP2 must be a tuple/list of (D0, D1) matrices")
result = jpype.JPackage('jline').api.mam.Map_maxKt.map_max(
map_cell_1, map_cell_2
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def map_renewal(MAPIN):
"""
Remove all correlations from a MAP to create a renewal process.
Transforms a Markovian Arrival Process into a renewal MAP process
with the same cumulative distribution function (CDF) as the input MAP,
but with no correlations between inter-arrival times.
Args:
MAPIN: Input MAP as tuple (D0, D1) representing the original MAP
Returns:
Renewal MAP with same marginal distribution but independent inter-arrival times
Note:
The resulting process maintains the same inter-arrival time distribution
but removes all temporal dependencies, making it a renewal process.
"""
if isinstance(MAPIN, (list, tuple)) and len(MAPIN) == 2:
D0, D1 = MAPIN
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAPIN must be a tuple/list of (D0, D1) matrices")
result = jpype.JPackage('jline').api.mam.Map_renewalKt.map_renewal(map_cell)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def map_stochcomp(MAP):
"""
Compute stochastic complement of MAP by eliminating states.
Args:
MAP: MAP as tuple (D0, D1)
Returns:
Reduced MAP with eliminated transient states
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
result = jpype.JPackage('jline').api.mam.Map_stochcompKt.map_stochcomp(map_cell)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def qbd_mapmap1(MAP_A, MAP_S, mu=None):
"""
Analyze MAP/MAP/1 queueing system using QBD methods.
Args:
MAP_A: Arrival MAP as tuple (D0, D1)
MAP_S: Service MAP as tuple (D0, D1)
mu: Service rate parameter (optional)
Returns:
dict: QBD analysis results including performance measures
"""
if isinstance(MAP_A, (list, tuple)) and len(MAP_A) == 2:
D0_A, D1_A = MAP_A
map_a_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_A), jlineMatrixFromArray(D1_A)
)
else:
raise ValueError("MAP_A must be a tuple/list of (D0, D1) matrices")
if isinstance(MAP_S, (list, tuple)) and len(MAP_S) == 2:
D0_S, D1_S = MAP_S
map_s_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_S), jlineMatrixFromArray(D1_S)
)
else:
raise ValueError("MAP_S must be a tuple/list of (D0, D1) matrices")
if mu is not None:
result = jpype.JPackage('jline').api.mam.Qbd_mapmap1Kt.qbd_mapmap1(
map_a_cell, map_s_cell, jpype.JDouble(mu)
)
else:
result = jpype.JPackage('jline').api.mam.Qbd_mapmap1Kt.qbd_mapmap1(
map_a_cell, map_s_cell
)
return {
'pi': jlineMatrixToArray(result.getPi()) if hasattr(result, 'getPi') else None,
'R': jlineMatrixToArray(result.getR()) if hasattr(result, 'getR') else None,
'utilization': float(result.getUtilization()) if hasattr(result, 'getUtilization') else None,
'mean_queue_length': float(result.getMeanQueueLength()) if hasattr(result, 'getMeanQueueLength') else None,
'mean_waiting_time': float(result.getMeanWaitingTime()) if hasattr(result, 'getMeanWaitingTime') else None
}
[docs]
def qbd_raprap1(RAP_A, RAP_S, util=None):
"""
Analyze RAP/RAP/1 queueing system using QBD methods.
Args:
RAP_A: Arrival RAP (Rational Arrival Process)
RAP_S: Service RAP
util: Utilization parameter (optional)
Returns:
dict: QBD analysis results for RAP/RAP/1 queue
"""
if isinstance(RAP_A, (list, tuple)) and len(RAP_A) == 2:
D0_A, D1_A = RAP_A
rap_a_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_A), jlineMatrixFromArray(D1_A)
)
else:
raise ValueError("RAP_A must be a tuple/list of (D0, D1) matrices")
if isinstance(RAP_S, (list, tuple)) and len(RAP_S) == 2:
D0_S, D1_S = RAP_S
rap_s_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_S), jlineMatrixFromArray(D1_S)
)
else:
raise ValueError("RAP_S must be a tuple/list of (D0, D1) matrices")
if util is not None:
result = jpype.JPackage('jline').api.mam.Qbd_raprap1Kt.qbd_raprap1(
rap_a_cell, rap_s_cell, jpype.JDouble(util)
)
else:
result = jpype.JPackage('jline').api.mam.Qbd_raprap1Kt.qbd_raprap1(
rap_a_cell, rap_s_cell
)
return {
'R': jlineMatrixToArray(result.getR()) if hasattr(result, 'getR') else None,
'G': jlineMatrixToArray(result.getG()) if hasattr(result, 'getG') else None,
'pi': jlineMatrixToArray(result.getPi()) if hasattr(result, 'getPi') else None,
'performance_metrics': result.getPerformanceMetrics() if hasattr(result, 'getPerformanceMetrics') else None
}
[docs]
def qbd_bmapbmap1(BMAP_A, BMAP_S):
"""
Analyze BMAP/BMAP/1 queueing system using QBD methods.
Args:
BMAP_A: Arrival Batch MAP
BMAP_S: Service Batch MAP
Returns:
dict: QBD analysis results for batch arrival/service system
"""
if isinstance(BMAP_A, list) and len(BMAP_A) >= 2:
java_bmap_a = jpype.JClass("jline.lang.MatrixCell")(len(BMAP_A))
for i, matrix in enumerate(BMAP_A):
java_bmap_a.set(i, jlineMatrixFromArray(matrix))
else:
raise ValueError("BMAP_A must be a list of at least 2 matrices [D0, D1, ...]")
if isinstance(BMAP_S, list) and len(BMAP_S) >= 2:
java_bmap_s = jpype.JClass("jline.lang.MatrixCell")(len(BMAP_S))
for i, matrix in enumerate(BMAP_S):
java_bmap_s.set(i, jlineMatrixFromArray(matrix))
else:
raise ValueError("BMAP_S must be a list of at least 2 matrices [D0, D1, ...]")
result = jpype.JPackage('jline').api.mam.Qbd_bmapbmap1Kt.qbd_bmapbmap1(
java_bmap_a, java_bmap_s
)
return {
'R': jlineMatrixToArray(result.getR()) if hasattr(result, 'getR') else None,
'G': jlineMatrixToArray(result.getG()) if hasattr(result, 'getG') else None,
'performance_metrics': result.getMetrics() if hasattr(result, 'getMetrics') else None
}
[docs]
def qbd_setupdelayoff(MAP_A, MAP_S, setup_time, delay_time, off_time):
"""
Analyze queue with setup, delay, and off periods using QBD methods.
Models a queueing system with server setup times, processing delays,
and off periods using Quasi-Birth-Death process analysis.
Args:
MAP_A: Arrival MAP as tuple (D0, D1).
MAP_S: Service MAP as tuple (D0, D1).
setup_time: Server setup time.
delay_time: Processing delay time.
off_time: Server off time.
Returns:
dict: Performance measures for the system with timing constraints.
"""
if isinstance(MAP_A, (list, tuple)) and len(MAP_A) == 2:
D0_A, D1_A = MAP_A
map_a_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_A), jlineMatrixFromArray(D1_A)
)
else:
raise ValueError("MAP_A must be a tuple/list of (D0, D1) matrices")
if isinstance(MAP_S, (list, tuple)) and len(MAP_S) == 2:
D0_S, D1_S = MAP_S
map_s_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_S), jlineMatrixFromArray(D1_S)
)
else:
raise ValueError("MAP_S must be a tuple/list of (D0, D1) matrices")
result = jpype.JPackage('jline').api.mam.Qbd_setupdelayoffKt.qbd_setupdelayoff(
map_a_cell, map_s_cell,
jpype.JDouble(setup_time), jpype.JDouble(delay_time), jpype.JDouble(off_time)
)
return {
'steady_state': jlineMatrixToArray(result.getSteadyState()) if hasattr(result, 'getSteadyState') else None,
'performance_metrics': result.getMetrics() if hasattr(result, 'getMetrics') else None
}
[docs]
def aph_simplify(a1, T1, a2, T2, p1, p2, pattern):
"""
Simplify APH representation using specified pattern.
Args:
a1: Initial vector for first APH
T1: Generator matrix for first APH
a2: Initial vector for second APH
T2: Generator matrix for second APH
p1: Probability parameter for first APH
p2: Probability parameter for second APH
pattern: Simplification pattern identifier
Returns:
tuple: (alpha, T) simplified APH representation
"""
result = jpype.JPackage('jline').api.mam.Aph_simplifyKt.aph_simplify(
jlineMatrixFromArray(a1), jlineMatrixFromArray(T1),
jlineMatrixFromArray(a2), jlineMatrixFromArray(T2),
jpype.JDouble(p1), jpype.JDouble(p2), jpype.JInt(pattern)
)
return (jlineMatrixToArray(result.getFirst()), jlineMatrixToArray(result.getSecond()))
[docs]
def randp(P, rows, cols=None):
"""
Pick random values with relative probability.
Returns integers in the range from 1 to len(P) with a relative probability,
so that the value X is present approximately (P[X-1]/sum(P)) times.
Args:
P: Probability vector (all values should be >= 0)
rows: Number of rows for output matrix
cols: Number of columns for output matrix (defaults to rows if not specified)
Returns:
Matrix of random integers with specified probabilities
"""
if cols is None:
cols = rows
# Convert P to Java double array
if hasattr(P, '__iter__'):
P_array = jpype.JArray(jpype.JDouble)(len(P))
for i, val in enumerate(P):
P_array[i] = float(val)
else:
P_array = jpype.JArray(jpype.JDouble)([float(P)])
result = jpype.JPackage('jline').api.mam.RandpKt.randp(
P_array, jpype.JInt(rows), jpype.JInt(cols)
)
return jlineMatrixToArray(result)
[docs]
def aph_rand(n):
"""
Generate random APH distribution with n phases.
Args:
n: Number of phases
Returns:
tuple: (alpha, T) random APH representation
"""
result = jpype.JPackage('jline').api.mam.Aph_randKt.aph_rand(jpype.JInt(n))
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def amap2_fit_gamma(mean1, var1, mean2, var2, p):
"""
Fit 2-phase AMAP using gamma matching method.
Args:
mean1: Mean for first phase
var1: Variance for first phase
mean2: Mean for second phase
var2: Variance for second phase
p: Phase probability
Returns:
tuple: (D0, D1) fitted AMAP matrices
"""
result = jpype.JPackage('jline').api.mam.Amap2_fit_gammaKt.amap2_fit_gamma(
jpype.JDouble(mean1), jpype.JDouble(var1),
jpype.JDouble(mean2), jpype.JDouble(var2),
jpype.JDouble(p)
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def mamap2m_fit_fb_multiclass(data, classes, options=None):
"""
Fit multiclass MAMAP using feedback method.
Args:
data: Input data for fitting
classes: Number of classes
options: Fitting options (optional)
Returns:
dict: Fitted AMAP and quality metrics
"""
java_options = None
if options is not None:
pass
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Mamap2m_fit_fb_multiclassKt.mamap2m_fit_fb_multiclass(
jlineMatrixFromArray(data), jpype.JInt(classes), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Mamap2m_fit_fb_multiclassKt.mamap2m_fit_fb_multiclass(
jlineMatrixFromArray(data), jpype.JInt(classes)
)
return {
'fitted_amap': result.getFittedAmap() if hasattr(result, 'getFittedAmap') else None,
'quality_metrics': result.getQualityMetrics() if hasattr(result, 'getQualityMetrics') else None
}
[docs]
def mmpp_rand(states, lambda_range=(0.1, 5.0)):
result = jpype.JPackage('jline').api.mam.Mmpp_randKt.mmpp_rand(
jpype.JInt(states), jpype.JDouble(lambda_range[0]), jpype.JDouble(lambda_range[1])
)
return (jlineMatrixToArray(result.getQ()), jlineMatrixToArray(result.getLambda()))
[docs]
def map_count_moment(MAP, k, lag=0):
"""
Compute count moments of MAP arrivals.
Args:
MAP: MAP as tuple (D0, D1)
k: Moment order
lag: Time lag (default: 0)
Returns:
float: Count moment value
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
return float(jpype.JPackage('jline').api.mam.Map_count_momentKt.map_count_moment(
map_cell, jpype.JInt(k), jpype.JInt(lag)
))
[docs]
def map_kurt(MAP):
"""
Calculate kurtosis of inter-arrival times for a MAP.
Computes the fourth moment (kurtosis) of the inter-arrival time
distribution for a Markovian Arrival Process.
Args:
MAP: MAP as tuple (D0, D1) of generator matrices.
Returns:
float: Kurtosis of inter-arrival times.
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
return float(jpype.JPackage('jline').api.mam.Map_kurtKt.map_kurt(map_cell))
[docs]
def mmap_sigma2_cell(MMAP):
"""
Compute two-step class transition probabilities for MMAP (cell version).
Args:
MMAP: Multi-class MAP matrices as cell structure
Returns:
3D matrix of class transition probabilities
"""
if isinstance(MMAP, list):
mmap_cell = jpype.JClass("jline.lang.MatrixCell")()
for i, matrix in enumerate(MMAP):
mmap_cell.set(i, jlineMatrixFromArray(matrix))
else:
mmap_cell = MMAP
return float(jpype.JPackage('jline').api.mam.Mmap_sigma2_cellKt.mmap_sigma2_cell(mmap_cell))
[docs]
def amap2_adjust_gamma(mean1, var1, mean2, var2, p, target_mean, target_var):
"""
Adjust 2-phase AMAP gamma parameters to target moments.
Args:
mean1: Mean for first phase
var1: Variance for first phase
mean2: Mean for second phase
var2: Variance for second phase
p: Phase probability
target_mean: Target mean value
target_var: Target variance value
Returns:
Adjusted AMAP parameters
"""
result = jpype.JPackage('jline').api.mam.Amap2_adjust_gammaKt.amap2_adjust_gamma(
jpype.JDouble(mean1), jpype.JDouble(var1),
jpype.JDouble(mean2), jpype.JDouble(var2),
jpype.JDouble(p), jpype.JDouble(target_mean), jpype.JDouble(target_var)
)
return {
'mean1': result.mean1 if hasattr(result, 'mean1') else None,
'var1': result.var1 if hasattr(result, 'var1') else None,
'mean2': result.mean2 if hasattr(result, 'mean2') else None,
'var2': result.var2 if hasattr(result, 'var2') else None,
'p': result.p if hasattr(result, 'p') else None
}
[docs]
def amap2_fitall_gamma(data, options=None):
"""
Fit all gamma parameters for 2-phase AMAP from data.
Args:
data: Input data for fitting
options: Fitting options (optional)
Returns:
dict: Fitted AMAP parameters and quality metrics
"""
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Amap2_fitall_gammaKt.amap2_fitall_gamma(
jlineMatrixFromArray(data), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Amap2_fitall_gammaKt.amap2_fitall_gamma(
jlineMatrixFromArray(data)
)
return {
'parameters': {
'mean1': result.mean1 if hasattr(result, 'mean1') else None,
'var1': result.var1 if hasattr(result, 'var1') else None,
'mean2': result.mean2 if hasattr(result, 'var2') else None,
'var2': result.var2 if hasattr(result, 'var2') else None,
'p': result.p if hasattr(result, 'p') else None
},
'quality': result.quality if hasattr(result, 'quality') else None,
'converged': result.converged if hasattr(result, 'converged') else None
}
[docs]
def mmpp2_fit_mu00(data, options=None):
"""
Fit 2-phase MMPP parameter mu00 from data.
Args:
data: Input data for parameter fitting
options: Fitting options (optional)
Returns:
Fitted mu00 parameter
"""
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_mu00Kt.mmpp2_fit_mu00(
jlineMatrixFromArray(data), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_mu00Kt.mmpp2_fit_mu00(
jlineMatrixFromArray(data)
)
return float(result)
[docs]
def mmpp2_fit_mu11(data, options=None):
"""
Fit 2-phase MMPP parameter mu11 from data.
Args:
data: Input data for parameter fitting
options: Fitting options (optional)
Returns:
Fitted mu11 parameter
"""
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_mu11Kt.mmpp2_fit_mu11(
jlineMatrixFromArray(data), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_mu11Kt.mmpp2_fit_mu11(
jlineMatrixFromArray(data)
)
return float(result)
[docs]
def mmpp2_fit_q01(data, options=None):
"""
Fit 2-phase MMPP parameter q01 from data.
Args:
data: Input data for parameter fitting
options: Fitting options (optional)
Returns:
Fitted q01 parameter
"""
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_q01Kt.mmpp2_fit_q01(
jlineMatrixFromArray(data), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_q01Kt.mmpp2_fit_q01(
jlineMatrixFromArray(data)
)
return float(result)
[docs]
def mmpp2_fit_q10(data, options=None):
"""
Fit 2-phase MMPP parameter q10 from data.
Args:
data: Input data for parameter fitting
options: Fitting options (optional)
Returns:
Fitted q10 parameter
"""
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_q10Kt.mmpp2_fit_q10(
jlineMatrixFromArray(data), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Mmpp2_fit_q10Kt.mmpp2_fit_q10(
jlineMatrixFromArray(data)
)
return float(result)
[docs]
def assess_compression_quality(original_MAP, compressed_MAP, metrics=['mean', 'var', 'acf']):
"""
Assess quality of MAP compression by comparing metrics.
Evaluates how well a compressed MAP approximates the original MAP
by comparing specified statistical metrics.
Args:
original_MAP: Original MAP as tuple (D0, D1).
compressed_MAP: Compressed MAP as tuple (D0, D1).
metrics: List of metrics to compare (default: ['mean', 'var', 'acf']).
Returns:
dict: Quality assessment results for each metric.
"""
if isinstance(original_MAP, (list, tuple)) and len(original_MAP) == 2:
D0_orig, D1_orig = original_MAP
orig_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_orig), jlineMatrixFromArray(D1_orig)
)
else:
raise ValueError("original_MAP must be a tuple/list of (D0, D1) matrices")
if isinstance(compressed_MAP, (list, tuple)) and len(compressed_MAP) == 2:
D0_comp, D1_comp = compressed_MAP
comp_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0_comp), jlineMatrixFromArray(D1_comp)
)
else:
raise ValueError("compressed_MAP must be a tuple/list of (D0, D1) matrices")
java_metrics = jpype.JPackage('java.util').ArrayList()
for metric in metrics:
java_metrics.add(jpype.JObject(metric, jpype.JClass("java.lang.String")))
result = jpype.JPackage('jline').api.mam.Assess_compression_qualityKt.assess_compression_quality(
orig_cell, comp_cell, java_metrics
)
return {
'error_metrics': jlineMatrixToArray(result.errorMetrics) if hasattr(result, 'errorMetrics') else None,
'relative_errors': jlineMatrixToArray(result.relativeErrors) if hasattr(result, 'relativeErrors') else None,
'quality_score': result.qualityScore if hasattr(result, 'qualityScore') else None
}
[docs]
def compress_adaptive(MAP, target_order, options=None):
"""
Compress MAP using adaptive compression algorithm.
Args:
MAP: MAP as tuple (D0, D1)
target_order: Target order for compression
options: Compression options (optional)
Returns:
Compressed MAP with reduced order
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Compress_adaptiveKt.compress_adaptive(
map_cell, jpype.JInt(target_order), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Compress_adaptiveKt.compress_adaptive(
map_cell, jpype.JInt(target_order)
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def compress_autocorrelation(MAP, target_order, options=None):
"""
Compress MAP preserving autocorrelation properties.
Args:
MAP: MAP as tuple (D0, D1)
target_order: Target order for compression
options: Compression options (optional)
Returns:
Compressed MAP preserving autocorrelation structure
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Compress_autocorrelationKt.compress_autocorrelation(
map_cell, jpype.JInt(target_order), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Compress_autocorrelationKt.compress_autocorrelation(
map_cell, jpype.JInt(target_order)
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def compress_spectral(MAP, target_order, options=None):
"""
Compress MAP using spectral decomposition methods.
Args:
MAP: MAP as tuple (D0, D1)
target_order: Target order for compression
options: Compression options (optional)
Returns:
Compressed MAP using spectral techniques
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Compress_spectralKt.compress_spectral(
map_cell, jpype.JInt(target_order), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Compress_spectralKt.compress_spectral(
map_cell, jpype.JInt(target_order)
)
return (jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1)))
[docs]
def compress_with_quality_control(MAP, target_order, quality_threshold=0.95, options=None):
"""
Compress MAP with quality control constraints.
Args:
MAP: MAP as tuple (D0, D1)
target_order: Target order for compression
quality_threshold: Quality threshold (default: 0.95)
options: Compression options (optional)
Returns:
dict: Compressed MAP and quality metrics
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
raise ValueError("MAP must be a tuple/list of (D0, D1) matrices")
java_options = jpype.JObject(options) if options is not None else None
if java_options is not None:
result = jpype.JPackage('jline').api.mam.Compress_with_quality_controlKt.compress_with_quality_control(
map_cell, jpype.JInt(target_order), jpype.JDouble(quality_threshold), java_options
)
else:
result = jpype.JPackage('jline').api.mam.Compress_with_quality_controlKt.compress_with_quality_control(
map_cell, jpype.JInt(target_order), jpype.JDouble(quality_threshold)
)
return {
'compressed_MAP': (jlineMatrixToArray(result.compressedMAP.get(0)), jlineMatrixToArray(result.compressedMAP.get(1))),
'quality_achieved': result.qualityAchieved if hasattr(result, 'qualityAchieved') else None,
'meets_threshold': result.meetsThreshold if hasattr(result, 'meetsThreshold') else None
}
[docs]
def qbd_G(A0, A1, A2):
"""
Compute G matrix for Quasi-Birth-Death (QBD) process.
Calculates the G matrix which represents the probability
generating function for the first passage time from level k to k+1.
Args:
A0: Transition rate matrix for level down
A1: Transition rate matrix for same level
A2: Transition rate matrix for level up
Returns:
numpy.ndarray: G matrix
"""
A0_matrix = jlineMatrixFromArray(A0)
A1_matrix = jlineMatrixFromArray(A1)
A2_matrix = jlineMatrixFromArray(A2)
result = jpype.JPackage('jline').api.mam.Qbd_GKt.qbd_G(
A0_matrix, A1_matrix, A2_matrix
)
return jlineMatrixToArray(result)
[docs]
def ph_fit(data, max_phases=10):
"""
Fit phase-type distribution to empirical data.
Estimates the parameters of a phase-type distribution that
best fits the given data using maximum likelihood estimation.
Args:
data: Empirical data to fit
max_phases: Maximum number of phases to use
Returns:
tuple: (alpha, T) - Initial probability vector and transition rate matrix
"""
data_array = jpype.JArray(jpype.JDouble)(data)
result = jpype.JPackage('jline').api.mam.Ph_fitKt.ph_fit(
data_array, jpype.JInt(max_phases)
)
alpha = jlineMatrixToArray(result.getFirst())
T = jlineMatrixToArray(result.getSecond())
return alpha, T
[docs]
def ph_mean(alpha, T):
"""
Compute mean of phase-type distribution.
Calculates the expected value (first moment) of a phase-type
distribution with given initial vector and transition matrix.
Args:
alpha: Initial probability vector
T: Transition rate matrix
Returns:
float: Mean of the phase-type distribution
"""
alpha_matrix = jlineMatrixFromArray(alpha)
T_matrix = jlineMatrixFromArray(T)
result = jpype.JPackage('jline').api.mam.Ph_meanKt.ph_mean(
alpha_matrix, T_matrix
)
return float(result)
[docs]
def ph_var(alpha, T):
"""
Compute variance of phase-type distribution.
Calculates the variance (second central moment) of a phase-type
distribution with given parameters.
Args:
alpha: Initial probability vector
T: Transition rate matrix
Returns:
float: Variance of the phase-type distribution
"""
alpha_matrix = jlineMatrixFromArray(alpha)
T_matrix = jlineMatrixFromArray(T)
result = jpype.JPackage('jline').api.mam.Ph_varKt.ph_var(
alpha_matrix, T_matrix
)
return float(result)
[docs]
def ph_pdf(alpha, T, points):
"""
Compute PDF of phase-type distribution at given points.
Evaluates the probability density function of a phase-type
distribution at the specified points.
Args:
alpha: Initial probability vector
T: Transition rate matrix
points: Points at which to evaluate the PDF
Returns:
numpy.ndarray: PDF values at the specified points
"""
alpha_matrix = jlineMatrixFromArray(alpha)
T_matrix = jlineMatrixFromArray(T)
points_matrix = jlineMatrixFromArray(points)
result = jpype.JPackage('jline').api.mam.Ph_pdfKt.ph_pdf(
alpha_matrix, T_matrix, points_matrix
)
return jlineMatrixToArray(result)
[docs]
def ph_cdf(alpha, T, points):
"""
Compute CDF of phase-type distribution at given points.
Evaluates the cumulative distribution function of a phase-type
distribution at the specified points.
Args:
alpha: Initial probability vector
T: Transition rate matrix
points: Points at which to evaluate the CDF
Returns:
numpy.ndarray: CDF values at the specified points
"""
alpha_matrix = jlineMatrixFromArray(alpha)
T_matrix = jlineMatrixFromArray(T)
points_matrix = jlineMatrixFromArray(points)
result = jpype.JPackage('jline').api.mam.Ph_cdfKt.ph_cdf(
alpha_matrix, T_matrix, points_matrix
)
return jlineMatrixToArray(result)
[docs]
def qbd_psif(A0, A1, A2, B, options=None):
"""
Compute finite QBD stationary probability vector.
Calculates the stationary probability vector for a finite
Quasi-Birth-Death process with boundary conditions.
Args:
A0: Transition rate matrix for level down
A1: Transition rate matrix for same level
A2: Transition rate matrix for level up
B: Boundary condition matrix
options: Optional solver options
Returns:
numpy.ndarray: Stationary probability vector
"""
A0_matrix = jlineMatrixFromArray(A0)
A1_matrix = jlineMatrixFromArray(A1)
A2_matrix = jlineMatrixFromArray(A2)
B_matrix = jlineMatrixFromArray(B)
if options is not None:
result = jpype.JPackage('jline').api.mam.Qbd_psifKt.qbd_psif(
A0_matrix, A1_matrix, A2_matrix, B_matrix,
jpype.JObject(options)
)
else:
result = jpype.JPackage('jline').api.mam.Qbd_psifKt.qbd_psif(
A0_matrix, A1_matrix, A2_matrix, B_matrix
)
return jlineMatrixToArray(result)
[docs]
def qbd_psi(A0, A1, A2, options=None):
"""
Compute Psi matrix for QBD process using iterative methods.
Args:
A0: QBD backward transition matrix
A1: QBD local transition matrix
A2: QBD forward transition matrix
options: Computational options (optional)
Returns:
numpy.ndarray: Psi matrix for QBD fundamental matrices
"""
A0_matrix = jlineMatrixFromArray(A0)
A1_matrix = jlineMatrixFromArray(A1)
A2_matrix = jlineMatrixFromArray(A2)
if options is not None:
result = jpype.JPackage('jline').api.mam.Qbd_psiKt.qbd_psi(
A0_matrix, A1_matrix, A2_matrix, jpype.JObject(options)
)
else:
result = jpype.JPackage('jline').api.mam.Qbd_psiKt.qbd_psi(
A0_matrix, A1_matrix, A2_matrix
)
return jlineMatrixToArray(result)
[docs]
def aph2_check_feasibility(M1, M2, M3):
"""
Check feasibility of 2-phase APH distribution parameters.
Args:
M1: First moment
M2: Second moment
M3: Third moment
Returns:
bool: True if parameters are feasible for 2-phase APH
"""
result = jpype.JPackage('jline').api.mam.Aph2_check_feasibilityKt.aph2_check_feasibility(
jpype.JDouble(M1), jpype.JDouble(M2), jpype.JDouble(M3)
)
return bool(result)
[docs]
def aph2_canonical(a1, t11, a2, t22):
"""
Convert 2-phase APH to canonical form.
Args:
a1: Initial probability for first phase
t11: Transition rate for first phase
a2: Initial probability for second phase
t22: Transition rate for second phase
Returns:
tuple: Canonical form APH parameters
"""
result = jpype.JPackage('jline').api.mam.Aph2_canonicalKt.aph2_canonical(
jpype.JDouble(a1), jpype.JDouble(t11),
jpype.JDouble(a2), jpype.JDouble(t22)
)
alpha = jlineMatrixToArray(result.getFirst())
T = jlineMatrixToArray(result.getSecond())
return alpha, T
[docs]
def map_cdf_derivative(MAP, x, order=1):
"""
Compute derivative of MAP cumulative distribution function.
Args:
MAP: MAP as tuple (D0, D1)
x: Point at which to evaluate derivative
order: Derivative order (default: 1)
Returns:
float: CDF derivative value at point x
"""
if isinstance(MAP, (list, tuple)) and len(MAP) == 2:
D0, D1 = MAP
map_cell = jpype.JClass("jline.lang.MatrixCell")(
jlineMatrixFromArray(D0), jlineMatrixFromArray(D1)
)
else:
map_cell = MAP
result = jpype.JPackage('jline').api.mam.Map_cdf_derivativeKt.map_cdf_derivative(
map_cell, jpype.JDouble(x), jpype.JInt(order)
)
return float(result)
[docs]
def map_rand_moment(K, target_mean=1.0, target_var=1.0):
"""
Generate random MAP with specified moments.
Creates a random Markovian Arrival Process with the given
number of states and target statistical moments.
Args:
K: Number of states
target_mean: Target mean inter-arrival time
target_var: Target variance of inter-arrival time
Returns:
tuple: (D0, D1) - Random MAP matrices with target moments
"""
result = jpype.JPackage('jline').api.mam.Map_rand_momentKt.map_rand_moment(
jpype.JInt(K), jpype.JDouble(target_mean), jpype.JDouble(target_var)
)
return result
[docs]
def qbd_solve(A0, A1, A2):
"""
Solve Quasi-Birth-Death process for stationary distribution.
Computes the stationary probability distribution and rate matrix
for an infinite QBD process.
Args:
A0: Transition rate matrix for level down
A1: Transition rate matrix for same level
A2: Transition rate matrix for level up
Returns:
tuple: (pi0, R) - Boundary probabilities and rate matrix
"""
from .. import jlineMatrixFromArray, jlineMatrixToArray
java_A0 = jlineMatrixFromArray(A0)
java_A1 = jlineMatrixFromArray(A1)
java_A2 = jlineMatrixFromArray(A2)
java_result = jpype.JPackage('jline').api.mam.Qbd_solveKt.qbd_solve(
java_A0, java_A1, java_A2
)
pi0 = jlineMatrixToArray(java_result.getPi0())
R = jlineMatrixToArray(java_result.getR())
return pi0, R
# Additional MAM functions for complete API coverage
[docs]
def amap2_fit_gamma_map(map_obj):
"""
Fits AMAP(2) by approximating arbitrary-order MAP with preserved correlation structure.
Args:
map_obj: Input MAP object to approximate
Returns:
tuple: Fitted AMAP(2) parameters (D0, D1)
"""
result = jpype.JPackage('jline').api.mam.Amap2_fit_gamma_mapKt.amap2_fit_gamma_map(map_obj)
return jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1))
[docs]
def amap2_fit_gamma_trace(trace):
"""
Fits AMAP(2) from empirical traces while preserving autocorrelation characteristics.
Args:
trace: Empirical trace data (array-like)
Returns:
tuple: Fitted AMAP(2) parameters (D0, D1)
"""
trace_matrix = jlineMatrixFromArray(trace)
result = jpype.JPackage('jline').api.mam.Amap2_fit_gamma_traceKt.amap2_fit_gamma_trace(trace_matrix)
return jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1))
[docs]
def aph2_fit_map(map_obj):
"""
Fits APH(2) distributions by approximating arbitrary-order MAP processes.
Args:
map_obj: Input MAP object to approximate with APH(2)
Returns:
tuple: Fitted APH(2) parameters (alpha, T)
"""
result = jpype.JPackage('jline').api.mam.Aph2_fit_mapKt.aph2_fit_map(map_obj)
return jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1))
[docs]
def aph2_fit_trace(trace):
"""
Fits APH(2) distributions from empirical inter-arrival time traces.
Args:
trace: Empirical trace data (array-like)
Returns:
tuple: Fitted APH(2) parameters (alpha, T)
"""
trace_matrix = jlineMatrixFromArray(trace)
result = jpype.JPackage('jline').api.mam.Aph2_fit_traceKt.aph2_fit_trace(trace_matrix)
return jlineMatrixToArray(result.get(0)), jlineMatrixToArray(result.get(1))
[docs]
def qbd_r(A0, A1, A2):
"""
Computes the rate matrix R for QBD processes.
Args:
A0: Level down transition matrix
A1: Level transition matrix
A2: Level up transition matrix
Returns:
numpy.ndarray: Rate matrix R
"""
result = jpype.JPackage('jline').api.mam.Qbd_rKt.qbd_r(
jlineMatrixFromArray(A0),
jlineMatrixFromArray(A1),
jlineMatrixFromArray(A2)
)
return jlineMatrixToArray(result)
[docs]
def map_block(MAP, k):
"""
Extracts a block from a MAP representation.
Args:
MAP: Markovian Arrival Process representation
k: Block index
Returns:
numpy.ndarray: The k-th block of the MAP
"""
result = jpype.JPackage('jline').api.mam.Map_blockKt.map_block(
jlineMatrixFromArray(MAP), int(k)
)
return jlineMatrixToArray(result)
[docs]
def map_feasblock(MAP):
"""
Computes feasibility blocks for MAP representation.
Args:
MAP: Markovian Arrival Process representation
Returns:
dict: Feasibility block information
"""
result = jpype.JPackage('jline').api.mam.Map_feasblockKt.map_feasblock(
jlineMatrixFromArray(MAP)
)
return {
'blocks': jlineMatrixToArray(result.getBlocks()),
'feasible': bool(result.isFeasible())
}
[docs]
def map_kpc(MAP, k):
"""
Computes k-th order phase-type correlation for MAP.
Args:
MAP: Markovian Arrival Process representation
k: Correlation order
Returns:
float: k-th order correlation coefficient
"""
result = jpype.JPackage('jline').api.mam.Map_kpcKt.map_kpc(
jlineMatrixFromArray(MAP), int(k)
)
return float(result)
[docs]
def map_pntiter(MAP, n, epsilon=1e-8):
"""
Point process iteration method for MAP analysis.
Args:
MAP: Markovian Arrival Process representation
n: Number of iterations
epsilon: Convergence tolerance
Returns:
dict: Iteration results
"""
result = jpype.JPackage('jline').api.mam.Map_pntiterKt.map_pntiter(
jlineMatrixFromArray(MAP), int(n), float(epsilon)
)
return {
'points': jlineMatrixToArray(result.getPoints()),
'converged': bool(result.isConverged()),
'iterations': int(result.getIterations())
}
[docs]
def map_pntquad(MAP, n):
"""
Point process quadrature method for MAP analysis.
Args:
MAP: Markovian Arrival Process representation
n: Number of quadrature points
Returns:
dict: Quadrature results
"""
result = jpype.JPackage('jline').api.mam.Map_pntquadKt.map_pntquad(
jlineMatrixFromArray(MAP), int(n)
)
return {
'points': jlineMatrixToArray(result.getPoints()),
'weights': jlineMatrixToArray(result.getWeights())
}
[docs]
def map2mmpp(MAP):
"""
Converts MAP to MMPP representation.
Args:
MAP: Markovian Arrival Process representation
Returns:
dict: MMPP representation with D0 and D1 matrices
"""
result = jpype.JPackage('jline').api.mam.Map2mmppKt.map2mmpp(
jlineMatrixFromArray(MAP)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def m3pp_rand(n, params=None):
"""
Generates random M3PP (third-order Markov-modulated Poisson process).
Args:
n: Number of states
params: Optional parameters for generation
Returns:
dict: Random M3PP representation
"""
if params is not None:
result = jpype.JPackage('jline').api.mam.M3pp_randKt.m3pp_rand(
int(n), jlineMatrixFromArray(params)
)
else:
result = jpype.JPackage('jline').api.mam.M3pp_randKt.m3pp_rand(int(n))
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'D2': jlineMatrixToArray(result.getD2())
}
[docs]
def m3pp_interleave_fitc_theoretical(M3PP1, M3PP2):
"""
Fits interleaved M3PP using theoretical approach.
Args:
M3PP1: First M3PP process
M3PP2: Second M3PP process
Returns:
dict: Interleaved M3PP representation
"""
result = jpype.JPackage('jline').api.mam.M3pp_interleave_fitc_theoreticalKt.m3pp_interleave_fitc_theoretical(
jlineMatrixFromArray(M3PP1), jlineMatrixFromArray(M3PP2)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'D2': jlineMatrixToArray(result.getD2())
}
[docs]
def m3pp_interleave_fitc_trace(trace1, trace2):
"""
Fits interleaved M3PP from trace data.
Args:
trace1: First trace data
trace2: Second trace data
Returns:
dict: Fitted M3PP representation
"""
result = jpype.JPackage('jline').api.mam.M3pp_interleave_fitc_traceKt.m3pp_interleave_fitc_trace(
jlineMatrixFromArray(trace1), jlineMatrixFromArray(trace2)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'D2': jlineMatrixToArray(result.getD2()),
'quality': float(result.getQuality())
}
[docs]
def m3pp_superpos_fitc(M3PPs):
"""
Fits superposition of multiple M3PP processes.
Args:
M3PPs: List of M3PP processes
Returns:
dict: Superposed M3PP representation
"""
result = jpype.JPackage('jline').api.mam.M3pp_superpos_fitcKt.m3pp_superpos_fitc(
jlineMatrixFromArray(M3PPs)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'D2': jlineMatrixToArray(result.getD2())
}
[docs]
def m3pp_superpos_fitc_theoretical(M3PPs):
"""
Fits superposition of M3PP processes using theoretical approach.
Args:
M3PPs: List of M3PP processes
Returns:
dict: Superposed M3PP representation
"""
result = jpype.JPackage('jline').api.mam.M3pp_superpos_fitc_theoreticalKt.m3pp_superpos_fitc_theoretical(
jlineMatrixFromArray(M3PPs)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'D2': jlineMatrixToArray(result.getD2())
}
[docs]
def maph2m_fit(data, m, h):
"""
Fits PH(2,m) distribution to data.
Args:
data: Input data for fitting
m: Number of phases
h: Hyper-parameter
Returns:
dict: Fitted PH(2,m) representation
"""
result = jpype.JPackage('jline').api.mam.Maph2m_fitKt.maph2m_fit(
jlineMatrixFromArray(data), int(m), int(h)
)
return {
'alpha': jlineMatrixToArray(result.getAlpha()),
'T': jlineMatrixToArray(result.getT()),
'quality': float(result.getQuality())
}
[docs]
def maph2m_fitc_approx(moments, m):
"""
Approximate fitting of PH(2,m) from moments.
Args:
moments: Statistical moments
m: Number of phases
Returns:
dict: Fitted PH(2,m) representation
"""
result = jpype.JPackage('jline').api.mam.Maph2m_fitc_approxKt.maph2m_fitc_approx(
jlineMatrixFromArray(moments), int(m)
)
return {
'alpha': jlineMatrixToArray(result.getAlpha()),
'T': jlineMatrixToArray(result.getT())
}
[docs]
def maph2m_fitc_theoretical(params, m):
"""
Theoretical fitting of PH(2,m) distribution.
Args:
params: Theoretical parameters
m: Number of phases
Returns:
dict: Fitted PH(2,m) representation
"""
result = jpype.JPackage('jline').api.mam.Maph2m_fitc_theoreticalKt.maph2m_fitc_theoretical(
jlineMatrixFromArray(params), int(m)
)
return {
'alpha': jlineMatrixToArray(result.getAlpha()),
'T': jlineMatrixToArray(result.getT())
}
[docs]
def maph2m_fit_mmap(MMAP, m):
"""
Fits PH(2,m) from MMAP representation.
Args:
MMAP: Marked MAP representation
m: Number of phases
Returns:
dict: Fitted PH(2,m) representation
"""
result = jpype.JPackage('jline').api.mam.Maph2m_fit_mmapKt.maph2m_fit_mmap(
jlineMatrixFromArray(MMAP), int(m)
)
return {
'alpha': jlineMatrixToArray(result.getAlpha()),
'T': jlineMatrixToArray(result.getT())
}
[docs]
def maph2m_fit_multiclass(data, classes, m):
"""
Fits PH(2,m) for multiclass data.
Args:
data: Multiclass input data
classes: Class definitions
m: Number of phases
Returns:
dict: Fitted multiclass PH(2,m) representation
"""
result = jpype.JPackage('jline').api.mam.Maph2m_fit_multiclassKt.maph2m_fit_multiclass(
jlineMatrixFromArray(data), jlineMatrixFromArray(classes), int(m)
)
return {
'alpha': jlineMatrixToArray(result.getAlpha()),
'T': jlineMatrixToArray(result.getT()),
'class_params': jlineMatrixToArray(result.getClassParams()) if hasattr(result, 'getClassParams') else None
}
[docs]
def maph2m_fit_trace(trace, m):
"""
Fits PH(2,m) from trace data.
Args:
trace: Trace data
m: Number of phases
Returns:
dict: Fitted PH(2,m) representation
"""
result = jpype.JPackage('jline').api.mam.Maph2m_fit_traceKt.maph2m_fit_trace(
jlineMatrixFromArray(trace), int(m)
)
return {
'alpha': jlineMatrixToArray(result.getAlpha()),
'T': jlineMatrixToArray(result.getT()),
'quality': float(result.getQuality())
}
[docs]
def mmap_embedded(MMAP):
"""
Computes embedded process of MMAP.
Args:
MMAP: Marked MAP representation
Returns:
numpy.ndarray: Embedded process representation
"""
result = jpype.JPackage('jline').api.mam.Mmap_embeddedKt.mmap_embedded(
jlineMatrixFromArray(MMAP)
)
return jlineMatrixToArray(result)
[docs]
def mmap_count_mcov(MMAP, lag):
"""
Computes cross-covariance of counts for MMAP.
Args:
MMAP: Marked MAP representation
lag: Time lag for covariance
Returns:
numpy.ndarray: Cross-covariance matrix
"""
result = jpype.JPackage('jline').api.mam.Mmap_count_mcovKt.mmap_count_mcov(
jlineMatrixFromArray(MMAP), int(lag)
)
return jlineMatrixToArray(result)
[docs]
def mmap_issym(MMAP):
"""
Checks if MMAP is symmetric.
Args:
MMAP: Marked MAP representation
Returns:
bool: True if MMAP is symmetric
"""
result = jpype.JPackage('jline').api.mam.Mmap_issymKt.mmap_issym(
jlineMatrixFromArray(MMAP)
)
return bool(result)
[docs]
def mmap_max(MMAPs):
"""
Computes maximum of multiple MMAPs.
Args:
MMAPs: List of MMAP representations
Returns:
dict: Maximum MMAP representation
"""
result = jpype.JPackage('jline').api.mam.Mmap_maxKt.mmap_max(
jlineMatrixFromArray(MMAPs)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mmap_mixture_order2(MMAPs, weights):
"""
Creates second-order mixture of MMAPs.
Args:
MMAPs: List of MMAP representations
weights: Mixture weights
Returns:
dict: Mixed MMAP representation
"""
result = jpype.JPackage('jline').api.mam.Mmap_mixture_order2Kt.mmap_mixture_order2(
jlineMatrixFromArray(MMAPs), jlineMatrixFromArray(weights)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mmap_modulate(MMAP, modulation):
"""
Modulates MMAP with given modulation function.
Args:
MMAP: Marked MAP representation
modulation: Modulation function or parameters
Returns:
dict: Modulated MMAP representation
"""
result = jpype.JPackage('jline').api.mam.Mmap_modulateKt.mmap_modulate(
jlineMatrixFromArray(MMAP), jlineMatrixFromArray(modulation)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mmap_pie(MMAP):
"""
Computes stationary distribution of MMAP.
Args:
MMAP: Marked MAP representation
Returns:
numpy.ndarray: Stationary distribution
"""
result = jpype.JPackage('jline').api.mam.Mmap_pieKt.mmap_pie(
jlineMatrixFromArray(MMAP)
)
return jlineMatrixToArray(result)
[docs]
def mmap_sigma(MMAP):
"""
Computes sigma parameter for MMAP.
Args:
MMAP: Marked MAP representation
Returns:
float: Sigma parameter
"""
result = jpype.JPackage('jline').api.mam.Mmap_sigmaKt.mmap_sigma(
jlineMatrixFromArray(MMAP)
)
return float(result)
[docs]
def mmap_sum(MMAPs):
"""
Computes sum of multiple MMAPs.
Args:
MMAPs: List of MMAP representations
Returns:
dict: Summed MMAP representation
"""
result = jpype.JPackage('jline').api.mam.Mmap_sumKt.mmap_sum(
jlineMatrixFromArray(MMAPs)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mmpp2_fitc(data):
"""
Fits MMPP(2) using correlation fitting.
Args:
data: Input data for fitting
Returns:
dict: Fitted MMPP(2) representation
"""
result = jpype.JPackage('jline').api.mam.Mmpp2_fitcKt.mmpp2_fitc(
jlineMatrixFromArray(data)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'quality': float(result.getQuality())
}
[docs]
def mmpp2_fitc_approx(moments):
"""
Approximate fitting of MMPP(2) from moments.
Args:
moments: Statistical moments
Returns:
dict: Fitted MMPP(2) representation
"""
result = jpype.JPackage('jline').api.mam.Mmpp2_fitc_approxKt.mmpp2_fitc_approx(
jlineMatrixFromArray(moments)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def qbd_r_logred(A0, A1, A2):
"""
Computes the rate matrix R using logarithmic reduction.
Args:
A0: Level down transition matrix
A1: Level transition matrix
A2: Level up transition matrix
Returns:
numpy.ndarray: Rate matrix R computed via logarithmic reduction
"""
result = jpype.JPackage('jline').api.mam.Qbd_r_logredKt.qbd_r_logred(
jlineMatrixFromArray(A0),
jlineMatrixFromArray(A1),
jlineMatrixFromArray(A2)
)
return jlineMatrixToArray(result)
# MAPQN Functions
[docs]
def mapqn_bnd_lr(MAP, N, Z=None):
"""
Computes lower and upper response time bounds for MAP queueing networks.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: Lower and upper bounds
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_lrKt.mapqn_bnd_lr(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_lrKt.mapqn_bnd_lr(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'lower': jlineMatrixToArray(result.getLower()),
'upper': jlineMatrixToArray(result.getUpper())
}
[docs]
def mapqn_bnd_lr_mva(MAP, N, Z=None):
"""
Computes MAP queueing network bounds using MVA approach.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: MVA-based bounds
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_lr_mvaKt.mapqn_bnd_lr_mva(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_lr_mvaKt.mapqn_bnd_lr_mva(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'lower': jlineMatrixToArray(result.getLower()),
'upper': jlineMatrixToArray(result.getUpper()),
'iterations': int(result.getIterations()) if hasattr(result, 'getIterations') else None
}
[docs]
def mapqn_bnd_lr_pf(MAP, N, Z=None):
"""
Computes MAP queueing network bounds using product-form approach.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: Product-form based bounds
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_lr_pfKt.mapqn_bnd_lr_pf(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_lr_pfKt.mapqn_bnd_lr_pf(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'lower': jlineMatrixToArray(result.getLower()),
'upper': jlineMatrixToArray(result.getUpper())
}
[docs]
def mapqn_bnd_qr(MAP, N, Z=None):
"""
Computes queue length and response time bounds for MAP queueing networks.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: Queue and response time bounds
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_qrKt.mapqn_bnd_qr(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_qrKt.mapqn_bnd_qr(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'queue_lower': jlineMatrixToArray(result.getQueueLower()),
'queue_upper': jlineMatrixToArray(result.getQueueUpper()),
'response_lower': jlineMatrixToArray(result.getResponseLower()),
'response_upper': jlineMatrixToArray(result.getResponseUpper())
}
[docs]
def mapqn_bnd_qr_delay(MAP, N, Z, delay):
"""
Computes bounds for MAP queueing networks with delay.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times
delay: Delay parameters
Returns:
dict: Bounds with delay consideration
"""
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_qr_delayKt.mapqn_bnd_qr_delay(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N),
jlineMatrixFromArray(Z), jlineMatrixFromArray(delay)
)
return {
'queue_lower': jlineMatrixToArray(result.getQueueLower()),
'queue_upper': jlineMatrixToArray(result.getQueueUpper()),
'response_lower': jlineMatrixToArray(result.getResponseLower()),
'response_upper': jlineMatrixToArray(result.getResponseUpper())
}
[docs]
def mapqn_bnd_qr_ld(MAP, N, Z, mu):
"""
Computes bounds for load-dependent MAP queueing networks.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times
mu: Load-dependent service rates
Returns:
dict: Load-dependent bounds
"""
result = jpype.JPackage('jline').api.mam.Mapqn_bnd_qr_ldKt.mapqn_bnd_qr_ld(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N),
jlineMatrixFromArray(Z), jlineMatrixFromArray(mu)
)
return {
'queue_lower': jlineMatrixToArray(result.getQueueLower()),
'queue_upper': jlineMatrixToArray(result.getQueueUpper()),
'response_lower': jlineMatrixToArray(result.getResponseLower()),
'response_upper': jlineMatrixToArray(result.getResponseUpper())
}
[docs]
def mapqn_lpmodel(MAP, N, Z=None):
"""
Creates linear programming model for MAP queueing network.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: LP model components
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_lpmodelKt.mapqn_lpmodel(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_lpmodelKt.mapqn_lpmodel(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'A': jlineMatrixToArray(result.getA()),
'b': jlineMatrixToArray(result.getB()),
'c': jlineMatrixToArray(result.getC()),
'bounds': jlineMatrixToArray(result.getBounds()) if hasattr(result, 'getBounds') else None
}
[docs]
def mapqn_parameters(MAP, N):
"""
Extracts parameters from MAP queueing network.
Args:
MAP: MAP representation of arrival process
N: Population vector
Returns:
dict: Network parameters
"""
result = jpype.JPackage('jline').api.mam.Mapqn_parametersKt.mapqn_parameters(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'arrival_rate': float(result.getArrivalRate()),
'service_rates': jlineMatrixToArray(result.getServiceRates()),
'routing': jlineMatrixToArray(result.getRouting()),
'think_times': jlineMatrixToArray(result.getThinkTimes()) if hasattr(result, 'getThinkTimes') else None
}
[docs]
def mapqn_parameters_factory(params_dict):
"""
Factory method to create MAP queueing network parameters.
Args:
params_dict: Dictionary of parameters
Returns:
dict: Formatted network parameters
"""
result = jpype.JPackage('jline').api.mam.Mapqn_parameters_factoryKt.mapqn_parameters_factory(
jpype.JObject(params_dict)
)
return {
'MAP': jlineMatrixToArray(result.getMAP()),
'N': jlineMatrixToArray(result.getN()),
'Z': jlineMatrixToArray(result.getZ()) if hasattr(result, 'getZ') else None
}
[docs]
def mapqn_qr_bounds_bas(MAP, N, Z=None):
"""
Computes Balanced Asymptotic System (BAS) bounds for MAP queueing networks.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: BAS bounds
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_qr_bounds_basKt.mapqn_qr_bounds_bas(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_qr_bounds_basKt.mapqn_qr_bounds_bas(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'queue_bounds': jlineMatrixToArray(result.getQueueBounds()),
'response_bounds': jlineMatrixToArray(result.getResponseBounds())
}
[docs]
def mapqn_qr_bounds_rsrd(MAP, N, Z=None):
"""
Computes Response-time Scaled Routing Delay (RSRD) bounds for MAP queueing networks.
Args:
MAP: MAP representation of arrival process
N: Population vector
Z: Think times (optional)
Returns:
dict: RSRD bounds
"""
if Z is not None:
result = jpype.JPackage('jline').api.mam.Mapqn_qr_bounds_rsrdKt.mapqn_qr_bounds_rsrd(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N), jlineMatrixFromArray(Z)
)
else:
result = jpype.JPackage('jline').api.mam.Mapqn_qr_bounds_rsrdKt.mapqn_qr_bounds_rsrd(
jlineMatrixFromArray(MAP), jlineMatrixFromArray(N)
)
return {
'queue_bounds': jlineMatrixToArray(result.getQueueBounds()),
'response_bounds': jlineMatrixToArray(result.getResponseBounds())
}
# M3PP Functions (Markovian Multi-class Point Processes)
[docs]
def m3pp22_fitc_approx_cov(mean, scv, skew, cov):
"""
Implements parameter fitting for second-order Marked Markov Modulated Poisson Process.
Args:
mean: Mean inter-arrival time
scv: Squared coefficient of variation
skew: Skewness
cov: Covariance matrix
Returns:
dict: Fitted M3PP(2,2) parameters
"""
result = jpype.JPackage('jline').api.mam.m3pp.M3pp22_fitc_approx_covKt.m3pp22_fitc_approx_cov(
float(mean), float(scv), float(skew), arrayToJLineMatrix(cov)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'params': jlineMatrixToArray(result.getParams())
}
# M3PP and MAMAP Functions
# Merged from mam_extended.py
[docs]
def mamap2m_coefficients(mean, scv, skew):
"""
Computes coefficients for MAMAP(2,m) fitting.
Args:
mean: Mean value
scv: Squared coefficient of variation
skew: Skewness
Returns:
numpy.ndarray: Coefficient matrix
"""
result = jpype.JPackage('jline').api.mam.Mamap2m_coefficientsKt.mamap2m_coefficients(
float(mean), float(scv), float(skew)
)
return jlineMatrixToArray(result)
[docs]
def m3pp22_fitc_approx_cov_multiclass(means, scvs, skews, covs):
"""
Implements constrained optimization for fitting M3PP(2,2) parameters given an underlying.
Args:
means: Mean inter-arrival times per class
scvs: Squared coefficients of variation per class
skews: Skewness values per class
covs: Covariance matrices
Returns:
dict: Fitted M3PP(2,2) parameters for multiple classes
"""
result = jpype.JPackage('jline').api.mam.m3pp.M3pp22_fitc_approx_cov_multiclassKt.m3pp22_fitc_approx_cov_multiclass(
arrayToJLineMatrix(means),
arrayToJLineMatrix(scvs),
arrayToJLineMatrix(skews),
arrayToJLineMatrix(covs)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'params': jlineMatrixToArray(result.getParams())
}
[docs]
def m3pp22_interleave_fitc(processes):
"""
Implements lumped superposition of multiple M3PP(2,2) processes using interleaved.
Args:
processes: List of M3PP(2,2) process parameters
Returns:
dict: Interleaved M3PP(2,2) parameters
"""
java_processes = jpype.JPackage('java.util').ArrayList()
for proc in processes:
java_processes.add(proc)
result = jpype.JPackage('jline').api.mam.m3pp.M3pp22_interleave_fitcKt.m3pp22_interleave_fitc(java_processes)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def m3pp2m_fitc(mean, scv, skew, num_classes):
"""
Implements exact fitting of second-order Marked Markov Modulated Poisson Process.
Args:
mean: Mean inter-arrival time
scv: Squared coefficient of variation
skew: Skewness
num_classes: Number of classes
Returns:
dict: Fitted M3PP(2,m) parameters
"""
result = jpype.JPackage('jline').api.mam.m3pp.M3pp2m_fitcKt.m3pp2m_fitc(
float(mean), float(scv), float(skew), int(num_classes)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'params': jlineMatrixToArray(result.getParams())
}
[docs]
def m3pp2m_fitc_approx(mean, scv, skew, num_classes):
"""
Implements approximation-based fitting for M3PP(2,m) using optimization methods.
Args:
mean: Mean inter-arrival time
scv: Squared coefficient of variation
skew: Skewness
num_classes: Number of classes
Returns:
dict: Fitted M3PP(2,m) parameters
"""
result = jpype.JPackage('jline').api.mam.m3pp.M3pp2m_fitc_approxKt.m3pp2m_fitc_approx(
float(mean), float(scv), float(skew), int(num_classes)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'params': jlineMatrixToArray(result.getParams())
}
[docs]
def m3pp2m_fitc_approx_ag(mean, scv, skew, num_classes):
"""
Implements auto-gamma approximation method for M3PP(2,m) parameter fitting.
Args:
mean: Mean inter-arrival time
scv: Squared coefficient of variation
skew: Skewness
num_classes: Number of classes
Returns:
dict: Fitted M3PP(2,m) parameters using auto-gamma
"""
result = jpype.JPackage('jline').api.mam.m3pp.M3pp2m_fitc_approx_agKt.m3pp2m_fitc_approx_ag(
float(mean), float(scv), float(skew), int(num_classes)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'params': jlineMatrixToArray(result.getParams())
}
[docs]
def m3pp2m_fitc_approx_ag_multiclass(means, scvs, skews, num_classes):
"""
Implements multiclass auto-gamma fitting for M3PP(2,m) with variance and covariance.
Args:
means: Mean inter-arrival times per class
scvs: Squared coefficients of variation per class
skews: Skewness values per class
num_classes: Number of classes
Returns:
dict: Fitted M3PP(2,m) parameters for multiple classes
"""
result = jpype.JPackage('jline').api.mam.m3pp.M3pp2m_fitc_approx_ag_multiclassKt.m3pp2m_fitc_approx_ag_multiclass(
arrayToJLineMatrix(means),
arrayToJLineMatrix(scvs),
arrayToJLineMatrix(skews),
int(num_classes)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1()),
'params': jlineMatrixToArray(result.getParams())
}
[docs]
def m3pp2m_interleave(processes):
"""
Implements interleaved superposition of multiple M3PP(2,m) processes to construct.
Args:
processes: List of M3PP(2,m) process parameters
Returns:
dict: Interleaved M3PP(2,m) parameters
"""
java_processes = jpype.JPackage('java.util').ArrayList()
for proc in processes:
java_processes.add(proc)
result = jpype.JPackage('jline').api.mam.m3pp.M3pp2m_interleaveKt.m3pp2m_interleave(java_processes)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def m3pp_interleave_fitc(processes):
"""
Implements fitting and interleaving of k second-order M3PP processes with varying.
Args:
processes: List of M3PP process parameters
Returns:
dict: Interleaved M3PP parameters
"""
java_processes = jpype.JPackage('java.util').ArrayList()
for proc in processes:
java_processes.add(proc)
result = jpype.JPackage('jline').api.mam.m3pp.M3pp_interleave_fitcKt.m3pp_interleave_fitc(java_processes)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
# Additional MAMAP functions
[docs]
def mamap22_fit_gamma_fs_trace(trace):
"""
Fits MAMAP(2,2) from trace data using gamma autocorrelation and forward-sigma characteristics.
Args:
trace: Empirical trace data
Returns:
dict: Fitted MAMAP(2,2) parameters
"""
trace_matrix = arrayToJLineMatrix(trace)
result = jpype.JPackage('jline').api.mam.Mamap22_fit_gamma_fs_traceKt.mamap22_fit_gamma_fs_trace(trace_matrix)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mamap22_fit_multiclass(class_data):
"""
Fits MAMAP(2,2) processes for two-class systems with forward moments and sigma characteristics.
Args:
class_data: Data for multiple classes
Returns:
dict: Fitted MAMAP(2,2) parameters for multiple classes
"""
result = jpype.JPackage('jline').api.mam.Mamap22_fit_multiclassKt.mamap22_fit_multiclass(class_data)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mamap2m_fit(mean, scv, skew, num_states):
"""
Fits MAMAP(2,m) processes matching moments, autocorrelation, and class characteristics.
Args:
mean: Mean value
scv: Squared coefficient of variation
skew: Skewness
num_states: Number of states
Returns:
dict: Fitted MAMAP(2,m) parameters
"""
result = jpype.JPackage('jline').api.mam.Mamap2m_fitKt.mamap2m_fit(
float(mean), float(scv), float(skew), int(num_states)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mamap2m_fit_mmap(mmap_obj):
"""
Fits MAPH/MAMAP(2,m) by approximating characteristics of input MMAP processes.
Args:
mmap_obj: Input MMAP object
Returns:
dict: Fitted MAMAP(2,m) parameters
"""
result = jpype.JPackage('jline').api.mam.Mamap2m_fit_mmapKt.mamap2m_fit_mmap(mmap_obj)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}
[docs]
def mamap2m_fit_trace(trace, num_states):
"""
Fits MAMAP(2,m) processes from empirical trace data with inter-arrival times and class labels.
Args:
trace: Empirical trace data
num_states: Number of states
Returns:
dict: Fitted MAMAP(2,m) parameters
"""
trace_matrix = arrayToJLineMatrix(trace)
result = jpype.JPackage('jline').api.mam.Mamap2m_fit_traceKt.mamap2m_fit_trace(
trace_matrix, int(num_states)
)
return {
'D0': jlineMatrixToArray(result.getD0()),
'D1': jlineMatrixToArray(result.getD1())
}