Source code for line_solver.api.rl


"""
Reinforcement learning environments for queueing networks.

This module provides reinforcement learning (RL) environments that
integrate with LINE queueing network models, enabling RL agents
to learn control policies for queueing systems.

Key classes:
- RlEnv: Basic RL environment for queueing networks
- RlEnvGeneral: General-purpose RL environment
- RlTDAgent: Temporal-difference (TD) learning agent
- RlTDAgentGeneral: General TD agent with tabular, hashmap-based, linear, and quadratic solvers

These environments support research into adaptive control of queueing
systems using reinforcement learning techniques.
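
Example (a minimal sketch; assumes a LINE Network model has already been
built and the JVM is running, and the node indices and hyperparameters
below are placeholders):

    env = RlEnvGeneral(model, idx_of_queue_in_nodes=[1],
                       idx_of_action_nodes=[0], state_size=10, gamma=0.9)
    agent = RlTDAgentGeneral(lr=0.1, epsilon=1.0, eps_decay=0.9999)
    agent.reset(env)
    v = agent.solve(env, num_episodes=10000)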
"""

import jpype
import numpy as np
from line_solver import jlineMatrixToArray, jlineMatrixFromArray


class RlEnv:
    """Basic RL environment for queueing networks, wrapping jline.api.rl.RlEnv."""

    def __init__(self, model, idx_of_queue_in_nodes, idx_of_source_in_nodes, state_size, gamma):
        """Initialize the RL environment."""
        self._java_env = jpype.JPackage('jline').api.rl.RlEnv(
            model,
            jpype.JArray(jpype.JInt)(idx_of_queue_in_nodes),
            jpype.JArray(jpype.JInt)(idx_of_source_in_nodes),
            jpype.JInt(state_size),
            jpype.JDouble(gamma)
        )

    @property
    def model(self):
        """Get the network model."""
        return self._java_env.model

    @property
    def action_size(self):
        """Get the number of possible actions."""
        return self._java_env.actionSize

    def is_in_state_space(self, nodes):
        """Check whether the given node configuration is in the state space."""
        java_nodes = jpype.JArray(jpype.JObject)(len(nodes))
        for i, node in enumerate(nodes):
            java_nodes[i] = jpype.JInt(node)
        return self._java_env.isInStateSpace(java_nodes)

    def is_in_action_space(self, nodes):
        """Check whether the given node configuration is in the action space."""
        java_nodes = jpype.JArray(jpype.JObject)(len(nodes))
        for i, node in enumerate(nodes):
            java_nodes[i] = jpype.JInt(node)
        return self._java_env.isInActionSpace(java_nodes)

    def sample(self):
        """Sample a (state, action) pair from the environment."""
        result = self._java_env.sample()
        return result.getFirst(), result.getSecond()

    def update(self, new_state):
        """Update the environment with a new state."""
        java_state = jpype.JArray(jpype.JInt)(new_state)
        self._java_env.update(java_state)

    def reset(self):
        """Reset the environment to its initial state."""
        self._java_env.reset()
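
# Illustrative use of RlEnv (a sketch; `model` is assumed to be a LINE Network
# whose node list contains one queue and one source, and the index values and
# hyperparameters below are placeholders):
#
#     env = RlEnv(model, [1], [0], state_size=10, gamma=0.9)
#     print(env.action_size)          # number of available actions
#     state, action = env.sample()    # draw a (state, action) pair
#     env.reset()                     # return to the initial state
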
class RlEnvGeneral:
    """General-purpose RL environment wrapping jline.api.rl.RlEnvGeneral."""

    def __init__(self, model, idx_of_queue_in_nodes, idx_of_action_nodes, state_size, gamma):
        """Initialize the general RL environment."""
        self._java_env = jpype.JPackage('jline').api.rl.RlEnvGeneral(
            model,
            jpype.JArray(jpype.JInt)(idx_of_queue_in_nodes),
            jpype.JArray(jpype.JInt)(idx_of_action_nodes),
            jpype.JInt(state_size),
            jpype.JDouble(gamma)
        )

    @property
    def model(self):
        """Get the network model."""
        return self._java_env.model

    @property
    def nqueues(self):
        """Get the number of queues."""
        return self._java_env.nqueues

    @property
    def action_space(self):
        """Get the action space mapping."""
        return self._java_env.actionSpace

    def is_in_state_space(self, state):
        """Check whether the given state is in the state space."""
        java_state = jpype.JArray(jpype.JInt)(state)
        return self._java_env.isInStateSpace(java_state)

    def is_in_action_space(self, state):
        """Check whether the given state is in the action space."""
        java_state = jpype.JArray(jpype.JInt)(state)
        return self._java_env.isInActionSpace(java_state)

    def sample(self):
        """Sample from the environment."""
        return self._java_env.sample()

    def update(self, sample):
        """Update the environment with a sample."""
        self._java_env.update(sample)

    def reset(self):
        """Reset the environment to its initial state."""
        self._java_env.reset()
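
# Illustrative membership checks on RlEnvGeneral (a sketch; the state vector
# and constructor arguments below are placeholders):
#
#     env = RlEnvGeneral(model, [1, 2], [0], state_size=8, gamma=0.95)
#     env.is_in_state_space([3, 2])     # True if [3, 2] is a valid state
#     env.is_in_action_space([3, 2])    # True if an action is available there
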
class RlTDAgent:
    """Temporal-difference learning agent wrapping jline.api.rl.RlTDAgent."""

    def __init__(self, lr=0.05, epsilon=1.0, eps_decay=0.99):
        """Initialize the TD agent with learning rate, exploration rate, and decay."""
        self._java_agent = jpype.JPackage('jline').api.rl.RlTDAgent(
            jpype.JDouble(lr),
            jpype.JDouble(epsilon),
            jpype.JDouble(eps_decay)
        )

    def reset(self, env):
        """Reset the agent for the given environment."""
        self._java_agent.reset(env._java_env)

    def get_value_function(self):
        """Return the value function as a numpy array, or None if unavailable."""
        v = self._java_agent.getValueFunction()
        if v is None:
            return None
        result = np.zeros((len(v), len(v[0])))
        for i in range(len(v)):
            for j in range(len(v[i])):
                result[i][j] = v[i][j]
        return result

    def get_q_function(self):
        """Return the Q-function as a numpy array, or None if unavailable."""
        q = self._java_agent.getQFunction()
        if q is None:
            return None
        result = np.zeros((len(q), len(q[0]), len(q[0][0])))
        for i in range(len(q)):
            for j in range(len(q[i])):
                for k in range(len(q[i][j])):
                    result[i][j][k] = q[i][j][k]
        return result

    def solve(self, env):
        """Run TD learning in the given environment."""
        self._java_agent.solve(env._java_env)

    @staticmethod
    def create_greedy_policy(state_q, epsilon, n_a):
        """Create an epsilon-greedy policy from the Q-values of a state."""
        result = jpype.JPackage('jline').api.rl.RlTDAgent.createGreedyPolicy(
            jpype.JArray(jpype.JDouble)(state_q),
            jpype.JDouble(epsilon),
            jpype.JInt(n_a)
        )
        return np.array(result)

    @staticmethod
    def get_state_from_loc(obj_size, loc):
        """Get the state vector corresponding to the given location indices."""
        result = jpype.JPackage('jline').api.rl.RlTDAgent.getStateFromLoc(
            jpype.JArray(jpype.JInt)(obj_size),
            jpype.JArray(jpype.JInt)(loc)
        )
        return np.array(result)
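
# Illustrative use of the static helpers (a sketch; the numeric values are
# placeholders):
#
#     # Action-probability vector of an epsilon-greedy policy over 3 actions,
#     # given the Q-values of one state.
#     probs = RlTDAgent.create_greedy_policy([0.2, 0.5, 0.1], epsilon=0.1, n_a=3)
#
#     # State vector recovered from per-dimension sizes and location indices.
#     state = RlTDAgent.get_state_from_loc([4, 4], [2, 1])
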
class RlTDAgentGeneral:
    """General TD agent wrapping jline.api.rl.RlTDAgentGeneral, offering
    tabular, hashmap-based, linear, and quadratic solvers."""

    def __init__(self, lr=0.1, epsilon=1.0, eps_decay=0.9999):
        """Initialize the general TD agent with learning rate, exploration rate, and decay."""
        self._java_agent = jpype.JPackage('jline').api.rl.RlTDAgentGeneral(
            jpype.JDouble(lr),
            jpype.JDouble(epsilon),
            jpype.JDouble(eps_decay)
        )

    def reset(self, env):
        """Reset the agent for the given environment."""
        self._java_agent.reset(env._java_env)

    def get_value_function(self):
        """Return the value function as a numpy array, or None if unavailable."""
        v = self._java_agent.getValueFunction()
        if v is None:
            return None
        result = np.zeros((len(v), len(v[0])))
        for i in range(len(v)):
            for j in range(len(v[i])):
                result[i][j] = v[i][j]
        return result

    def solve_for_fixed_policy(self, env, num_episodes=10000):
        """Evaluate a fixed policy for the given number of episodes."""
        result = self._java_agent.solveForFixedPolicy(env._java_env, jpype.JInt(num_episodes))
        v = np.zeros((len(result), len(result[0])))
        for i in range(len(result)):
            for j in range(len(result[i])):
                v[i][j] = result[i][j]
        return v

    def solve(self, env, num_episodes=10000):
        """Run TD learning for the given number of episodes and return the value function."""
        result = self._java_agent.solve(env._java_env, jpype.JInt(num_episodes))
        v = np.zeros((len(result), len(result[0])))
        for i in range(len(result)):
            for j in range(len(result[i])):
                v[i][j] = result[i][j]
        return v

    def solve_by_hashmap(self, env, num_episodes=10000):
        """Solve using hashmap-based value iteration; returns (values, rewards)."""
        result = self._java_agent.solveByHashmap(env._java_env, jpype.JInt(num_episodes))
        v_java = result.getFirst()
        v = np.zeros((len(v_java), len(v_java[0])))
        for i in range(len(v_java)):
            for j in range(len(v_java[i])):
                v[i][j] = v_java[i][j]
        rewards = np.array(result.getSecond())
        return v, rewards

    def solve_by_linear(self, env, num_episodes=10000):
        """Solve using linear function approximation; returns (values, rewards, theta)."""
        result = self._java_agent.solveByLinear(env._java_env, jpype.JInt(num_episodes))
        v_java = result.getFirst()
        v = np.zeros((len(v_java), len(v_java[0])))
        for i in range(len(v_java)):
            for j in range(len(v_java[i])):
                v[i][j] = v_java[i][j]
        rewards = np.array(result.getSecond())
        theta = np.array(result.getThird())
        return v, rewards, theta

    def solve_by_quad(self, env, num_episodes=10000):
        """Solve using quadratic function approximation; returns (values, rewards, theta)."""
        result = self._java_agent.solveByQuad(env._java_env, jpype.JInt(num_episodes))
        v_java = result.getFirst()
        v = np.zeros((len(v_java), len(v_java[0])))
        for i in range(len(v_java)):
            for j in range(len(v_java[i])):
                v[i][j] = v_java[i][j]
        rewards = np.array(result.getSecond())
        theta = np.array(result.getThird())
        return v, rewards, theta

    @staticmethod
    def create_greedy_policy(state_q, epsilon, n_a):
        """Create an epsilon-greedy policy from the Q-values of a state."""
        result = jpype.JPackage('jline').api.rl.RlTDAgentGeneral.createGreedyPolicy(
            jpype.JArray(jpype.JDouble)(state_q),
            jpype.JDouble(epsilon),
            jpype.JInt(n_a)
        )
        return np.array(result)
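
# Illustrative training runs with the different solvers (a sketch; `env` is
# assumed to be an RlEnvGeneral instance and the episode counts are
# placeholders):
#
#     agent = RlTDAgentGeneral(lr=0.1, epsilon=1.0, eps_decay=0.9999)
#     agent.reset(env)
#     v = agent.solve(env, num_episodes=5000)                # tabular TD
#     v, rewards = agent.solve_by_hashmap(env, 5000)         # hashmap-backed values
#     v, rewards, theta = agent.solve_by_linear(env, 5000)   # linear approximation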