Source code for line_solver.api.lsn.max_multiplicity

"""
Layered Stochastic Network Maximum Multiplicity Computation.

Computes maximum multiplicity constraints for load sharing network (LSN)
analysis. Essential for determining feasible population bounds and
capacity constraints in layered queueing network models.

References:
    Casale, G., et al. "LINE: A unified library for queueing network modeling."
"""

import numpy as np
from dataclasses import dataclass
from typing import Optional, List
from enum import IntEnum


[docs] class LayeredNetworkElement(IntEnum): """Types of elements in a layered network.""" TASK = 1 ENTRY = 2 ACTIVITY = 3 PROCESSOR = 4 HOST = 5
[docs] @dataclass class LayeredNetworkStruct: """ Structure representing a layered network for LSN analysis. Attributes: dag: Directed acyclic graph adjacency matrix (n x n). dag[i,j] > 0 indicates an edge from node i to node j. mult: Multiplicity (max concurrent instances) for each node (n,). type: Node type for each node (n,), using LayeredNetworkElement values. isref: Reference task flags (n,). Non-zero indicates a reference task. """ dag: np.ndarray mult: np.ndarray type: np.ndarray isref: np.ndarray
[docs] def kahn_topological_sort(adjacency: np.ndarray) -> List[int]: """ Perform Kahn's algorithm for topological sorting. Args: adjacency: Adjacency matrix where adjacency[i,j] > 0 means edge i -> j. Returns: List of node indices in topological order. Raises: ValueError: If the graph contains a cycle. """ n = adjacency.shape[0] # Compute in-degree for each node in_degree = np.zeros(n, dtype=int) for j in range(n): for i in range(n): if adjacency[i, j] > 0: in_degree[j] += 1 # Initialize queue with nodes having in-degree 0 queue = [i for i in range(n) if in_degree[i] == 0] result = [] while queue: # Remove a node with in-degree 0 node = queue.pop(0) result.append(node) # Decrease in-degree of neighbors for j in range(n): if adjacency[node, j] > 0: in_degree[j] -= 1 if in_degree[j] == 0: queue.append(j) if len(result) != n: raise ValueError("Graph contains a cycle - cannot perform topological sort") return result
[docs] def lsn_max_multiplicity(lsn: LayeredNetworkStruct) -> np.ndarray: """ Compute the maximum multiplicity for each task in a layered network. This function uses flow analysis based on Kahn's topological sorting algorithm to determine the maximum sustainable throughput for each task, considering both the incoming flow and the multiplicity constraints. Args: lsn: The layered network structure containing task dependencies and constraints. Returns: Matrix of maximum multiplicities for each task in the network (n x 1). Algorithm: 1. Build binary adjacency graph from DAG 2. Apply Kahn's topological sort to determine processing order 3. Initialize inflow from reference tasks 4. For each node in topological order: - outflow = min(inflow, multiplicity constraint) - Propagate outflow to downstream nodes 5. Handle unreachable tasks (infinite multiplicity) Example: >>> lsn = LayeredNetworkStruct( ... dag=np.array([[0, 1, 0], [0, 0, 1], [0, 0, 0]]), ... mult=np.array([2, 3, 5]), ... type=np.array([1, 1, 1]), ... isref=np.array([1, 0, 0]) ... ) >>> max_mult = lsn_max_multiplicity(lsn) """ dag = np.asarray(lsn.dag, dtype=np.float64) n = dag.shape[0] # Build binary adjacency graph ag = np.zeros((n, n), dtype=np.float64) for i in range(n): for j in range(n): if dag[i, j] > 0: ag[i, j] = 1.0 mult = np.asarray(lsn.mult, dtype=np.float64).flatten() node_type = np.asarray(lsn.type, dtype=np.float64).flatten() isref = np.asarray(lsn.isref, dtype=np.float64).flatten() # Get topological order order = kahn_topological_sort(ag) # Initialize inflow from reference tasks inflow = np.zeros(n) for ist in range(n): if node_type[ist] == LayeredNetworkElement.TASK and isref[ist] != 0: inflow[ist] = mult[ist] if ist < len(mult) else np.inf # Initialize outflow outflow = np.zeros(n) # Extend mult array if needed if len(mult) < n: mult_extended = np.full(n, np.inf) mult_extended[:len(mult)] = mult mult = mult_extended # Propagate through topological order for k in range(n): ist = order[k] inflow_val = inflow[ist] mult_val = mult[ist] outflow[ist] = min(inflow_val, mult_val) # Propagate to downstream nodes for jst in range(n): if jst != ist and ag[ist, jst] != 0: inflow[jst] = inflow[jst] + outflow[ist] # Handle non-reference tasks with infinite multiplicity for ist in range(n): if (node_type[ist] == LayeredNetworkElement.TASK and mult[ist] == np.inf and isref[ist] == 0): outflow[ist] = np.inf # Return as column vector return outflow.reshape(-1, 1)
__all__ = [ 'LayeredNetworkElement', 'LayeredNetworkStruct', 'kahn_topological_sort', 'lsn_max_multiplicity', ]