# Source code for line_solver.api.rl

"""
Reinforcement Learning (RL) agents for queueing control.

Native Python implementations of RL agents for
queueing network control and optimization.

Key classes:
    RLTDAgent: Temporal Difference learning agent
    RLEnvironment: Queueing network environment

Port from:
    matlab/src/api/rl/rl_td_agent.m
    matlab/src/api/rl/rl_env.m
"""

import numpy as np
from typing import Optional, List, Tuple, Dict, Any


class RLEnvironment:
    """
    Reinforcement Learning environment for queueing networks.

    This class wraps a queueing network model and provides an interface
    for RL agents to interact with it through sampling and state updates.

    Attributes:
        model: The queueing network model
        gamma: Discount factor for future rewards
        queue_indices: Indices of queue nodes in model.nodes
        source_indices: Indices of source nodes in model.nodes
        state_size: Maximum state size to consider
        action_size: Number of possible actions (number of queues)
    """

    def __init__(self, model: Any, queue_indices: List[int],
                 source_indices: List[int], state_size: int,
                 gamma: float = 0.99):
        """
        Initialize the RL environment.

        Args:
            model: Queueing network model
            queue_indices: Indices of queue nodes in model.nodes
            source_indices: Indices of source nodes in model.nodes
            state_size: Maximum number of jobs per queue to consider
            gamma: Discount factor (0 < gamma <= 1)
        """
        self.model = model
        self.queue_indices = queue_indices
        self.source_indices = source_indices
        self.state_size = state_size
        self.gamma = gamma
        # One routing action per queue node.
        self.action_size = len(queue_indices)

    def is_in_state_space(self, state: np.ndarray) -> bool:
        """
        Check if current state is within the defined state space.

        Args:
            state: Array of queue lengths

        Returns:
            True if all queue lengths are within state_size bounds
        """
        # bool(...) so the declared return type holds: np.all returns np.bool_.
        return bool(np.all(state <= self.state_size))

    def is_in_action_space(self, state: np.ndarray) -> bool:
        """
        Check if actions are valid from current state.

        Args:
            state: Array of queue lengths

        Returns:
            True if all queues can accept new jobs (not at capacity)
        """
        # bool(...) for a plain Python bool, matching the annotation.
        return bool(np.all(state < self.state_size))

    def sample(self) -> Tuple[float, int]:
        """
        Sample the next event from the environment.

        Uses the SSA solver to sample a single system event.

        Returns:
            Tuple of (time_delta, departure_node_index)
        """
        try:
            from ...solvers import SolverSSA
            solver = SolverSSA(self.model)
            sample = solver.sample_sys_aggr(1)
            t = sample['t']
            # Determine which node had the departure.
            dep_node = -1
            for event in sample.get('events', []):
                if event.get('event') == 'DEP':
                    dep_node = event.get('node', -1)
                    break
            return t, dep_node
        except Exception:
            # Fallback (deliberate best-effort): random exponential time
            # and a uniformly chosen node when the SSA solver is unavailable.
            t = np.random.exponential(1.0)
            dep_node = np.random.choice(self.queue_indices + self.source_indices)
            return t, dep_node

    def update(self, new_state: np.ndarray) -> None:
        """
        Update the model state after an event.

        Args:
            new_state: New queue lengths for each queue
        """
        # Push the per-queue job counts back into the model's node states.
        if hasattr(self.model, 'nodes'):
            for i, queue_idx in enumerate(self.queue_indices):
                if hasattr(self.model.nodes[queue_idx], 'state'):
                    self.model.nodes[queue_idx].state = int(new_state[i])

    def reset(self) -> np.ndarray:
        """
        Reset the environment to initial state.

        Returns:
            Initial state (zeros)
        """
        if hasattr(self.model, 'reset'):
            self.model.reset()
        if hasattr(self.model, 'init_default'):
            self.model.init_default()
        return np.zeros(self.action_size, dtype=np.int32)

    def get_state(self) -> np.ndarray:
        """
        Get current state from the model.

        Returns:
            Array of queue lengths
        """
        state = np.zeros(self.action_size, dtype=np.int32)
        if hasattr(self.model, 'nodes'):
            for i, queue_idx in enumerate(self.queue_indices):
                if hasattr(self.model.nodes[queue_idx], 'state'):
                    # node.state may be scalar or array; sum to a total count.
                    state[i] = int(np.sum(self.model.nodes[queue_idx].state))
        return state
