Source code for line_solver.api.cache.spm

"""
Singular Perturbation Method (SPM) for Cache Analysis.

Native Python implementations of SPM techniques for cache system analysis
using partial differential equation methods. Provides numerical solutions
for cache performance in high-dimensional parameter spaces.

References:
    Gast, N. and Van Houdt, B., "Transient and Steady-state Regime of a
    Family of List-based Cache Replacement Algorithms." SIGMETRICS 2015.
"""

import numpy as np
from scipy import special
from typing import Tuple


def cache_xi_iter(gamma: np.ndarray, m: np.ndarray, tol: float = 1e-12,
                  max_iter: int = 1000) -> np.ndarray:
    """
    Compute cache xi terms using the iterative method of Gast and Van Houdt.

    The xi values characterize the steady-state distribution of items across
    the cache lists. The algorithm assumes that the access factors are
    monotone in the list index.

    Args:
        gamma: Cache access factors matrix (n x h).
        m: Cache capacity vector (h,).
        tol: Convergence tolerance (default: 1e-12).
        max_iter: Maximum iterations (default: 1000).

    Returns:
        Vector of xi values (h,).
    """
    gamma = np.asarray(gamma, dtype=np.float64)
    m = np.asarray(m, dtype=np.float64).ravel()

    n = gamma.shape[0]  # Number of items
    h = len(m)          # Number of cache levels

    # Normalize capacity
    f = m / n

    # Build pp matrix (h+1 x n): pp[0, :] = ones, pp[j+1, :] = gamma[:, j]
    pp = np.zeros((h + 1, n))
    pp[0, :] = 1.0
    for i in range(h):
        pp[i + 1, :] = gamma[:, i]

    z_old = np.zeros(h + 1)
    z = np.ones(h + 1)
    iteration = 0

    while (np.max(np.abs(z - z_old)) > tol * np.max(np.abs(z_old))
           and iteration < max_iter):
        z_old = z.copy()
        temp = n * (z @ pp)  # Element-wise: n * sum_j z[j] * pp[j, :]

        for i in range(h):
            # a = temp - n * z[i+1] * pp[i+1, :]
            a = temp - n * z[i + 1] * pp[i + 1, :]

            # Fi = sum_k (pp[i+1, k] / (n * pp[i+1, k] + a[k])),
            # i.e. the fill level evaluated at z[i+1] = 1
            denom = n * pp[i + 1, :] + a
            denom = np.where(np.abs(denom) < 1e-14, 1e-14, denom)
            Fi = np.sum(pp[i + 1, :] / denom)

            # Bracket the root for z[i+1]
            if Fi > f[i]:
                zi_min = 0.0
                zi_max = 1.0
            else:
                zi_min = 1.0
                zi_max = 2.0
                # Expand the search range until it contains the root
                while True:
                    denom_test = n * zi_max * pp[i + 1, :] + a
                    denom_test = np.where(np.abs(denom_test) < 1e-14, 1e-14,
                                          denom_test)
                    Fi_test = np.sum(zi_max * pp[i + 1, :] / denom_test)
                    if Fi_test >= f[i]:
                        break
                    zi_min = zi_max
                    zi_max *= 2

            # Bisection search
            for _ in range(50):
                zi = (zi_min + zi_max) / 2
                z[i + 1] = zi
                denom_zi = n * zi * pp[i + 1, :] + a
                denom_zi = np.where(np.abs(denom_zi) < 1e-14, 1e-14, denom_zi)
                Fi_zi = np.sum(zi * pp[i + 1, :] / denom_zi)
                if Fi_zi < f[i]:
                    zi_min = zi
                else:
                    zi_max = zi

        iteration += 1

    # Return xi values (without the first element)
    return z[1:]
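
# A minimal usage sketch for cache_xi_iter. The workload below is synthetic
# (Zipf-like access factors invented for illustration, not taken from the
# paper):
#
#     >>> gamma = np.array([[0.45], [0.25], [0.20], [0.10]])  # 4 items, h=1
#     >>> m = np.array([2.0])                                  # 2 slots
#     >>> xi = cache_xi_iter(gamma, m)
#     >>> xi.shape
#     (1,)
#
# The product gamma @ xi yields the S_k terms used by the probability
# functions below.
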
def cache_spm(gamma: np.ndarray,
              m: np.ndarray) -> Tuple[float, float, np.ndarray]:
    """
    Approximate the normalizing constant using the SPM method.

    Computes the normalizing constant of the cache steady-state distribution
    using the singular perturbation method (also known as ray integration).

    Args:
        gamma: Cache access factors matrix (n x h).
        m: Cache capacity vector (h,).

    Returns:
        Tuple of (Z, lZ, xi) where:
            - Z: Approximated normalizing constant
            - lZ: Log of the normalizing constant
            - xi: Xi terms vector
    """
    gamma = np.asarray(gamma, dtype=np.float64)
    m = np.asarray(m, dtype=np.float64).ravel()

    # Filter out items with zero access
    row_sums = np.sum(gamma, axis=1)
    keep_rows = row_sums > 0
    gamma = gamma[keep_rows, :]

    h = len(m)          # Number of cache levels
    n = gamma.shape[0]  # Number of items
    mt = np.sum(m)      # Total capacity

    if n == mt:
        print("Warning: Number of items equals the cache capacity.")

    # Compute xi terms
    xi = cache_xi_iter(gamma, m)

    # Compute S_k = sum_l gamma[k, l] * xi[l]
    S = gamma @ xi  # Shape: (n,)

    # Compute phi = sum_k log(1 + S_k) - sum_l m_l * log(xi_l)
    phi = np.sum(np.log(1 + S)) - np.sum(m * np.log(xi))

    # Compute matrix C for the determinant
    delta = np.eye(h)
    C = np.zeros((h, h))
    for j in range(h):
        # C1 = sum_k gamma[k, j] / (1 + S_k); independent of l
        C1 = np.sum(gamma[:, j] / (1 + S))
        for l in range(h):
            # C2 = sum_k gamma[k, j] * gamma[k, l] / (1 + S_k)^2
            C2 = np.sum(gamma[:, j] * gamma[:, l] / (1 + S) ** 2)
            C[j, l] = delta[j, l] * C1 - xi[j] * C2

    det_C = np.linalg.det(C)
    if det_C <= 0:
        det_C = 1e-100  # Avoid log of a non-positive value

    # m! (factorial of each element)
    m_fact = special.factorial(m)
    m_fact_prod = np.prod(m_fact)

    # Log of Z
    lZ = (-h * np.log(np.sqrt(2 * np.pi))
          + phi
          + np.sum(special.gammaln(m + 1))  # log of m!
          - np.sum(0.5 * np.log(xi))
          - 0.5 * np.log(det_C))

    # Z
    Z = (np.exp(phi) * (2 * np.pi) ** (-h / 2) * m_fact_prod
         / np.prod(np.sqrt(xi)) / np.sqrt(det_C))

    return Z, lZ, xi
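
# Hedged sketch of calling cache_spm on the same synthetic workload as above.
# By construction lZ is the logarithm of Z, so the two returns must agree
# whenever Z does not overflow or underflow:
#
#     >>> gamma = np.array([[0.45], [0.25], [0.20], [0.10]])
#     >>> m = np.array([2.0])
#     >>> Z, lZ, xi = cache_spm(gamma, m)
#     >>> bool(np.isclose(np.log(Z), lZ))
#     True
#
# For large models Z itself may not be representable in floating point, which
# is why lZ is returned alongside it.
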
def cache_prob_spm(gamma: np.ndarray, m: np.ndarray) -> np.ndarray:
    """
    Compute cache miss probabilities using the singular perturbation method.

    Uses the SPM (ray integration) method to compute miss probabilities for
    cache systems.

    Args:
        gamma: Cache access factors matrix (n x h).
        m: Cache capacity vector (h,).

    Returns:
        Vector of miss probabilities for each item.
    """
    gamma = np.asarray(gamma, dtype=np.float64)
    m = np.asarray(m, dtype=np.float64).ravel()

    # Get xi values
    _, _, xi = cache_spm(gamma, m)

    # Compute S_k = sum_l gamma[k, l] * xi[l]
    S = gamma @ xi

    # Miss probability for item k: 1 / (1 + S_k)
    miss_prob = 1.0 / (1 + S)

    return miss_prob
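
# Illustrative check (same synthetic data as the sketches above): each entry
# of the returned vector is the miss probability 1 / (1 + S_k) for item k,
# so every entry lies in (0, 1]:
#
#     >>> gamma = np.array([[0.45], [0.25], [0.20], [0.10]])
#     >>> m = np.array([2.0])
#     >>> miss = cache_prob_spm(gamma, m)
#     >>> miss.shape
#     (4,)
#     >>> bool(np.all((miss > 0) & (miss <= 1)))
#     True
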
def cache_prob_fpi(gamma: np.ndarray, m: np.ndarray, tol: float = 1e-8,
                   max_iter: int = 1000) -> np.ndarray:
    """
    Compute cache hit probabilities using fixed-point iteration (FPI method).

    Matches the MATLAB cache_prob_fpi: uses fixed-point iteration to compute
    the cache hit probability distribution.

    Args:
        gamma: Cache access factors matrix (n x h).
        m: Cache capacity vector (h,).
        tol: Convergence tolerance (unused, kept for API compatibility).
        max_iter: Maximum iterations (unused, kept for API compatibility).

    Returns:
        Matrix (n x h+1) where:
            - Column 0: miss probabilities
            - Columns 1..h: hit probabilities at each level
    """
    from .miss import cache_xi_fp

    gamma = np.asarray(gamma, dtype=np.float64)
    m = np.asarray(m, dtype=np.float64).ravel()

    n = gamma.shape[0]
    h = gamma.shape[1]

    # Get xi using the fixed-point method (matches MATLAB cache_xi_fp)
    xi, _, _, _ = cache_xi_fp(gamma, m)

    # Build probability matrix (n x h+1):
    # column 0 holds the miss probability, columns 1..h the hit probability
    # at each level
    prob = np.zeros((n, h + 1))
    for i in range(n):
        # S_i = gamma(i, :) * xi
        S_i = np.dot(gamma[i, :], xi)

        # Miss probability: 1 / (1 + S_i)
        prob[i, 0] = 1.0 / (1.0 + S_i)

        # Hit probability at level j: gamma(i, j) * xi(j) / (1 + S_i)
        for j in range(h):
            prob[i, j + 1] = gamma[i, j] * xi[j] / (1.0 + S_i)

    return prob
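
# Sketch of the FPI variant; it assumes the sibling module
# line_solver.api.cache.miss providing cache_xi_fp is importable, as the
# function itself requires. Row i of the result is a distribution over
# {miss, hit at level 1, ..., hit at level h}, since
# 1/(1+S_i) + sum_j gamma[i,j]*xi[j]/(1+S_i) = 1, so each row sums to 1:
#
#     >>> prob = cache_prob_fpi(gamma, m)  # gamma, m as in the sketches above
#     >>> bool(np.allclose(prob.sum(axis=1), 1.0))
#     True
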
__all__ = [
    'cache_xi_iter',
    'cache_spm',
    'cache_prob_spm',
    'cache_prob_fpi',
]