Source code for line_solver.api.npfqn.traffic

"""
NPFQN Traffic Merging and Splitting Algorithms.

Implements traffic flow operations for non-product-form queueing networks
including merging, splitting, and class switching capabilities.

Key functions:
    npfqn_traffic_merge: Merge multiple MMAP traffic flows
    npfqn_traffic_merge_cs: Merge with class switching
    npfqn_traffic_split_cs: Split traffic with class switching

References:
    Casale, G., et al. "LINE: A unified library for queueing network modeling."
"""

import numpy as np
from typing import Dict, List, Optional, Tuple, Union
import warnings

from ..mam import (
    mmap_normalize,
    mmap_super_safe,
    mmap_mark,
    mmap_compress,
    map_isfeasible,
)


# Type alias for MMAP: List of numpy arrays [D0, D1, D2, ...]
MMAP = List[np.ndarray]


def _is_mmap_empty(mmap: Optional[MMAP]) -> bool:
    """Check if an MMAP is empty or None."""
    if mmap is None:
        return True
    if len(mmap) == 0:
        return True
    if mmap[0] is None or mmap[0].size == 0:
        return True
    return False


def _mmap_to_tuple(mmap: MMAP) -> Tuple[np.ndarray, List[np.ndarray]]:
    """Convert MMAP list representation to (D0, D_list) tuple."""
    D0 = mmap[0]
    D_list = mmap[1:] if len(mmap) > 1 else []
    return D0, D_list


def _tuple_to_mmap(D0: np.ndarray, D_list: List[np.ndarray]) -> MMAP:
    """Convert (D0, D_list) tuple to MMAP list representation."""
    return [D0] + D_list


