Source code for line_solver.api.cache.rrm

"""
Random Replacement Model (RRM) Mean-Field Cache Analysis.

Native Python implementations of mean-field methods for analyzing cache
systems with random replacement policies.

Key functions:
    cache_rrm_meanfield_ode: ODE function for RRM dynamics
    cache_rrm_meanfield: Solve RRM mean-field steady state

References:
    Original MATLAB: matlab/src/api/cache/cache_rrm_meanfield*.m
"""

import numpy as np
from scipy.integrate import solve_ivp
from typing import Tuple, Optional


[docs] def cache_rrm_meanfield_ode(t: float, x: np.ndarray, lambd: np.ndarray, m: np.ndarray, n: int, h: int) -> np.ndarray: """ ODE function for RRM mean-field cache dynamics. Defines the differential equations for the Random Replacement Model mean-field cache dynamics. Args: t: Time variable (unused, for ODE solver compatibility) x: State vector of length n*(h+1), representing probabilities lambd: Arrival rates per item (n,) m: Cache capacity vector (h,) n: Number of items h: Number of cache levels Returns: Time derivative of state vector References: Original MATLAB: matlab/src/api/cache/cache_rrm_meanfield_ode.m """ lambd = np.asarray(lambd, dtype=np.float64).ravel() m = np.asarray(m, dtype=np.float64).ravel() # Reshape state vector to matrix form: x[k, s] x = x.reshape((n, 1 + h)) dxdt = np.zeros((n, 1 + h)) for k in range(n): for s in range(1, h + 1): # s = 1, ..., h (1-indexed in formulation) # First term: promotion from list s-1 sum1 = 0.0 for k1 in range(n): sum1 += lambd[k1] / m[s-1] * x[k1, s-1] * x[k, s] # Second term: demotion from list s+1 if s < h: sum2 = 0.0 for k1 in range(n): sum2 += lambd[k1] / m[s] * x[k1, s] * x[k, s+1] sum2 = sum2 - lambd[k] * x[k, s] else: sum2 = 0.0 # Drift component dxdt[k, s] = lambd[k] * x[k, s-1] - sum1 + sum2 # Case s=0: conservation of probability dxdt[k, 0] = -np.sum(dxdt[k, 1:h+1]) return dxdt.flatten()
[docs] def cache_rrm_meanfield(lambd: np.ndarray, m: np.ndarray, t_end: float = 10000.0, seed: int = 23000 ) -> Tuple[np.ndarray, float, float]: """ Solve RRM mean-field steady state using ODE integration. Computes the steady-state probability distribution for a cache with random replacement policy using mean-field ODE dynamics. Args: lambd: Arrival rates per item (n,) m: Cache capacity vector (h,) t_end: End time for ODE integration (default: 10000.0) seed: Random seed for initial conditions (default: 23000) Returns: Tuple of (prob, missrate, missratio) where: - prob: Steady-state probability matrix (n x h+1) - missrate: Global miss rate (lambda * miss_prob) - missratio: Miss ratio (missrate / sum(lambda)) References: Original MATLAB: matlab/src/api/cache/cache_rrm_meanfield.m """ np.random.seed(seed) lambd = np.asarray(lambd, dtype=np.float64).ravel() m = np.asarray(m, dtype=np.float64).ravel() n = len(lambd) h = len(m) # Initial condition: all items start in state 0 (miss) x0 = np.zeros((n, 1 + h)) x0[:, 0] = 1.0 # Solve ODE def ode_func(t, x): return cache_rrm_meanfield_ode(t, x, lambd, m, n, h) sol = solve_ivp(ode_func, [0, t_end], x0.flatten(), method='BDF', # Stiff solver like ode23s rtol=1e-6, atol=1e-9) # Extract final state x_final = sol.y[:, -1].reshape((n, 1 + h)) # Compute miss metrics missrate = np.dot(lambd, x_final[:, 0]) lambda_sum = np.sum(lambd) missratio = missrate / lambda_sum if lambda_sum > 0 else 0.0 return x_final, missrate, missratio
[docs] def cache_gamma_lp(lambd: np.ndarray, R: list) -> Tuple[np.ndarray, int, int, int]: """ Compute gamma parameters for cache models using linear programming approach. Computes item popularity probabilities at each cache level based on arrival rates and routing probabilities. Args: lambd: Arrival rates per user per item per list (u x n x h+1) R: Routing probability structure (list of lists, R[v][i] is matrix for user v, item i) Returns: Tuple of (gamma, u, n, h) where: - gamma: Item popularity probabilities at each level (n x h) - u: Number of users - n: Number of items - h: Number of cache levels References: Original MATLAB: matlab/src/api/cache/cache_gamma_lp.m """ lambd = np.asarray(lambd, dtype=np.float64) u = lambd.shape[0] # number of users n = lambd.shape[1] # number of items h = lambd.shape[2] - 1 # number of lists gamma = np.zeros((n, h)) def find_parent(Rvi, j): """Find parent of node j in routing matrix.""" if j == 0: return None parents = np.where(Rvi[:j, j] > 0)[0] if len(parents) == 0: return None if len(parents) > 1: raise ValueError("Cache has a list with more than one parent, but structure must be a tree.") return parents[0] for i in range(n): for j in range(h): # Compute gamma(i, j) # Sum routing matrices across users Rvi = np.zeros_like(R[0][i]) for v in range(u): Rvi = Rvi + R[v][i] # Build path from root to level j+1 (0-indexed level is j, but +1 for 1-indexed list) target = j + 1 # 1-indexed list Pij = [target] # Trace back to root pr_j = find_parent(Rvi, target) while pr_j is not None: Pij.insert(0, pr_j) pr_j = find_parent(Rvi, pr_j) if len(Pij) < 2: gamma[i, j] = 0.0 else: gamma[i, j] = 1.0 for li in range(1, len(Pij)): l_1 = Pij[li - 1] l = Pij[li] y = 0.0 for v in range(u): for t in range(l_1 + 1): # t from 0 to l_1 (1-indexed: 1 to l_1) y += lambd[v, i, t] * R[v][i][t, l] gamma[i, j] *= y return gamma, u, n, h
__all__ = [ 'cache_rrm_meanfield_ode', 'cache_rrm_meanfield', 'cache_gamma_lp', ]