"""
TTL-Based Cache Analysis Methods.
Native Python implementations of Time-To-Live (TTL) based cache analysis
methods, including LRU(m) and hierarchical LRU caches.
Key functions:
cache_t_lrum: Characteristic times for LRU(m) cache levels
cache_t_hlru: Characteristic times for hierarchical LRU cache
cache_ttl_lrum: Steady-state probabilities for TTL-LRU(m) cache
cache_ttl_hlru: Steady-state probabilities for TTL hierarchical LRU
cache_ttl_lrua: Steady-state probabilities with arrival-based routing
References:
Original MATLAB: matlab/src/api/cache/cache_ttl*.m, cache_t*.m
"""
import numpy as np
from scipy.optimize import fsolve
from typing import Tuple, Optional
[docs]
def cache_t_lrum(gamma: np.ndarray, m: np.ndarray) -> np.ndarray:
"""
Compute characteristic times for LRU(m) cache levels.
Uses fixed-point iteration (fsolve) to compute the characteristic time
for each level of an LRU(m) cache.
Args:
gamma: Item popularity probabilities or arrival rates (n x h)
m: Cache capacity vector (h,)
Returns:
Characteristic time for each cache level (h,)
References:
Original MATLAB: matlab/src/api/cache/cache_t_lrum.m
"""
gamma = np.asarray(gamma, dtype=np.float64)
m = np.asarray(m, dtype=np.float64).ravel()
n, h = gamma.shape
def time_equations(x):
"""System of equations for characteristic times."""
F = np.zeros(h)
# Compute transition probabilities and capacities
trans = np.zeros((n, h))
logtrans = np.zeros((n, h))
denom = np.zeros(n)
stablecapa = np.zeros(h)
for k in range(n):
for j in range(h):
val = np.exp(gamma[k, j] * x[j]) - 1
trans[k, j] = max(val, 1e-300)
logtrans[k, j] = np.log(trans[k, j])
denom[k] = np.sum(np.exp(np.cumsum(logtrans[k, :])))
for l in range(h):
for k in range(n):
log_prod = np.sum(np.log(trans[k, :l+1]))
stablecapa[l] += np.exp(log_prod - np.log(1 + denom[k]))
F[l] = m[l] - stablecapa[l]
return F
# Initial guess
x0 = np.ones(h)
# Solve
t, info, ier, mesg = fsolve(time_equations, x0, full_output=True)
if ier != 1:
# Try different initial conditions
for scale in [0.1, 0.5, 2.0, 5.0]:
t, info, ier, mesg = fsolve(time_equations, x0 * scale, full_output=True)
if ier == 1:
break
return t
[docs]
def cache_t_hlru(gamma: np.ndarray, m: np.ndarray) -> np.ndarray:
"""
Compute characteristic times for hierarchical LRU cache levels.
Uses fixed-point iteration (fsolve) to compute the characteristic time
for each level of a hierarchical LRU cache.
Args:
gamma: Item popularity probabilities or arrival rates (n x h)
m: Cache capacity vector (h,)
Returns:
Characteristic time for each cache level (h,)
References:
Original MATLAB: matlab/src/api/cache/cache_t_hlru.m
"""
gamma = np.asarray(gamma, dtype=np.float64)
m = np.asarray(m, dtype=np.float64).ravel()
n, h = gamma.shape
def time_equations(x):
"""System of equations for characteristic times."""
F = np.zeros(h)
for a in range(h):
temp1 = np.ones(n)
temp2 = np.zeros(n)
probh = np.zeros(n)
for k in range(n):
# Compute temp1: product of (1 - exp(-gamma * t)) for levels 0..a
for s in range(a + 1):
temp1[k] *= (1 - np.exp(-gamma[k, s] * x[s]))
# Compute temp2
middtemp2 = 0.0
for l in range(a):
middtemp = 1.0
for s in range(l + 1):
middtemp *= (1 - np.exp(-gamma[k, s] * x[s]))
middtemp2 += middtemp
temp2[k] = np.exp(-gamma[k, a] * x[a]) * (1 + middtemp2)
# Hit probability at level a
denom = temp1[k] + temp2[k]
probh[k] = temp1[k] / denom if denom > 0 else 0.0
F[a] = m[a] - np.sum(probh)
return F
# Initial guess
x0 = np.ones(h)
# Solve
t, info, ier, mesg = fsolve(time_equations, x0, full_output=True)
if ier != 1:
for scale in [0.1, 0.5, 2.0, 5.0]:
t, info, ier, mesg = fsolve(time_equations, x0 * scale, full_output=True)
if ier == 1:
break
return t
[docs]
def cache_ttl_lrum(gamma: np.ndarray, m: np.ndarray) -> np.ndarray:
"""
Compute steady-state probabilities for TTL-based LRU(m) cache.
Args:
gamma: Arrival rates per item per list. Can be (n x h) or (1 x n x h+1).
If 3D, uses gamma[0, :, 1:] as the (n x h) input.
m: Cache capacity vector (h,)
Returns:
Steady-state probability distribution (n x h+1):
prob[:, 0] = miss probability
prob[:, 1:] = hit probability at each level
References:
Original MATLAB: matlab/src/api/cache/cache_ttl_lrum.m
"""
gamma = np.asarray(gamma, dtype=np.float64)
m = np.asarray(m, dtype=np.float64).ravel()
# Handle 3D input (MATLAB convention)
if gamma.ndim == 3:
gamma = gamma[0, :, 1:]
n, h = gamma.shape
# Compute characteristic times
t = cache_t_lrum(gamma, m)
# Compute steady-state probabilities
probh = np.zeros((n, h))
prob0 = np.zeros(n)
trans = np.zeros((n, h))
denom = np.zeros(n)
for k in range(n):
for j in range(h):
trans[k, j] = np.exp(gamma[k, j] * t[j]) - 1
denom[k] = np.sum(np.cumprod(trans[k, :]))
for k in range(n):
for l in range(h):
probh[k, l] = np.prod(trans[k, :l+1]) / (1 + denom[k])
prob0[k] = 1 - np.sum(probh[k, :])
prob = np.column_stack([prob0, probh])
return prob
[docs]
def cache_ttl_hlru(gamma: np.ndarray, m: np.ndarray) -> np.ndarray:
"""
Compute steady-state probabilities for TTL-based hierarchical LRU cache.
Args:
gamma: Arrival rates per item per list. Can be (n x h) or (1 x n x h+1).
If 3D, uses gamma[0, :, 1:] as the (n x h) input.
m: Cache capacity vector (h,)
Returns:
Steady-state probability distribution (n x 2):
prob[:, 0] = miss probability
prob[:, 1] = hit probability at level h
References:
Original MATLAB: matlab/src/api/cache/cache_ttl_hlru.m
"""
gamma = np.asarray(gamma, dtype=np.float64)
m = np.asarray(m, dtype=np.float64).ravel()
# Handle 3D input
if gamma.ndim == 3:
gamma = gamma[0, :, 1:]
n, h = gamma.shape
# Compute characteristic times
t = cache_t_hlru(gamma, m)
# Compute steady-state probabilities
probh = np.zeros(n)
prob0 = np.zeros(n)
temp1 = np.ones(n)
temp2 = np.zeros(n)
for k in range(n):
# Product of (1 - exp(-gamma * t)) for all levels
for s in range(h):
temp1[k] *= (1 - np.exp(-gamma[k, s] * t[s]))
# Compute temp2
middtemp2 = 0.0
for l in range(h - 1):
middtemp = 1.0
for s in range(l + 1):
middtemp *= (1 - np.exp(-gamma[k, s] * t[s]))
middtemp2 += middtemp
temp2[k] = np.exp(-gamma[k, h-1] * t[h-1]) * (1 + middtemp2)
denom = temp1[k] + temp2[k]
probh[k] = temp1[k] / denom if denom > 0 else 0.0
prob0[k] = 1 / (denom * np.exp(gamma[k, h-1] * t[h-1])) if denom > 0 else 1.0
prob = np.column_stack([prob0, probh])
return prob
[docs]
def cache_ttl_lrua(lambd: np.ndarray, R: list, m: np.ndarray,
seed: int = 23000) -> np.ndarray:
"""
Compute steady-state probabilities for TTL-LRU cache with arrival routing.
Uses fixed-point iteration with DTMC solving for cache systems with
multiple users, items, and levels with routing.
Args:
lambd: Arrival rates per user per item per list (u x n x h+1)
R: Routing probability structure (list of n matrices, each (h+1 x h+1))
m: Cache capacity vector (h,)
seed: Random seed for initialization (default: 23000)
Returns:
Steady-state probability distribution (n x h+1)
References:
Original MATLAB: matlab/src/api/cache/cache_ttl_lrua.m
"""
np.random.seed(seed)
lambd = np.asarray(lambd, dtype=np.float64)
m = np.asarray(m, dtype=np.float64).ravel()
u = lambd.shape[0] # number of users
n = lambd.shape[1] # number of items
h = lambd.shape[2] - 1 # number of lists
def ttl_tree_time(x):
"""Compute capacity difference for given characteristic times."""
steadystateprob = np.zeros((n, h + 1))
randprob = np.zeros((n, h + 1))
avgtime = np.zeros((n, h + 1))
capa = np.zeros(h)
rpdenominator = np.zeros(n)
for i in range(n):
# Build transition matrix
Ri = R[i]
transmatrix = np.zeros((h + 1, h + 1))
for j in range(h + 1):
leafnodes = np.where(Ri[j, :] > 0)[0]
for k in leafnodes:
if j == 0:
transmatrix[j, k] = Ri[j, k]
else:
transmatrix[j, k] = (1 - np.exp(-lambd[0, i, j] * x[j-1])) * Ri[j, k]
if j != k and k > 0:
transmatrix[k, j] = np.exp(-lambd[0, i, k] * x[k-1])
# Remove disconnected nodes
connected = []
for j in range(h + 1):
if np.any(transmatrix[j, :] > 0) or np.any(transmatrix[:, j] > 0):
connected.append(j)
if len(connected) == 0:
continue
# Extract submatrix for connected nodes
submatrix = transmatrix[np.ix_(connected, connected)]
# Solve DTMC
dtmcprob = _dtmc_solve(submatrix)
for idx, node in enumerate(connected):
steadystateprob[i, node] = dtmcprob[idx]
if node > 0:
rate = lambd[0, i, node]
if rate > 0:
avgtime[i, node] = (1 - np.exp(-rate * x[node-1])) / rate
else:
avgtime[i, node] = 0
else:
rate = lambd[0, i, node]
avgtime[i, node] = 1 / rate if rate > 0 else 1.0
rpdenominator[i] += steadystateprob[i, node] * avgtime[i, node]
for idx, node in enumerate(connected):
if rpdenominator[i] > 0:
randprob[i, node] = (steadystateprob[i, node] *
avgtime[i, node] / rpdenominator[i])
# Compute capacity difference
F = np.zeros(h)
for l in range(h):
capa[l] = np.sum(randprob[:, l + 1])
F[l] = m[l] - capa[l]
return F, randprob
# Initial guess with random seed
x0 = np.random.uniform(0, 10, h)
# Solve using fsolve
def objective(x):
F, _ = ttl_tree_time(x)
return F
t, info, ier, mesg = fsolve(objective, x0, full_output=True)
# Get final probabilities
_, prob = ttl_tree_time(t)
return prob
def _dtmc_solve(P: np.ndarray) -> np.ndarray:
"""
Solve for stationary distribution of a DTMC.
Args:
P: Transition probability matrix
Returns:
Stationary distribution vector
"""
n = P.shape[0]
if n == 0:
return np.array([])
if n == 1:
return np.array([1.0])
# Normalize rows to make it a proper transition matrix
row_sums = np.sum(P, axis=1, keepdims=True)
row_sums[row_sums == 0] = 1 # Avoid division by zero
P = P / row_sums
# Solve (P^T - I) * pi = 0 with sum(pi) = 1
# Use the method: replace last equation with sum = 1
A = P.T - np.eye(n)
A[-1, :] = 1.0
b = np.zeros(n)
b[-1] = 1.0
try:
pi = np.linalg.solve(A, b)
pi = np.maximum(pi, 0) # Ensure non-negative
pi = pi / np.sum(pi) # Normalize
except np.linalg.LinAlgError:
# Fallback: power method
pi = np.ones(n) / n
for _ in range(1000):
pi_new = P.T @ pi
pi_new = pi_new / np.sum(pi_new)
if np.max(np.abs(pi_new - pi)) < 1e-10:
break
pi = pi_new
pi = pi_new
return pi
__all__ = [
'cache_t_lrum',
'cache_t_hlru',
'cache_ttl_lrum',
'cache_ttl_hlru',
'cache_ttl_lrua',
]