Source code for line_solver.api.cache.ttl

"""
TTL-Based Cache Analysis Methods.

Native Python implementations of Time-To-Live (TTL) based cache analysis
methods, including LRU(m) and hierarchical LRU caches.

Key functions:
    cache_t_lrum: Characteristic times for LRU(m) cache levels
    cache_t_hlru: Characteristic times for hierarchical LRU cache
    cache_ttl_lrum: Steady-state probabilities for TTL-LRU(m) cache
    cache_ttl_hlru: Steady-state probabilities for TTL hierarchical LRU
    cache_ttl_lrua: Steady-state probabilities with arrival-based routing

References:
    Original MATLAB: matlab/src/api/cache/cache_ttl*.m, cache_t*.m
"""

import numpy as np
from scipy.optimize import fsolve, least_squares
from typing import Tuple, Optional


[docs] def cache_t_lrum(gamma: np.ndarray, m: np.ndarray) -> np.ndarray: """ Compute characteristic times for LRU(m) cache levels. Uses fixed-point iteration (fsolve) to compute the characteristic time for each level of an LRU(m) cache. Args: gamma: Item popularity probabilities or arrival rates (n x h) m: Cache capacity vector (h,) Returns: Characteristic time for each cache level (h,) References: Original MATLAB: matlab/src/api/cache/cache_t_lrum.m """ gamma = np.asarray(gamma, dtype=np.float64) m = np.asarray(m, dtype=np.float64).ravel() n, h = gamma.shape def time_equations(x): """System of equations for characteristic times.""" F = np.zeros(h) # Compute transition probabilities and capacities trans = np.zeros((n, h)) logtrans = np.zeros((n, h)) denom = np.zeros(n) stablecapa = np.zeros(h) for k in range(n): for j in range(h): val = np.exp(gamma[k, j] * x[j]) - 1 trans[k, j] = max(val, 1e-300) logtrans[k, j] = np.log(trans[k, j]) denom[k] = np.sum(np.exp(np.cumsum(logtrans[k, :]))) for l in range(h): for k in range(n): log_prod = np.sum(np.log(trans[k, :l+1])) stablecapa[l] += np.exp(log_prod - np.log(1 + denom[k])) F[l] = m[l] - stablecapa[l] return F # Initial guess x0 = np.ones(h) # Solve t, info, ier, mesg = fsolve(time_equations, x0, full_output=True) if ier != 1: # Try different initial conditions for scale in [0.1, 0.5, 2.0, 5.0]: t, info, ier, mesg = fsolve(time_equations, x0 * scale, full_output=True) if ier == 1: break return t
[docs] def cache_t_hlru(gamma: np.ndarray, m: np.ndarray) -> np.ndarray: """ Compute characteristic times for hierarchical LRU cache levels. Uses fixed-point iteration (fsolve) to compute the characteristic time for each level of a hierarchical LRU cache. Args: gamma: Item popularity probabilities or arrival rates (n x h) m: Cache capacity vector (h,) Returns: Characteristic time for each cache level (h,) References: Original MATLAB: matlab/src/api/cache/cache_t_hlru.m """ gamma = np.asarray(gamma, dtype=np.float64) m = np.asarray(m, dtype=np.float64).ravel() n, h = gamma.shape def time_equations(x): """System of equations for characteristic times.""" F = np.zeros(h) for a in range(h): temp1 = np.ones(n) temp2 = np.zeros(n) probh = np.zeros(n) for k in range(n): # Compute temp1: product of (1 - exp(-gamma * t)) for levels 0..a for s in range(a + 1): temp1[k] *= (1 - np.exp(-gamma[k, s] * x[s])) # Compute temp2 middtemp2 = 0.0 for l in range(a): middtemp = 1.0 for s in range(l + 1): middtemp *= (1 - np.exp(-gamma[k, s] * x[s])) middtemp2 += middtemp temp2[k] = np.exp(-gamma[k, a] * x[a]) * (1 + middtemp2) # Hit probability at level a denom = temp1[k] + temp2[k] probh[k] = temp1[k] / denom if denom > 0 else 0.0 F[a] = m[a] - np.sum(probh) return F # Initial guess x0 = np.ones(h) # Solve t, info, ier, mesg = fsolve(time_equations, x0, full_output=True) if ier != 1: for scale in [0.1, 0.5, 2.0, 5.0]: t, info, ier, mesg = fsolve(time_equations, x0 * scale, full_output=True) if ier == 1: break return t
[docs] def cache_ttl_lrum(gamma: np.ndarray, m: np.ndarray) -> np.ndarray: """ Compute steady-state probabilities for TTL-based LRU(m) cache. Args: gamma: Arrival rates per item per list. Can be (n x h) or (1 x n x h+1). If 3D, uses gamma[0, :, 1:] as the (n x h) input. m: Cache capacity vector (h,) Returns: Steady-state probability distribution (n x h+1): prob[:, 0] = miss probability prob[:, 1:] = hit probability at each level References: Original MATLAB: matlab/src/api/cache/cache_ttl_lrum.m """ gamma = np.asarray(gamma, dtype=np.float64) m = np.asarray(m, dtype=np.float64).ravel() # Handle 3D input (MATLAB convention) if gamma.ndim == 3: gamma = gamma[0, :, 1:] n, h = gamma.shape # Compute characteristic times t = cache_t_lrum(gamma, m) # Compute steady-state probabilities probh = np.zeros((n, h)) prob0 = np.zeros(n) trans = np.zeros((n, h)) denom = np.zeros(n) for k in range(n): for j in range(h): trans[k, j] = np.exp(gamma[k, j] * t[j]) - 1 denom[k] = np.sum(np.cumprod(trans[k, :])) for k in range(n): for l in range(h): probh[k, l] = np.prod(trans[k, :l+1]) / (1 + denom[k]) prob0[k] = 1 - np.sum(probh[k, :]) prob = np.column_stack([prob0, probh]) return prob
[docs] def cache_ttl_hlru(gamma: np.ndarray, m: np.ndarray) -> np.ndarray: """ Compute steady-state probabilities for TTL-based hierarchical LRU cache. Args: gamma: Arrival rates per item per list. Can be (n x h) or (1 x n x h+1). If 3D, uses gamma[0, :, 1:] as the (n x h) input. m: Cache capacity vector (h,) Returns: Steady-state probability distribution (n x 2): prob[:, 0] = miss probability prob[:, 1] = hit probability at level h References: Original MATLAB: matlab/src/api/cache/cache_ttl_hlru.m """ gamma = np.asarray(gamma, dtype=np.float64) m = np.asarray(m, dtype=np.float64).ravel() # Handle 3D input if gamma.ndim == 3: gamma = gamma[0, :, 1:] n, h = gamma.shape # Compute characteristic times t = cache_t_hlru(gamma, m) # Compute steady-state probabilities probh = np.zeros(n) prob0 = np.zeros(n) temp1 = np.ones(n) temp2 = np.zeros(n) for k in range(n): # Product of (1 - exp(-gamma * t)) for all levels for s in range(h): temp1[k] *= (1 - np.exp(-gamma[k, s] * t[s])) # Compute temp2 middtemp2 = 0.0 for l in range(h - 1): middtemp = 1.0 for s in range(l + 1): middtemp *= (1 - np.exp(-gamma[k, s] * t[s])) middtemp2 += middtemp temp2[k] = np.exp(-gamma[k, h-1] * t[h-1]) * (1 + middtemp2) denom = temp1[k] + temp2[k] probh[k] = temp1[k] / denom if denom > 0 else 0.0 prob0[k] = 1 / (denom * np.exp(gamma[k, h-1] * t[h-1])) if denom > 0 else 1.0 prob = np.column_stack([prob0, probh]) return prob
[docs] def cache_ttl_lrua(lambd: np.ndarray, R: list, m: np.ndarray, seed: int = 23000, ttl: Optional[np.ndarray] = None) -> np.ndarray: """ Compute steady-state probabilities for TTL-LRU cache with arrival routing. Uses fixed-point iteration with DTMC solving for cache systems with multiple users, items, and levels with routing. Args: lambd: Arrival rates per user per item per list (u x n x h+1) R: Routing probability structure. Can be either: - 1D list: R[i] is the (h+1 x h+1) routing matrix for item i - 2D list: R[v][i] is the routing matrix for user v, item i m: Cache capacity vector (h,) seed: Random seed for initialization (default: 23000) ttl: Optional real TTL per cache level (h,). Caps the characteristic time at each level: items expire after ttl[l] time units even if the cache is not full. Units are in model time (request epochs when arrival rates sum to 1). Use np.inf for levels with no TTL. When TTL is binding, effective occupancy may be less than m[l]. Returns: Steady-state probability distribution (n x h+1) References: Original MATLAB: matlab/src/api/cache/cache_ttl_lrua.m """ np.random.seed(seed) lambd = np.asarray(lambd, dtype=np.float64) m = np.asarray(m, dtype=np.float64).ravel() if ttl is not None: ttl = np.asarray(ttl, dtype=np.float64).ravel() u = lambd.shape[0] # number of users n = lambd.shape[1] # number of items h = lambd.shape[2] - 1 # number of lists # Determine if R is 1D (R[i]) or 2D (R[v][i]) structure # Check if R[0] is a numpy array (1D case) or a list (2D case) is_2d_structure = isinstance(R[0], list) def ttl_tree_time(x): """Compute capacity difference for given characteristic times.""" # Cap characteristic times at real TTL if provided x_eff = x.copy() if ttl is not None: x_eff = np.minimum(x_eff, ttl) steadystateprob = np.zeros((n, h + 1)) randprob = np.zeros((n, h + 1)) avgtime = np.zeros((n, h + 1)) capa = np.zeros(h) rpdenominator = np.zeros(n) for i in range(n): # Build transition matrix # MATLAB uses R{1,i} which accesses user 1 (first user), item i # In Python: R[0][i] for 2D structure, R[i] for 1D structure Ri = R[0][i] if is_2d_structure else R[i] transmatrix = np.zeros((h + 1, h + 1)) for j in range(h + 1): leafnodes = np.where(Ri[j, :] > 0)[0] for k in leafnodes: if j == 0: transmatrix[j, k] = Ri[j, k] else: transmatrix[j, k] = (1 - np.exp(-lambd[0, i, j] * x_eff[j-1])) * Ri[j, k] if j != k and k > 0: transmatrix[k, j] = np.exp(-lambd[0, i, k] * x_eff[k-1]) # Remove disconnected nodes connected = [] for j in range(h + 1): if np.any(transmatrix[j, :] > 0) or np.any(transmatrix[:, j] > 0): connected.append(j) if len(connected) == 0: continue # Extract submatrix for connected nodes submatrix = transmatrix[np.ix_(connected, connected)] # Solve DTMC dtmcprob = _dtmc_solve(submatrix) for idx, node in enumerate(connected): steadystateprob[i, node] = dtmcprob[idx] if node > 0: rate = lambd[0, i, node] if rate > 0: avgtime[i, node] = (1 - np.exp(-rate * x_eff[node-1])) / rate else: avgtime[i, node] = 0 else: rate = lambd[0, i, node] avgtime[i, node] = 1 / rate if rate > 0 else 1.0 rpdenominator[i] += steadystateprob[i, node] * avgtime[i, node] for idx, node in enumerate(connected): if rpdenominator[i] > 0: randprob[i, node] = (steadystateprob[i, node] * avgtime[i, node] / rpdenominator[i]) # Compute capacity difference F = np.zeros(h) for l in range(h): capa[l] = np.sum(randprob[:, l + 1]) F[l] = m[l] - capa[l] return F, randprob # Initial guess with random seed x0 = np.random.uniform(0, 10, h) # Solve using fsolve (standard) or least_squares (when TTL caps apply) def objective(x): F, _ = ttl_tree_time(x) return F if ttl is not None: # With TTL caps, F=0 may not be achievable — use least_squares # with lower bound 0 (characteristic times are positive) result = least_squares(objective, x0, bounds=(0, np.inf)) t = result.x else: t, info, ier, mesg = fsolve(objective, x0, full_output=True) if ier != 1: for scale in [0.1, 0.5, 2.0, 5.0]: t, info, ier, mesg = fsolve(objective, x0 * scale, full_output=True) if ier == 1: break # Get final probabilities _, prob = ttl_tree_time(t) return prob
def _dtmc_solve(P: np.ndarray) -> np.ndarray: """ Solve for stationary distribution of a DTMC. Args: P: Transition probability matrix Returns: Stationary distribution vector """ n = P.shape[0] if n == 0: return np.array([]) if n == 1: return np.array([1.0]) # Normalize rows to make it a proper transition matrix row_sums = np.sum(P, axis=1, keepdims=True) row_sums[row_sums == 0] = 1 # Avoid division by zero P = P / row_sums # Solve (P^T - I) * pi = 0 with sum(pi) = 1 # Use the method: replace last equation with sum = 1 A = P.T - np.eye(n) A[-1, :] = 1.0 b = np.zeros(n) b[-1] = 1.0 try: pi = np.linalg.solve(A, b) pi = np.maximum(pi, 0) # Ensure non-negative pi = pi / np.sum(pi) # Normalize except np.linalg.LinAlgError: # Fallback: power method pi = np.ones(n) / n for _ in range(1000): pi_new = P.T @ pi pi_new = pi_new / np.sum(pi_new) if np.max(np.abs(pi_new - pi)) < 1e-10: break pi = pi_new pi = pi_new return pi __all__ = [ 'cache_t_lrum', 'cache_t_hlru', 'cache_ttl_lrum', 'cache_ttl_hlru', 'cache_ttl_lrua', ]