"""
Akyildiz-Bolch AMVA method for multi-server BCMP networks.
This module implements the Akyildiz-Bolch linearizer algorithm for solving
queueing networks with multiple servers and various scheduling strategies.
References:
Akyildiz, I.F. and Bolch, G., "Mean Value Analysis Approximation for
Multiple Server Queueing Networks", Performance Evaluation, 1988.
"""
import numpy as np
from typing import Tuple, Dict, Optional, List
from dataclasses import dataclass
from enum import IntEnum
class SchedStrategy(IntEnum):
"""Scheduling strategies for queueing stations."""
FCFS = 0
LCFS = 1
INF = 2 # Infinite server (delay)
PS = 3 # Processor sharing
[docs]
@dataclass
class AbAmvaResult:
"""Result from Akyildiz-Bolch AMVA algorithm."""
QN: np.ndarray # Queue lengths (M x K)
UN: np.ndarray # Utilization (M x K)
RN: np.ndarray # Residence times (M x K)
CN: np.ndarray # Cycle times (1 x K)
XN: np.ndarray # Throughput (1 x K)
totiter: int # Total iterations
[docs]
def pfqn_ab_amva(
D: np.ndarray,
N: np.ndarray,
V: np.ndarray,
nservers: np.ndarray,
sched: np.ndarray,
fcfs_schmidt: bool = False,
marginal_prob_method: str = 'ab'
) -> AbAmvaResult:
"""
Akyildiz-Bolch AMVA method for multi-server BCMP networks.
Args:
D: Service time matrix (M x K)
N: Population vector (1 x K)
V: Visit ratio matrix (M x K)
nservers: Number of servers at each station (M x 1)
sched: Scheduling strategies for each station (M x 1)
fcfs_schmidt: Whether to use Schmidt formula for FCFS stations
marginal_prob_method: Method for marginal probability ('ab' or 'scat')
Returns:
AbAmvaResult containing queue lengths, utilization, residence times,
cycle times, throughput, and iteration count.
"""
D = np.atleast_2d(D)
N = np.atleast_1d(N).astype(float)
V = np.atleast_2d(V)
nservers = np.atleast_1d(nservers).astype(float)
sched = np.atleast_1d(sched)
M, K = D.shape
QN, UN, RN, CN, XN, totiter = _ab_linearizer(
K, M, N, nservers, sched, V, D, fcfs_schmidt, marginal_prob_method
)
return AbAmvaResult(QN=QN, UN=UN, RN=RN, CN=CN, XN=XN, totiter=totiter)
def _ab_linearizer(
K: int,
M: int,
population: np.ndarray,
nservers: np.ndarray,
sched_type: np.ndarray,
v: np.ndarray,
s: np.ndarray,
fcfs_schmidt: bool,
marginal_prob_method: str
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]:
"""Akyildiz-Bolch linearizer method for multi-server BCMP networks."""
# Initialize queue length matrix
L = np.zeros((M, K))
for i in range(M):
for r in range(K):
L[i, r] = population[r] / M
# Initialize L_without_r matrices
l_without_r: List[np.ndarray] = []
for r in range(K):
l_wr = np.zeros((M, K))
for i in range(M):
for t in range(K):
if r == t:
l_wr[i, t] = (population[r] - 1) / M
else:
l_wr[i, t] = L[i, r]
l_without_r.append(l_wr)
# Fractional changes matrix (initialized to 0)
D_frac = np.zeros((M, K, K))
# STEP 1: Apply core at full population
L_updated, _, _, _, _, _ = _pfqn_ab_core(
K, M, population, nservers, sched_type, v, s, 100, D_frac, L,
fcfs_schmidt, marginal_prob_method
)
# STEP 2: Apply core at N-e_k populations
for r in range(K):
population_without_c = population.copy()
population_without_c[r] = population[r] - 1
l_without_c = np.zeros((M, K))
for j in range(M):
for c in range(K):
l_without_c[j, c] = l_without_r[c][j, c]
ret_q, _, _, _, _, _ = _pfqn_ab_core(
K, M, population_without_c, nservers, sched_type, v, s, 100,
D_frac, l_without_c, fcfs_schmidt, marginal_prob_method
)
for j in range(M):
for c in range(K):
l_without_r[c][j, r] = ret_q[j, c]
# STEP 3: Compute estimates of F_mk(N) and F_mk(N-e_j)
for i in range(M):
for r in range(K):
F_ir = L_updated[i, r] / population[r] if population[r] > 0 else 0
for t in range(K):
if r == t:
divisor = population[r] - 1
else:
divisor = population[r]
if divisor != 0:
F_irt = l_without_r[r][i, t] / divisor
else:
F_irt = 0
if np.isnan(F_irt):
F_irt = 0
D_frac[i, r, t] = F_irt - F_ir
# STEP 4: Apply core at full population using L values from step 1 and D values from step 3
QN, UN, RN, CN, XN, totiter = _pfqn_ab_core(
K, M, population, nservers, sched_type, v, s, 100, D_frac, L_updated,
fcfs_schmidt, marginal_prob_method
)
return QN, UN, RN, CN, XN, totiter
def _pfqn_ab_core(
K: int,
M: int,
population: np.ndarray,
nservers: np.ndarray,
sched_type: np.ndarray,
v: np.ndarray,
s: np.ndarray,
maxiter: int,
D_frac: np.ndarray,
l_in: np.ndarray,
fcfs_schmidt: bool,
marginal_prob_method: str
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]:
"""Akyildiz-Bolch core method for multi-server BCMP networks."""
L = l_in.copy()
tol = 1 / (4000 + 16 * np.sum(population))
totiter = 0
W = np.zeros((M, K))
while totiter < maxiter:
F = np.zeros((M, K))
l_without_j = np.zeros((M, K, K))
for i in range(M):
for r in range(K):
if population[r] > 0:
F[i, r] = L[i, r] / population[r]
else:
F[i, r] = 0
for i in range(M):
for r in range(K):
for t in range(K):
if r == t:
scalar = population[r] - 1
else:
scalar = population[r]
l_without_j[i, r, t] = scalar * (F[i, r] + D_frac[i, r, t])
for i in range(M):
for r in range(K):
if sched_type[i] == SchedStrategy.INF:
W[i, r] = s[i, r]
elif nservers[i] == 1:
total_queue_length = 0
for c in range(K):
total_queue_length += l_without_j[i, c, r]
W[i, r] = s[i, r] * (1 + total_queue_length)
elif fcfs_schmidt and sched_type[i] == SchedStrategy.FCFS:
# Schmidt formula for FCFS (simplified)
wait_time = 0
nvec = _pprod_init(population)
while nvec is not None:
if nvec[r] > 0:
bcn = _get_bcn_for_ab(s, i, r, nvec, K, int(nservers[i]))
prob = _get_marginal_prob(
_oner(nvec, r), _oner(population, r),
population, l_in[i, r], K
)
wait_time += bcn * prob
nvec = _pprod_next(nvec, population)
if wait_time <= 1e-3:
wait_time = 0
W[i, r] = wait_time
else:
queue_length = 0
for j in range(K):
queue_length += l_without_j[i, j, r]
num_servers = int(nservers[i])
multi_server_weighted_ql = 0
if num_servers > 1:
population_without_r = population.copy()
population_without_r[r] = population_without_r[r] - 1
marginal_probs = _find_marginal_probs(
queue_length, num_servers, population_without_r,
r, marginal_prob_method
)
for j in range(num_servers - 1):
prob_j = marginal_probs.get(j, 0)
multi_server_weighted_ql += prob_j * (num_servers - j)
wait_time = (s[i, r] / num_servers) * (1 + queue_length + multi_server_weighted_ql)
W[i, r] = wait_time
# Calculate cycle time for each class
CN = np.zeros(K)
for r in range(K):
cycle_time = 0
for i in range(M):
cycle_time += v[i, r] * W[i, r]
CN[r] = cycle_time
# Calculate queue length L_ir = N_r * W_ir / C_r
iteration_queue_length = np.zeros((M, K))
for i in range(M):
for r in range(K):
if CN[r] > 0:
queue_length = population[r] * (v[i, r] * W[i, r] / CN[r])
else:
queue_length = 0
iteration_queue_length[i, r] = queue_length
# Check convergence
max_difference = 0
for i in range(M):
for r in range(K):
if population[r] > 0:
difference = abs(L[i, r] - iteration_queue_length[i, r]) / population[r]
max_difference = max(max_difference, difference)
totiter += 1
L = iteration_queue_length
if max_difference < tol:
break
# Calculate throughput
XN = np.zeros(K)
for r in range(K):
if W[0, r] > 0:
XN[r] = L[0, r] / W[0, r]
else:
XN[r] = 0
# Calculate utilization
UN = np.zeros((M, K))
for i in range(M):
for r in range(K):
if s[i, r] > 0:
if sched_type[i] == SchedStrategy.INF:
UN[i, r] = XN[r] * s[i, r]
else:
UN[i, r] = (XN[r] * s[i, r]) / nservers[i]
else:
UN[i, r] = 0
QN = L
return QN, UN, W, CN, XN, totiter
def _get_bcn_for_ab(
D: np.ndarray, i: int, c: int, nvec: np.ndarray, K: int, ns: int
) -> float:
"""Calculate Bcn for AB algorithm."""
bcn = D[i, c]
if np.sum(nvec) > 1:
eps_val = 1e-12
sum_val = 0
for t in range(K):
sum_val += nvec[t] * D[i, t]
bcn = bcn + (max(0, np.sum(nvec) - ns) / max(ns * (np.sum(nvec) - 1), eps_val) * (sum_val - D[i, c]))
return bcn
def _get_marginal_prob(
n: np.ndarray, k: np.ndarray, k_pop: np.ndarray, l_jr: float, R: int
) -> float:
"""Compute marginal probability using binomial distribution."""
from scipy.special import comb
prob = 1.0
for r in range(R):
frac = l_jr / k_pop[r] if k_pop[r] > 0 else 0
if frac != 0 and k_pop[r] > 0 and 0 <= n[r] <= k_pop[r]:
term1 = comb(int(k_pop[r]), int(n[r]), exact=True)
term2 = frac ** n[r]
term3 = (1 - frac) ** (k_pop[r] - n[r])
prob = prob * (term1 * term2 * term3)
return prob
def _find_marginal_probs(
avg_jobs: float,
num_servers: int,
population: np.ndarray,
class_idx: int,
marginal_prob_method: str
) -> Dict[int, float]:
"""Finds marginal probabilities using specified method."""
marginal_probs: Dict[int, float] = {}
if marginal_prob_method == 'scat':
floor_val = int(np.floor(avg_jobs))
ceil_val = floor_val + 1
marginal_probs[floor_val] = ceil_val - avg_jobs
marginal_probs[ceil_val] = avg_jobs - floor_val
return marginal_probs
# AB method
ALPHA = 45.0
BETA = 0.7
w = _weight_fun(population, ALPHA, BETA)
floor_val = int(np.floor(avg_jobs))
ceiling = floor_val + 1
max_val = min((2 * floor_val) + 1, num_servers - 2)
for j in range(max_val + 1):
if j <= floor_val:
l_dist = floor_val - j
lower_val = floor_val - l_dist
upper_val = ceiling + l_dist
if l_dist > 25:
prob = 0
else:
if floor_val < population[class_idx]:
if upper_val != lower_val:
prob = w[floor_val, l_dist] * ((upper_val - avg_jobs) / (upper_val - lower_val))
else:
prob = 0
else:
prob = 0
marginal_probs[j] = prob
else:
u_dist = j - ceiling
if u_dist > 25:
marginal_probs[j] = 0
elif j > population[class_idx] - 1 and u_dist < 25:
existing_prob = marginal_probs.get(int(population[class_idx] - 1), 0)
mp_floor_udist = marginal_probs.get(floor_val - u_dist, 0)
new_prob = existing_prob + (w[floor_val, u_dist] - mp_floor_udist)
marginal_probs[int(population[class_idx] - 1)] = new_prob
else:
mp_floor_udist = marginal_probs.get(floor_val - u_dist, 0)
new_prob = w[floor_val, u_dist] - mp_floor_udist
marginal_probs[j] = new_prob
return marginal_probs
def _weight_fun(population: np.ndarray, alpha: float, beta: float) -> np.ndarray:
"""Computes weight function for marginal probability calculation."""
max_class_population = int(np.max(population))
# Calculate scaling function PR
scaling_fun = np.zeros(max_class_population + 1)
if max_class_population >= 1:
scaling_fun[1] = alpha
for n in range(2, max_class_population + 1):
scaling_fun[n] = beta * scaling_fun[n - 1]
# Calculate weight function W
w = np.zeros((max_class_population + 1, max_class_population + 1))
w[0, 0] = 1.0
for l in range(1, max_class_population + 1):
for j in range(l):
w[l, j] = w[l - 1, j] - (w[l - 1, j] * scaling_fun[l]) / 100.0
sum_val = 0
for j in range(l):
sum_val += w[l, j]
w[l, l] = 1 - sum_val
return w
def _pprod_init(population: np.ndarray) -> np.ndarray:
"""Initialize population product iterator."""
return np.zeros_like(population)
def _pprod_next(nvec: np.ndarray, population: np.ndarray) -> Optional[np.ndarray]:
"""Get next population vector in product iteration."""
K = len(nvec)
result = nvec.copy()
for i in range(K):
if result[i] < population[i]:
result[i] += 1
return result
result[i] = 0
return None
def _oner(vec: np.ndarray, idx: int) -> np.ndarray:
"""Decrement vector at given index by 1."""
result = vec.copy()
result[idx] = max(0, result[idx] - 1)
return result
[docs]
def pfqn_ab_core(
K: int,
M: int,
population: np.ndarray,
nservers: np.ndarray,
sched_type: np.ndarray,
v: np.ndarray,
s: np.ndarray,
maxiter: int,
D_frac: np.ndarray,
l_in: np.ndarray,
fcfs_schmidt: bool = False,
marginal_prob_method: str = 'ab'
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, int]:
"""
Akyildiz-Bolch core method for multi-server BCMP networks.
Public wrapper for the internal core algorithm of the Akyildiz-Bolch
linearizer method.
Args:
K: Number of classes
M: Number of stations
population: Population vector (K,)
nservers: Number of servers at each station (M,)
sched_type: Scheduling strategies for each station (M,)
v: Visit ratio matrix (M x K)
s: Service time matrix (M x K)
maxiter: Maximum iterations
D_frac: Fractional changes matrix (M x K x K)
l_in: Initial queue length matrix (M x K)
fcfs_schmidt: Whether to use Schmidt formula for FCFS stations
marginal_prob_method: Method for marginal probability ('ab' or 'scat')
Returns:
Tuple of (QN, UN, RN, CN, XN, totiter):
QN: Queue lengths (M x K)
UN: Utilization (M x K)
RN: Residence times (M x K)
CN: Cycle times (K,)
XN: Throughput (K,)
totiter: Total iterations
References:
Akyildiz, I.F. and Bolch, G., "Mean Value Analysis Approximation for
Multiple Server Queueing Networks", Performance Evaluation, 1988.
"""
return _pfqn_ab_core(
K, M, population, nservers, sched_type, v, s, maxiter, D_frac, l_in,
fcfs_schmidt, marginal_prob_method
)
__all__ = [
'pfqn_ab_amva',
'pfqn_ab_core',
'AbAmvaResult',
'SchedStrategy',
]