class RLTDAgent:
    """
    Temporal Difference (TD) learning agent for queueing control.

    Implements average-reward TD learning for optimal routing decisions
    in queueing networks. The agent learns a value function V(s) that
    estimates the long-run average cost from each state.

    Attributes:
        learning_rate: Step size for value function updates
        epsilon: Exploration rate for epsilon-greedy policy
        epsilon_decay: Decay factor for exploration rate
        V: Value function array
        Q: Q-function array (state-action values)
    """

    def __init__(self, learning_rate: float = 0.05, epsilon: float = 1.0,
                 epsilon_decay: float = 0.99):
        """
        Initialize the TD agent.

        Args:
            learning_rate: Learning rate (step size) for updates
            epsilon: Initial exploration rate (0 to 1)
            epsilon_decay: Decay factor applied to epsilon each episode
        """
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        # Value tables are allocated lazily by solve().
        self.V: Optional[np.ndarray] = None
        self.Q: Optional[np.ndarray] = None
        self.V_shape: Optional[Tuple] = None
        self.Q_shape: Optional[Tuple] = None

    def reset(self, env: 'RLEnvironment') -> None:
        """
        Discard learned values and reset the environment.

        Args:
            env: The RL environment
        """
        self.V = None
        self.Q = None
        self.V_shape = None
        self.Q_shape = None
        env.reset()

    def get_value_function(self) -> Optional[np.ndarray]:
        """Get the learned value function."""
        return self.V

    def get_q_function(self) -> Optional[np.ndarray]:
        """Get the learned Q-function."""
        return self.Q

    def solve(self, env: 'RLEnvironment', num_episodes: int = 10000,
              verbose: bool = True) -> Dict[str, Any]:
        """
        Train the agent using TD learning.

        Runs TD(0) learning for the specified number of episodes,
        learning an optimal routing policy for the queueing network.

        Args:
            env: The RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Dictionary with training results:
                - 'V': Learned value function
                - 'Q': Learned Q-function
                - 'mean_cost_rate': Final estimated average cost rate
        """
        self.reset(env)

        # Value table with a buffer of 5 beyond the nominal state space.
        table_shape = tuple([env.state_size + 5] * env.action_size)
        self.V = np.zeros(table_shape)
        self.V_shape = table_shape

        sa_shape = table_shape + (env.action_size,)
        self.Q = np.random.rand(*sa_shape)
        self.Q_shape = sa_shape

        queue_lengths = np.zeros(env.action_size, dtype=np.int32)  # current state
        last_state = np.zeros(env.action_size, dtype=np.int32)     # previous state
        elapsed = 0.0       # time since last TD update
        accrued = 0.0       # holding cost since last TD update
        disc_time = 0.0     # discounted total time
        disc_cost = 0.0     # discounted total cost
        explore = self.epsilon

        episode = 0
        while episode < num_episodes:
            if verbose and episode % 1000 == 0:
                print(f"Running episode #{episode}")

            # Anneal the exploration rate.
            explore *= self.epsilon_decay

            dt, dep_node = env.sample()
            elapsed += dt
            # Holding cost accrues at a rate equal to the total job count.
            accrued += np.sum(queue_lengths) * dt

            if dep_node in env.source_indices:
                # New arrival: pick a destination queue.
                if env.is_in_action_space(queue_lengths):
                    # Epsilon-greedy over the one-step lookahead values.
                    candidates = []
                    for a in range(env.action_size):
                        trial = queue_lengths.copy()
                        trial[a] += 1
                        candidates.append(self.V[self._get_state_index(trial + 1)])
                    policy = self._create_greedy_policy(
                        np.array(candidates), explore, env.action_size)
                    action = np.random.choice(env.action_size, p=policy)
                else:
                    # At the state-space boundary fall back to Join Shortest Queue.
                    action = np.argmin(queue_lengths)
                queue_lengths[action] += 1
                env.update(queue_lengths)
            elif dep_node in env.queue_indices:
                # Service completion at a queue.
                served = env.queue_indices.index(dep_node)
                queue_lengths[served] = max(0, queue_lengths[served] - 1)
                env.update(queue_lengths)

            # TD(0) update only while inside the valid state space.
            if env.is_in_state_space(queue_lengths):
                episode += 1
                disc_time = env.gamma * disc_time + elapsed
                disc_cost = env.gamma * disc_cost + accrued
                mean_cost_rate = disc_cost / disc_time if disc_time > 0 else 0

                src = self._get_state_index(last_state + 1)
                dst = self._get_state_index(queue_lengths + 1)
                target = accrued - elapsed * mean_cost_rate + self.V[dst]
                self.V[src] = ((1 - self.learning_rate) * self.V[src]
                               + self.learning_rate * target)
                # Pin V at the origin so values stay identifiable.
                self.V = self.V - self.V.flat[0]

                elapsed = 0.0
                accrued = 0.0
                last_state = queue_lengths.copy()

        return {
            'V': self.V,
            'Q': self.Q,
            'mean_cost_rate': disc_cost / disc_time if disc_time > 0 else 0,
        }

    def _get_state_index(self, loc: np.ndarray) -> Tuple:
        """
        Convert a (1-indexed) location array into a tuple index into V.

        Args:
            loc: Array of queue lengths (1-indexed)

        Returns:
            Tuple index into value function array, clipped to table bounds
        """
        clipped = np.clip(loc, 0, self.V_shape[0] - 1).astype(int)
        return tuple(clipped)

    @staticmethod
    def _create_greedy_policy(state_values: np.ndarray, epsilon: float,
                              num_actions: int) -> np.ndarray:
        """
        Build an epsilon-greedy distribution favouring minimum-cost actions.

        Args:
            state_values: Value estimates for each action
            epsilon: Exploration probability
            num_actions: Number of possible actions

        Returns:
            Probability distribution over actions
        """
        # Uniform exploration mass ...
        probs = np.ones(num_actions) * epsilon / num_actions
        # ... plus exploitation mass shared among all minimum-value actions.
        lowest = np.min(state_values)
        winners = np.where(np.abs(state_values - lowest) < 1e-10)[0]
        probs[winners] += (1 - epsilon) / len(winners)
        return probs