[docs] def npfqn_traffic_merge( MMAPa: Dict[int, Optional[MMAP]], config_merge: str = "default", config_compress: Optional[str] = None ) -> Optional[MMAP]: """ Merge multiple MMAP traffic flows. Combines multiple MMAPs using specified aggregation strategies. Supports various merge configurations for different network topologies. Args: MMAPa: Dictionary of MMAP traffic flows to be merged. Keys are integer indices, values are MMAP lists [D0, D1, ...]. config_merge: Merge configuration. Options: - "default", "super": Use MMAP superposition - "mixture": Apply mixture fitting after superposition - "interpos": Interposition method (falls back to super) config_compress: Compression configuration. Options: - None, "none": No compression - "default": Apply default compression Returns: Merged and normalized MMAP, or None if input is empty. Raises: RuntimeError: If unsupported merge configuration is provided. Example: >>> mmap1 = [np.array([[-1.0]]), np.array([[1.0]])] >>> mmap2 = [np.array([[-2.0]]), np.array([[2.0]])] >>> result = npfqn_traffic_merge({0: mmap1, 1: mmap2}) """ # Filter out empty/None MMAPs valid_mmaps = [] for key in sorted(MMAPa.keys()): mmap = MMAPa[key] if not _is_mmap_empty(mmap): valid_mmaps.append(mmap) n = len(valid_mmaps) if n == 0: # Return empty/default MMAP D0 = np.array([[0.0]]) D_list = [np.array([[0.0]])] D0_norm, D_list_norm = mmap_normalize(D0, D_list) return _tuple_to_mmap(D0_norm, D_list_norm) if n == 1: D0, D_list = _mmap_to_tuple(valid_mmaps[0]) D0_norm, D_list_norm = mmap_normalize(D0, D_list) return _tuple_to_mmap(D0_norm, D_list_norm) # Validate merge configuration if config_merge not in ["default", "super", "mixture", "interpos"]: raise RuntimeError(f"Unsupported configuration for merge: {config_merge}") # Collect all (D0, D_list) tuples for superposition mmap_tuples = [] for mmap in valid_mmaps: D0, D_list = _mmap_to_tuple(mmap) mmap_tuples.append((D0, D_list)) # Perform merging based on configuration if config_merge in ["default", "super"]: # Use safe superposition D0_result, D_list_result = mmap_super_safe(mmap_tuples) elif config_merge == "mixture": # Superpose then apply mixture fitting D0_result, D_list_result = mmap_super_safe(mmap_tuples) # Check if result is feasible before mixture fitting try: if map_isfeasible(D0_result, sum(D_list_result)): # Use compression with mixture method as approximation # (mmap_mixture_fit_mmap is not available in native) D0_result, D_list_result = mmap_compress(D0_result, D_list_result, method="mixture") except Exception: pass # Keep superposition result if fitting fails elif config_merge == "interpos": # Interposition method not yet ported - fall back to super warnings.warn( "npfqn_traffic_merge: 'interpos' merge mode requires m3pp2m functions " "that are not yet ported. Using 'super' merge instead.", RuntimeWarning ) D0_result, D_list_result = mmap_super_safe(mmap_tuples) # Apply compression if requested if config_compress == "default": D0_result, D_list_result = mmap_compress(D0_result, D_list_result) # Normalize result D0_norm, D_list_norm = mmap_normalize(D0_result, D_list_result) return _tuple_to_mmap(D0_norm, D_list_norm)
[docs] def npfqn_traffic_split_cs( MMAP_input: MMAP, P: np.ndarray ) -> Dict[int, Optional[MMAP]]: """ Split MMAP traffic flows with class switching. Decomposes a single MMAP into multiple MMAPs based on routing probabilities and class switching matrix. Args: MMAP_input: Input MMAP as list [D0, D1, D2, ...]. P: Class switching probability matrix (R x J) where: - R is the number of arrival classes - J = M * R where M is the number of destinations - P[r, (jst-1)*R + s] = probability that class r arrival becomes class s at destination jst Returns: Dictionary mapping destination index to split MMAP. Keys are 0-indexed destination indices. Algorithm: 1. Parse dimensions from P matrix 2. For each destination, create MMAP with weighted marking matrices 3. Normalize each result Example: >>> mmap = [np.array([[-3.0]]), np.array([[1.0]]), np.array([[2.0]])] >>> P = np.array([[0.8, 0.2, 0.5, 0.5], [0.3, 0.7, 0.6, 0.4]]) >>> result = npfqn_traffic_split_cs(mmap, P) """ P = np.asarray(P, dtype=np.float64) R = P.shape[0] # Number of classes J = P.shape[1] # Total columns = M * R M = int(np.round(J / R)) # Number of destinations D0 = np.asarray(MMAP_input[0], dtype=np.float64) n = D0.shape[0] # State space size # Arrival matrices for each class (indices 1 to R in MMAP) D_arrivals = [] for r in range(R): if r + 1 < len(MMAP_input): D_arrivals.append(np.asarray(MMAP_input[r + 1], dtype=np.float64)) else: D_arrivals.append(np.zeros((n, n))) # Also need D1 (total arrivals) from original MMAP if len(MMAP_input) > 1: D1_total = np.asarray(MMAP_input[1], dtype=np.float64).copy() else: D1_total = np.zeros((n, n)) SMMAP: Dict[int, Optional[MMAP]] = {} for jst in range(M): # Initialize split MMAP for this destination # D0_split = D0 + D1 (hidden + arrivals become hidden) D0_split = D0.copy() + D1_total.copy() D1_split = np.zeros((n, n)) # Marking matrices for each class at this destination D_list_split = [] for s in range(R): D_s = np.zeros((n, n)) for r in range(R): # Weight by class switching probability # P[r, jst * R + s] = prob class r becomes class s at dest jst if jst * R + s < J: weight = P[r, jst * R + s] else: weight = 0.0 D_s = D_s + weight * D_arrivals[r] D_list_split.append(D_s) D1_split = D1_split + D_s D0_split = D0_split + D_s # Normalize try: D0_norm, D_list_norm = mmap_normalize(D0_split, D_list_split) SMMAP[jst] = _tuple_to_mmap(D0_norm, D_list_norm) except Exception: SMMAP[jst] = None return SMMAP
[docs] def npfqn_traffic_merge_cs( MMAPs: Dict[int, MMAP], prob: np.ndarray, config: str = "default" ) -> Optional[MMAP]: """ Merge MMAP traffic flows with class switching. Combines multiple MMAPs while applying class switching transformations based on the probability matrix. Args: MMAPs: Dictionary of MMAP traffic flows indexed by source. prob: Class switching probability matrix ((n*R) x R) where: - n is the number of sources - R is the number of classes - prob[(i-1)*R + r, s] = probability that class r from source i becomes class s in the merged stream config: Merge configuration ("default" or "super"). Returns: Merged MMAP with class switching applied, or None if empty. Algorithm: 1. Apply mmap_mark to each source MMAP to encode class switching 2. Superpose all marked MMAPs 3. Return result Example: >>> mmap1 = [np.array([[-1.0]]), np.array([[0.5]]), np.array([[0.5]])] >>> mmap2 = [np.array([[-2.0]]), np.array([[1.0]]), np.array([[1.0]])] >>> prob = np.array([[0.8, 0.2], [0.3, 0.7], [0.6, 0.4], [0.5, 0.5]]) >>> result = npfqn_traffic_merge_cs({0: mmap1, 1: mmap2}, prob) """ prob = np.asarray(prob, dtype=np.float64) n = len(MMAPs) R = prob.shape[1] # Number of output classes if n == 0: return None # Create copies and apply class switching via mmap_mark MMAP_marked: Dict[int, Tuple[np.ndarray, List[np.ndarray]]] = {} for i, key in enumerate(sorted(MMAPs.keys())): mmap = MMAPs[key] D0, D_list = _mmap_to_tuple(mmap) # Build class switching matrix P for this source P_local = np.zeros((R, R)) for r in range(R): for s in range(R): idx = i * R + r if idx < prob.shape[0]: P_local[r, s] = prob[idx, s] # Apply marking with class switching D0_marked, D_list_marked = mmap_mark(D0, D_list, P_local) MMAP_marked[i] = (D0_marked, D_list_marked) # Handle single source case if n == 1: D0, D_list = list(MMAP_marked.values())[0] return _tuple_to_mmap(D0, D_list) # Merge using superposition if config in ["default", "super"]: mmap_tuples = list(MMAP_marked.values()) D0_result, D_list_result = mmap_super_safe(mmap_tuples) return _tuple_to_mmap(D0_result, D_list_result) return None
__all__ = [ 'npfqn_traffic_merge', 'npfqn_traffic_split_cs', 'npfqn_traffic_merge_cs', ]