"""
Reinforcement Learning (RL) agents for queueing control.
Native Python implementations of RL agents for
queueing network control and optimization.
Key classes:
RLTDAgent: Temporal Difference learning agent
RLEnvironment: Queueing network environment
Port from:
matlab/src/api/rl/rl_td_agent.m
matlab/src/api/rl/rl_env.m
"""
import numpy as np
from typing import Optional, List, Tuple, Dict, Any
class RLEnvironment:
    """
    Reinforcement Learning environment for queueing networks.

    This class wraps a queueing network model and provides an interface
    for RL agents to interact with it through sampling and state updates.

    Attributes:
        model: The queueing network model
        gamma: Discount factor for future rewards
        queue_indices: Indices of queue nodes in model.nodes
        source_indices: Indices of source nodes in model.nodes
        state_size: Maximum number of jobs per queue to consider
        action_size: Number of possible actions (number of queues)
    """

    def __init__(self,
                 model: Any,
                 queue_indices: List[int],
                 source_indices: List[int],
                 state_size: int,
                 gamma: float = 0.99):
        """
        Initialize the RL environment.

        Args:
            model: Queueing network model
            queue_indices: Indices of queue nodes in model.nodes
            source_indices: Indices of source nodes in model.nodes
            state_size: Maximum number of jobs per queue to consider
            gamma: Discount factor (0 < gamma <= 1)
        """
        self.model = model
        self.queue_indices = queue_indices
        self.source_indices = source_indices
        self.state_size = state_size
        self.gamma = gamma
        self.action_size = len(queue_indices)

    def is_in_state_space(self, state: np.ndarray) -> bool:
        """
        Check if the current state is within the defined state space.

        Args:
            state: Array of queue lengths

        Returns:
            True if all queue lengths are within state_size bounds
        """
        return bool(np.all(state <= self.state_size))

    def is_in_action_space(self, state: np.ndarray) -> bool:
        """
        Check if actions are valid from the current state.

        Args:
            state: Array of queue lengths

        Returns:
            True if all queues can accept new jobs (not at capacity)
        """
        return bool(np.all(state < self.state_size))

    def sample(self) -> Tuple[float, int]:
        """
        Sample the next event from the environment.

        Uses the SSA solver to sample a single system event.

        Returns:
            Tuple of (time_delta, departure_node_index)
        """
        try:
            from ...solvers import SolverSSA
            solver = SolverSSA(self.model)
            sample = solver.sample_sys_aggr(1)
            t = sample['t']
            # Determine which node had the departure
            dep_node = -1
            for event in sample.get('events', []):
                if event.get('event') == 'DEP':
                    dep_node = event.get('node', -1)
                    break
            return t, dep_node
        except Exception:
            # Fallback: return a random exponential time and a random node
            t = np.random.exponential(1.0)
            dep_node = np.random.choice(self.queue_indices + self.source_indices)
            return t, dep_node

    def update(self, new_state: np.ndarray) -> None:
        """
        Update the model state after an event.

        Args:
            new_state: New queue lengths for each queue
        """
        # Update queue states in the model
        if hasattr(self.model, 'nodes'):
            for i, queue_idx in enumerate(self.queue_indices):
                if hasattr(self.model.nodes[queue_idx], 'state'):
                    self.model.nodes[queue_idx].state = int(new_state[i])

    def reset(self) -> np.ndarray:
        """
        Reset the environment to the initial state.

        Returns:
            Initial state (zeros)
        """
        if hasattr(self.model, 'reset'):
            self.model.reset()
        if hasattr(self.model, 'init_default'):
            self.model.init_default()
        return np.zeros(self.action_size, dtype=np.int32)

    def get_state(self) -> np.ndarray:
        """
        Get the current state from the model.

        Returns:
            Array of queue lengths
        """
        state = np.zeros(self.action_size, dtype=np.int32)
        if hasattr(self.model, 'nodes'):
            for i, queue_idx in enumerate(self.queue_indices):
                if hasattr(self.model.nodes[queue_idx], 'state'):
                    state[i] = int(np.sum(self.model.nodes[queue_idx].state))
        return state

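
# Example usage (a minimal sketch, not part of the ported API): `_StubNode`
# and `_StubModel` below are hypothetical placeholders standing in for a real
# queueing network model; they expose only the `nodes`/`state` attributes
# that RLEnvironment actually reads and writes.
#
#     class _StubNode:
#         def __init__(self):
#             self.state = 0
#
#     class _StubModel:
#         def __init__(self):
#             self.nodes = [_StubNode() for _ in range(3)]
#
#     env = RLEnvironment(_StubModel(), queue_indices=[1, 2],
#                         source_indices=[0], state_size=10)
#     env.is_in_state_space(np.array([3, 7]))    # True: both queues in bounds
#     env.is_in_action_space(np.array([9, 10]))  # False: queue 2 at capacity
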
class RLTDAgent:
    """
    Temporal Difference (TD) learning agent for queueing control.

    Implements average-reward TD learning for optimal routing decisions
    in queueing networks. The agent learns a value function V(s) that
    estimates the relative (differential) long-run cost of each state
    under the average-cost criterion.

    Attributes:
        learning_rate: Step size for value function updates
        epsilon: Exploration rate for the epsilon-greedy policy
        epsilon_decay: Decay factor for the exploration rate
        V: Value function array
        Q: Q-function array (state-action values)
    """

    def __init__(self,
                 learning_rate: float = 0.05,
                 epsilon: float = 1.0,
                 epsilon_decay: float = 0.99):
        """
        Initialize the TD agent.

        Args:
            learning_rate: Learning rate (step size) for updates
            epsilon: Initial exploration rate (0 to 1)
            epsilon_decay: Decay factor applied to epsilon each episode
        """
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        # Value function and Q-function (initialized on first use)
        self.V: Optional[np.ndarray] = None
        self.Q: Optional[np.ndarray] = None
        self.V_shape: Optional[Tuple] = None
        self.Q_shape: Optional[Tuple] = None

    def reset(self, env: RLEnvironment) -> None:
        """
        Reset the agent and environment.

        Args:
            env: The RL environment
        """
        self.V = None
        self.Q = None
        self.V_shape = None
        self.Q_shape = None
        env.reset()

    def get_value_function(self) -> Optional[np.ndarray]:
        """Get the learned value function."""
        return self.V

    def get_q_function(self) -> Optional[np.ndarray]:
        """Get the learned Q-function."""
        return self.Q

    def solve(self,
              env: RLEnvironment,
              num_episodes: int = 10000,
              verbose: bool = True) -> Dict[str, Any]:
        """
        Train the agent using TD learning.

        Runs TD(0) learning for the specified number of episodes,
        learning an optimal routing policy for the queueing network.

        Args:
            env: The RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Dictionary with training results:
                - 'V': Learned value function
                - 'Q': Learned Q-function
                - 'mean_cost_rate': Final estimated average cost rate
        """
        self.reset(env)
        # Initialize value function with extra buffer beyond the state space
        v_size = tuple([env.state_size + 5] * env.action_size)
        self.V = np.zeros(v_size)
        self.V_shape = v_size
        # Initialize Q-function (random values; not updated by the TD(0) loop)
        q_size = v_size + (env.action_size,)
        self.Q = np.random.rand(*q_size)
        self.Q_shape = q_size
        # State tracking
        x = np.zeros(env.action_size, dtype=np.int32)  # Current state
        n = np.zeros(env.action_size, dtype=np.int32)  # State at the last TD update
        t = 0.0  # Time elapsed in the current episode
        c = 0.0  # Cost accumulated in the current episode
        T = 0.0  # Discounted total time
        C = 0.0  # Discounted total cost
        eps = self.epsilon
        episode = 0
        while episode < num_episodes:
            # Decay exploration rate
            eps *= self.epsilon_decay
            # Sample next event
            dt, dep_node = env.sample()
            t += dt
            # Accumulate holding cost (total jobs in system times elapsed time)
            c += np.sum(x) * dt
            # Process event
            if dep_node in env.source_indices:
                # New arrival - choose a routing action
                if env.is_in_action_space(x):
                    # Evaluate the value of each possible destination queue
                    next_states = []
                    for a in range(env.action_size):
                        next_x = x.copy()
                        next_x[a] += 1
                        next_states.append(self._get_state_index(next_x + 1))
                    next_values = np.array([self.V[s] for s in next_states])
                    # Create an epsilon-greedy policy and sample an action
                    policy = self._create_greedy_policy(next_values, eps, env.action_size)
                    action = np.random.choice(env.action_size, p=policy)
                else:
                    # State space boundary - fall back to Join the Shortest Queue
                    action = np.argmin(x)
                x[action] += 1
                env.update(x)
            elif dep_node in env.queue_indices:
                # Departure from a queue
                queue_idx = env.queue_indices.index(dep_node)
                x[queue_idx] = max(0, x[queue_idx] - 1)
                env.update(x)
            # TD update when in the valid state space
            if env.is_in_state_space(x):
                episode += 1
                if verbose and episode % 1000 == 0:
                    print(f"Running episode #{episode}")
                T = env.gamma * T + t
                C = env.gamma * C + c
                mean_cost_rate = C / T if T > 0 else 0.0
                prev_state = self._get_state_index(n + 1)
                curr_state = self._get_state_index(x + 1)
                # TD(0) update toward the differential cost target
                td_target = c - t * mean_cost_rate + self.V[curr_state]
                self.V[prev_state] = (1 - self.learning_rate) * self.V[prev_state] + \
                                     self.learning_rate * td_target
                # Normalize the value function relative to the empty state
                self.V = self.V - self.V.flat[0]
                # Reset accumulators for the next episode
                t = 0.0
                c = 0.0
                n = x.copy()
        return {
            'V': self.V,
            'Q': self.Q,
            'mean_cost_rate': C / T if T > 0 else 0.0
        }

    def _get_state_index(self, loc: np.ndarray) -> Tuple:
        """
        Convert a location array to a state index.

        Args:
            loc: Array of queue lengths (1-indexed)

        Returns:
            Tuple index into the value function array
        """
        loc = np.clip(loc, 0, self.V_shape[0] - 1).astype(int)
        return tuple(loc)

    @staticmethod
    def _create_greedy_policy(state_values: np.ndarray,
                              epsilon: float,
                              num_actions: int) -> np.ndarray:
        """
        Create an epsilon-greedy policy from state values.

        Args:
            state_values: Value estimates for each action
            epsilon: Exploration probability
            num_actions: Number of possible actions

        Returns:
            Probability distribution over actions
        """
        policy = np.ones(num_actions) * epsilon / num_actions
        # Find actions with minimum value (cost minimization)
        min_val = np.min(state_values)
        best_actions = np.where(np.abs(state_values - min_val) < 1e-10)[0]
        # Split the exploitation probability among the best actions
        exploit_prob = (1 - epsilon) / len(best_actions)
        policy[best_actions] += exploit_prob
        return policy
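
# Quick sanity check of the epsilon-greedy construction (values below are
# illustrative, not from the original source):
#
#     RLTDAgent._create_greedy_policy(np.array([2.0, 1.0, 1.0]), 0.3, 3)
#     # -> [0.1, 0.45, 0.45]: every action keeps eps/3 = 0.1 exploration
#     #    mass, and the two tied minimum-cost actions split the remaining
#     #    0.7 exploitation mass.
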
__all__ = [
'RLEnvironment',
'RLTDAgent',
]
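
# A minimal end-to-end sketch (an assumption-laden demo, not part of the
# ported API): `_DemoNode` and `_DemoModel` are hypothetical stand-ins that
# expose only the `nodes`/`state` attributes RLEnvironment reads and writes,
# so RLEnvironment.sample() takes its random-exponential fallback path
# instead of the SSA solver.
if __name__ == "__main__":
    class _DemoNode:
        def __init__(self):
            self.state = 0

    class _DemoModel:
        def __init__(self, n_nodes: int):
            self.nodes = [_DemoNode() for _ in range(n_nodes)]

    # Node 0 acts as the source; nodes 1 and 2 are the queues to route between.
    model = _DemoModel(3)
    env = RLEnvironment(model, queue_indices=[1, 2], source_indices=[0],
                        state_size=10)
    agent = RLTDAgent(learning_rate=0.05, epsilon=1.0, epsilon_decay=0.999)
    results = agent.solve(env, num_episodes=2000, verbose=False)
    print("Estimated mean cost rate:", results['mean_cost_rate'])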