class RLEnvironmentGeneral:
    """
    General RL environment for queueing networks with flexible action spaces.

    Unlike RLEnvironment which assumes sources dispatch to queues, this class
    supports arbitrary action nodes with configurable routing destinations.
    Actions are dispatching decisions at specific nodes that route jobs to
    connected downstream nodes.

    Attributes:
        model: The queueing network model
        gamma: Discount factor for future rewards
        queue_indices: Indices of queue nodes in model.nodes
        nqueues: Number of queues
        action_node_indices: Indices of nodes where routing actions are needed
        state_size: Maximum state size to consider
        action_space: Dict mapping action_node -> list of possible destination nodes

    MATLAB:
        matlab/src/api/rl/rl_env_general.m
    """

    def __init__(self, model: Any, queue_indices: List[int],
                 action_node_indices: List[int], state_size: int,
                 gamma: float = 0.99):
        """
        Initialize the general RL environment.

        Args:
            model: Queueing network model
            queue_indices: Indices of queue nodes in model.nodes
            action_node_indices: Indices of nodes where dispatch actions are taken
            state_size: Maximum number of jobs per queue to consider
            gamma: Discount factor (0 < gamma <= 1)
        """
        self.model = model
        self.queue_indices = queue_indices
        self.nqueues = len(queue_indices)
        self.action_node_indices = action_node_indices
        self.state_size = state_size
        self.gamma = gamma

        # Build action space: for each action node, find connected destinations
        # from the model's connection matrix; fall back to all queues when the
        # matrix is absent or not indexable.
        self.action_space: Dict[int, List[int]] = {}
        if hasattr(model, 'connections'):
            connections = model.connections
            for node_idx in action_node_indices:
                if hasattr(connections, '__getitem__'):
                    try:
                        row = connections[node_idx]
                        destinations = [j for j in range(len(row)) if row[j] == 1]
                        self.action_space[node_idx] = destinations
                    except (IndexError, TypeError):
                        self.action_space[node_idx] = list(queue_indices)
                else:
                    self.action_space[node_idx] = list(queue_indices)
        else:
            for node_idx in action_node_indices:
                self.action_space[node_idx] = list(queue_indices)

    def is_in_state_space(self, state: np.ndarray) -> bool:
        """
        Check if the given state is within the defined state space.

        Args:
            state: Array of queue lengths (one per queue)

        Returns:
            True if all queue lengths are within state_size bounds
        """
        if len(state) != self.nqueues:
            return False
        # bool(...) so both branches return plain Python bools (np.all
        # would otherwise return np.bool_, contradicting the annotation).
        return bool(np.all(state <= self.state_size))

    def is_in_action_space(self, state: np.ndarray) -> bool:
        """
        Check if actions are valid from the given state.

        Args:
            state: Array of queue lengths

        Returns:
            True if at least one queue can accept a new job
        """
        if len(state) != self.nqueues:
            return False
        return bool(np.all(state < self.state_size))

    def sample(self) -> Tuple[float, int, int, Any]:
        """
        Sample the next event from the environment.

        Uses the SSA solver to sample a single system event.

        Returns:
            Tuple of (time_delta, departure_node, arrival_node, sample_data)
        """
        dep_node = -1
        arv_node = -1
        sample_data = None
        try:
            from ...solvers import SolverSSA
            solver = SolverSSA(self.model)
            sample_data = solver.sample_sys_aggr(1)
            dt = sample_data['t']
            for event in sample_data.get('events', []):
                etype = event.get('event', '')
                if etype == 'DEP':
                    dep_node = event.get('node', -1)
                elif etype == 'ARV':
                    arv_node = event.get('node', -1)
            return dt, dep_node, arv_node, sample_data
        except Exception:
            # Fallback (deliberate best-effort): random exponential time and
            # uniformly chosen queue nodes when the SSA solver is unavailable.
            dt = np.random.exponential(1.0)
            dep_node = np.random.choice(self.queue_indices)
            arv_node = np.random.choice(self.queue_indices)
            return dt, dep_node, arv_node, sample_data

    def update(self, sample_data: Any) -> None:
        """
        Update the model state using the sample event data.

        Applies the state transitions from the sampled events to the model.

        Args:
            sample_data: Sample data from the SSA solver
        """
        if sample_data is None:
            return
        if hasattr(self.model, 'nodes') and hasattr(sample_data, 'get'):
            for event in sample_data.get('events', []):
                node_idx = event.get('node', -1)
                if node_idx >= 0 and node_idx < len(self.model.nodes):
                    node = self.model.nodes[node_idx]
                    if hasattr(node, 'state'):
                        etype = event.get('event', '')
                        if etype == 'DEP':
                            # Collapse the node state to a total count, floored at 0.
                            node.state = max(0, int(np.sum(node.state)) - 1)
                        elif etype == 'ARV':
                            node.state = int(np.sum(node.state)) + 1

    def reset(self) -> np.ndarray:
        """
        Reset the environment to initial state.

        Returns:
            Initial state (zeros)
        """
        if hasattr(self.model, 'reset'):
            self.model.reset()
        if hasattr(self.model, 'init_default'):
            self.model.init_default()
        return np.zeros(self.nqueues, dtype=np.int32)
class RLTDAgentGeneral:
    """
    General TD learning agent for queueing control with flexible policies.

    Supports multiple solve methods:
    - solve(): TD control with tabular value function
    - solve_for_fixed_policy(): TD learning (evaluation) with fixed heuristic
    - solve_by_hashmap(): TD control with hash-map value function
    - solve_by_linear(): TD control with linear function approximation
    - solve_by_quad(): TD control with quadratic function approximation

    Works with RLEnvironmentGeneral for arbitrary action spaces.

    MATLAB:
        matlab/src/api/rl/rl_td_agent_general.m
    """

    def __init__(self, learning_rate: float = 0.1, epsilon: float = 1.0,
                 epsilon_decay: float = 0.9999):
        """
        Initialize the general TD agent.

        Args:
            learning_rate: Learning rate for updates
            epsilon: Initial exploration rate (0 to 1)
            epsilon_decay: Decay factor applied to epsilon each episode
        """
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        # Tabular value function, allocated lazily by the solve methods.
        self.V: Optional[np.ndarray] = None
        self.V_shape: Optional[Tuple] = None

    def reset(self, env: 'RLEnvironmentGeneral') -> None:
        """
        Reset agent and environment.

        Args:
            env: The RL environment
        """
        self.V = None
        self.V_shape = None
        env.reset()

    def get_value_function(self) -> Optional[np.ndarray]:
        """Get the learned value function."""
        return self.V

    def solve_for_fixed_policy(self, env: 'RLEnvironmentGeneral',
                               num_episodes: int = 10000,
                               verbose: bool = True) -> np.ndarray:
        """
        TD learning for value function evaluation with heuristic routing.

        Evaluates the value function under the existing (model-defined)
        routing policy without modifying routing decisions.

        Args:
            env: The general RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Learned value function array

        MATLAB:
            rl_td_agent_general.solve_for_fixed_policy
        """
        self.reset(env)

        v_size = tuple([env.state_size + 1] * env.nqueues)
        self.V = np.zeros(v_size)
        self.V_shape = v_size

        t = 0.0   # time since last TD update
        c = 0.0   # holding cost since last TD update
        T = 0.0   # discounted total time
        C = 0.0   # discounted total cost
        x = np.zeros(env.nqueues, dtype=np.int32)
        n_prev = np.zeros(env.nqueues, dtype=np.int32)

        episode = 0
        while episode < num_episodes:
            if verbose and episode % 1000 == 0:
                print(f"Running episode #{episode}")

            dt, dep_node, arv_node, sample_data = env.sample()
            t += dt
            # Holding cost accrues at a rate equal to the total job count.
            c += np.sum(x) * dt

            # Process departure
            if dep_node in env.queue_indices:
                dep_server = env.queue_indices.index(dep_node)
                x[dep_server] = max(0, x[dep_server] - 1)

            # Process arrival
            if arv_node in env.queue_indices:
                arv_server = env.queue_indices.index(arv_node)
                x[arv_server] += 1

            env.update(sample_data)

            # TD(0) update only while inside the valid state space.
            if env.is_in_state_space(x):
                episode += 1
                T = env.gamma * T + t
                C = env.gamma * C + c
                mean_cost_rate = C / T if T > 0 else 0.0

                prev_idx = tuple(np.clip(n_prev + 1, 0, self.V_shape[0] - 1).astype(int))
                curr_idx = tuple(np.clip(x + 1, 0, self.V_shape[0] - 1).astype(int))

                self.V[prev_idx] = ((1 - self.learning_rate) * self.V[prev_idx]
                                    + self.learning_rate
                                    * (c - t * mean_cost_rate + self.V[curr_idx]))
                # Pin V at the origin so values stay identifiable.
                self.V = self.V - self.V.flat[0]

                t = 0.0
                c = 0.0
                n_prev = x.copy()

        return self.V

    def solve(self, env: 'RLEnvironmentGeneral', num_episodes: int = 10000,
              verbose: bool = True) -> np.ndarray:
        """
        TD control with tabular value function.

        Learns an optimal routing policy using epsilon-greedy exploration.
        At each action node departure, the agent selects the best downstream
        destination based on the current value function.

        Args:
            env: The general RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Learned value function array

        MATLAB:
            rl_td_agent_general.solve
        """
        self.reset(env)

        v_size = tuple([env.state_size + 1] * env.nqueues)
        self.V = np.zeros(v_size)
        self.V_shape = v_size

        t = 0.0
        c = 0.0
        T = 0.0
        C = 0.0
        x = np.zeros(env.nqueues, dtype=np.int32)
        n_prev = np.zeros(env.nqueues, dtype=np.int32)
        eps = self.epsilon

        episode = 0
        while episode < num_episodes:
            if verbose and episode % 1000 == 0:
                print(f"Running episode #{episode}")

            eps *= self.epsilon_decay

            dt, dep_node, arv_node, sample_data = env.sample()
            t += dt
            c += np.sum(x) * dt

            # Process departure
            if dep_node in env.queue_indices:
                dep_server = env.queue_indices.index(dep_node)
                x[dep_server] = max(0, x[dep_server] - 1)

            # If departure is at an action node and we can act, override the
            # sampled arrival with an epsilon-greedy routing decision.
            if dep_node in env.action_node_indices and env.is_in_action_space(x):
                actions = env.action_space.get(dep_node, [])
                if len(actions) > 0:
                    next_values = self._gen_next_values(env, x, actions)
                    policy = self._create_greedy_policy(next_values, eps, len(actions))
                    chosen_idx = np.random.choice(len(actions), p=policy)
                    arv_node = actions[chosen_idx]
                    # Update sample data to reflect the chosen arrival node.
                    if sample_data is not None and hasattr(sample_data, 'get'):
                        for event in sample_data.get('events', []):
                            if event.get('event') == 'ARV':
                                event['node'] = arv_node
                                break

            # Process arrival
            if arv_node in env.queue_indices:
                arv_server = env.queue_indices.index(arv_node)
                x[arv_server] += 1

            env.update(sample_data)

            if env.is_in_state_space(x):
                episode += 1
                T = env.gamma * T + t
                C = env.gamma * C + c
                mean_cost_rate = C / T if T > 0 else 0.0

                prev_idx = tuple(np.clip(n_prev + 1, 0, self.V_shape[0] - 1).astype(int))
                curr_idx = tuple(np.clip(x + 1, 0, self.V_shape[0] - 1).astype(int))

                self.V[prev_idx] = ((1 - self.learning_rate) * self.V[prev_idx]
                                    + self.learning_rate
                                    * (c - t * mean_cost_rate + self.V[curr_idx]))
                self.V = self.V - self.V.flat[0]

                t = 0.0
                c = 0.0
                n_prev = x.copy()

        return self.V

    def solve_by_hashmap(self, env: 'RLEnvironmentGeneral',
                         num_episodes: int = 10000,
                         verbose: bool = True) -> Tuple[np.ndarray, np.ndarray]:
        """
        TD control using hash-map value function.

        Uses a dictionary to store value estimates only for visited states,
        with an 'external' default for unvisited states. States are keyed by
        tuples of queue lengths (safe to hash and to convert back to arrays;
        the original str(list)/eval round-trip has been removed).

        Args:
            env: The general RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Tuple of (X, Y) where:
                X: State features matrix (n_states x (1 + nqueues))
                Y: Value estimates (n_states x 1)

        MATLAB:
            rl_td_agent_general.solve_by_hashmap
        """
        self.reset(env)

        # Keys are tuples of queue lengths; 'external' holds the default
        # estimate used for states not yet visited.
        point_values: Dict[Any, float] = {}
        point_values[tuple([0] * env.nqueues)] = 0.0
        point_values['external'] = 0.0

        t = 0.0
        c = 0.0
        T = 0.0
        C = 0.0
        x = np.zeros(env.nqueues, dtype=np.int32)
        n_prev = np.zeros(env.nqueues, dtype=np.int32)
        eps = self.epsilon

        episode = 0
        while episode < num_episodes:
            if verbose and episode % 1000 == 0:
                print(f"Running episode #{episode}")

            eps *= self.epsilon_decay

            dt, dep_node, arv_node, sample_data = env.sample()
            t += dt
            c += np.sum(x) * dt

            # Process departure
            if dep_node in env.queue_indices:
                dep_server = env.queue_indices.index(dep_node)
                x[dep_server] = max(0, x[dep_server] - 1)

            # Action at action nodes: epsilon-greedy over hash-map estimates.
            if dep_node in env.action_node_indices and env.is_in_action_space(x):
                actions = env.action_space.get(dep_node, [])
                if len(actions) > 0:
                    next_point_values = np.zeros(len(actions))
                    for act_i, act_node in enumerate(actions):
                        if act_node in env.queue_indices:
                            q_idx = env.queue_indices.index(act_node)
                            tmp_state = x.copy()
                            tmp_state[q_idx] += 1
                            key = tuple(tmp_state.tolist())
                            next_point_values[act_i] = point_values.get(
                                key, point_values['external'])
                    policy = self._create_greedy_policy(next_point_values, eps,
                                                        len(actions))
                    chosen_idx = np.random.choice(len(actions), p=policy)
                    arv_node = actions[chosen_idx]
                    if sample_data is not None and hasattr(sample_data, 'get'):
                        for event in sample_data.get('events', []):
                            if event.get('event') == 'ARV':
                                event['node'] = arv_node
                                break

            # Process arrival
            if arv_node in env.queue_indices:
                arv_server = env.queue_indices.index(arv_node)
                x[arv_server] += 1

            env.update(sample_data)

            if env.is_in_state_space(x):
                episode += 1
                T = env.gamma * T + t
                C = env.gamma * C + c
                mean_cost_rate = C / T if T > 0 else 0.0

                n_key = tuple(n_prev.tolist())
                x_key = tuple(x.tolist())
                if n_key not in point_values:
                    point_values[n_key] = point_values['external']
                # Bootstrap from the successor state (or the default).
                bootstrap = point_values.get(x_key, point_values['external'])
                point_values[n_key] = ((1 - self.learning_rate) * point_values[n_key]
                                       + self.learning_rate
                                       * (c - t * mean_cost_rate + bootstrap))

                # Normalize relative to the zero state.
                if np.sum(n_prev) == 0:
                    subtractor = point_values[n_key]
                    for k in point_values:
                        point_values[k] -= subtractor

                t = 0.0
                c = 0.0
                n_prev = x.copy()

        # Build output arrays (drop the synthetic 'external' entry first).
        point_values.pop('external', None)
        n_entries = len(point_values)
        X = np.zeros((n_entries, 1 + env.nqueues))
        Y = np.zeros((n_entries, 1))
        for idx, (key, val) in enumerate(point_values.items()):
            X[idx, 0] = 1.0  # Intercept
            X[idx, 1:] = key  # Tuple key converts directly; no eval needed
            Y[idx, 0] = val

        return X, Y

    def solve_by_linear(self, env: 'RLEnvironmentGeneral',
                        num_episodes: int = 10000,
                        verbose: bool = True
                        ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        TD control with linear function approximation.

        Learns a linear value function:
            v(q1,...,qn) = w0 + w1*q1 + ... + wn*qn

        Args:
            env: The general RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Tuple of (X, Y, coefficients) where:
                X: State features matrix
                Y: Value estimates
                coefficients: Linear regression coefficients

        MATLAB:
            rl_td_agent_general.solve_by_linear
        """
        X, Y = self.solve_by_hashmap(env, num_episodes, verbose)

        # Linear regression: Y = X * coeff
        try:
            coeff, _, _, _ = np.linalg.lstsq(X, Y, rcond=None)
        except Exception:
            coeff = np.zeros((X.shape[1], 1))

        return X, Y, coeff

    def solve_by_quad(self, env: 'RLEnvironmentGeneral',
                      num_episodes: int = 10000,
                      verbose: bool = True
                      ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        TD control with quadratic function approximation.

        Learns a quadratic value function:
            v(q1,...,qn) = sum_{i,j} w_{ij} * q_i * q_j

        Args:
            env: The general RL environment
            num_episodes: Number of training episodes
            verbose: Whether to print progress

        Returns:
            Tuple of (X_quad, Y, coefficients) where:
                X_quad: Augmented features (linear + quadratic terms)
                Y: Value estimates
                coefficients: Quadratic regression coefficients

        MATLAB:
            rl_td_agent_general.solve_by_quad
        """
        X, Y = self.solve_by_hashmap(env, num_episodes, verbose)

        # Augment X with all pairwise products q_i * q_j (i <= j).
        n_cols = X.shape[1]
        quad_features = []
        for i in range(1, n_cols):  # Skip intercept
            for j in range(i, n_cols):
                quad_features.append(X[:, i] * X[:, j])

        if quad_features:
            X_quad = np.hstack([X] + [f.reshape(-1, 1) for f in quad_features])
        else:
            X_quad = X

        try:
            coeff, _, _, _ = np.linalg.lstsq(X_quad, Y, rcond=None)
        except Exception:
            coeff = np.zeros((X_quad.shape[1], 1))

        return X_quad, Y, coeff

    def _gen_next_values(self, env: 'RLEnvironmentGeneral',
                         cur_state: np.ndarray,
                         actions: List[int]) -> np.ndarray:
        """
        Compute value estimates for each possible action from current state.

        Args:
            env: Environment
            cur_state: Current queue lengths
            actions: List of possible action (destination node) indices

        Returns:
            Array of value estimates for each action
        """
        values = np.zeros(len(actions))
        for act_i, act_node in enumerate(actions):
            if act_node in env.queue_indices:
                q_idx = env.queue_indices.index(act_node)
                # +1 for the MATLAB-style 1-indexed value table.
                tmp_loc = cur_state.copy() + 1
                tmp_loc[q_idx] += 1
                tmp_idx = tuple(np.clip(tmp_loc, 0, self.V_shape[0] - 1).astype(int))
                values[act_i] = self.V[tmp_idx]
        return values

    @staticmethod
    def _create_greedy_policy(state_values: np.ndarray, epsilon: float,
                              num_actions: int) -> np.ndarray:
        """
        Create epsilon-greedy policy from state-action values.

        Args:
            state_values: Value estimates for each action
            epsilon: Exploration probability
            num_actions: Number of possible actions

        Returns:
            Probability distribution over actions
        """
        policy = np.ones(num_actions) * epsilon / num_actions
        # Cost minimization: exploit the minimum-value actions.
        min_val = np.min(state_values)
        best_actions = np.where(np.abs(state_values - min_val) < 1e-10)[0]
        exploit_prob = (1 - epsilon) / len(best_actions)
        policy[best_actions] += exploit_prob
        return policy
# Public names exported by ``from line_solver.api.rl import *``.
__all__ = [
    'RLEnvironment',
    'RLTDAgent',
    'RLEnvironmentGeneral',
    'RLTDAgentGeneral',
]