LINE Solver
MATLAB API documentation
Loading...
Searching...
No Matches
solver_mam_basic_mmap.m
1function [QN,UN,RN,TN,CN,XN,totiter] = solver_mam_basic_mmap(sn, options)
2% [QN,UN,RN,TN,CN,XN,TOTITER] = SOLVER_MAM_BASIC_MMAP(SN, OPTIONS)
3%
4% MAM decomposition solver with fork-join synchronization via mmap_max.
5% Follows the solver_mam.m (dec.mmap) iteration pattern with FJ-aware
6% traffic analysis using solver_mam_traffic_mmap.
7%
8% Supports open networks with general FJ topologies and heterogeneous
9% service distributions at parallel queues.
10%
11% Copyright (c) 2012-2026, Imperial College London
12% All rights reserved.
13
14config = options.config;
15if ~isfield(config, 'fj_sync_q_len')
16 config.fj_sync_q_len = 2;
17end
18if ~isfield(config, 'etaqa_trunc')
19 config.etaqa_trunc = 8;
20end
21
22PH = sn.proc;
23I = sn.nnodes;
24M = sn.nstations;
25K = sn.nclasses;
26C = sn.nchains;
27N = sn.njobs';
28V = cellsum(sn.visits);
29S = 1./sn.rates;
30
31QN = zeros(M,K);
32UN = zeros(M,K);
33RN = zeros(M,K);
34TN = zeros(M,K);
35CN = zeros(1,K);
36XN = zeros(1,K);
37
38% Build FJ synchronization map
39fjSyncMap = sn_build_fj_sync_map(sn);
40
41% Determine per-chain arrival rates
42lambda = zeros(1,K);
43for c=1:C
44 inchain = sn.inchain{c};
45 lambdas_inchain = sn.rates(sn.refstat(inchain(1)),inchain);
46 lambdas_inchain = lambdas_inchain(isfinite(lambdas_inchain));
47 lambda(inchain) = sum(lambdas_inchain);
48end
49
50% Prepare PH service distributions
51pie = {};
52D0 = {};
53for ist=1:M
54 switch sn.sched(ist)
55 case SchedStrategy.EXT
56 TN(ist,:) = sn.rates(ist,:);
57 TN(ist,isnan(TN(ist,:))) = 0;
58 case {SchedStrategy.FCFS, SchedStrategy.HOL, SchedStrategy.FCFSPRPRIO}
59 for k=1:K
60 % divide service time by number of servers and put
61 % later a surrogate delay server in tandem to compensate
62 PH{ist}{k} = map_scale(PH{ist}{k}, map_mean(PH{ist}{k})/sn.nservers(ist));
63 pie{ist}{k} = map_pie(PH{ist}{k});
64 D0{ist,k} = PH{ist}{k}{1};
65 if any(isnan(D0{ist,k}))
66 D0{ist,k} = -GlobalConstants.Immediate;
67 pie{ist}{k} = 1;
68 PH{ist}{k} = map_exponential(GlobalConstants.Immediate);
69 end
70 end
71 case SchedStrategy.INF
72 for k=1:K
73 pie{ist}{k} = map_pie(PH{ist}{k});
74 D0{ist,k} = PH{ist}{k}{1};
75 if any(isnan(D0{ist,k}))
76 D0{ist,k} = -GlobalConstants.Immediate;
77 pie{ist}{k} = 1;
78 PH{ist}{k} = map_exponential(GlobalConstants.Immediate);
79 end
80 end
81 case SchedStrategy.PS
82 for k=1:K
83 PH{ist}{k} = map_scale(PH{ist}{k}, map_mean(PH{ist}{k})/sn.nservers(ist));
84 pie{ist}{k} = map_pie(PH{ist}{k});
85 D0{ist,k} = PH{ist}{k}{1};
86 if any(isnan(D0{ist,k}))
87 D0{ist,k} = -GlobalConstants.Immediate;
88 pie{ist}{k} = 1;
89 PH{ist}{k} = map_exponential(GlobalConstants.Immediate);
90 end
91 end
92 end
93end
94
95it_max = options.iter_max;
96for it=1:it_max
97 % Initialize departure processes (node-indexed: DEP{ind,r})
98 if it == 1
99 DEP = cell(I,K);
100 for ind=1:I
101 isForkJoin = (sn.nodetype(ind) == NodeType.Fork || sn.nodetype(ind) == NodeType.Join);
102 if sn.isstation(ind) && ~isForkJoin
103 ist = sn.nodeToStation(ind);
104 for r=1:K
105 if V(ist,r) > 0 && lambda(r) > 0
106 DEP{ind,r} = map_scale(PH{ist}{r}, 1 / (lambda(r) * V(ist,r)));
107 else
108 DEP{ind,r} = PH{ist}{r};
109 end
110 end
111 else
112 % Non-station nodes or Fork/Join: pass-through
113 % map_exponential takes MEAN interarrival time = 1/rate
114 for r=1:K
115 if lambda(r) > 0
116 DEP{ind,r} = map_exponential(1/lambda(r));
117 else
118 DEP{ind,r} = map_exponential(1/GlobalConstants.Immediate);
119 end
120 end
121 end
122 end
123 end
124
125 % Compute arrival processes with FJ synchronization
126 ARV = solver_mam_traffic_mmap(sn, DEP, config, fjSyncMap);
127
128 QN_1 = QN;
129 for ist=1:M
130 ind = sn.stationToNode(ist);
131 switch sn.nodetype(ind)
132 case NodeType.Join
133 % Join: throughput = arrival rate (flow conservation)
134 for k=1:K
135 TN(ist,k) = lambda(k);
136 UN(ist,k) = 0;
137 QN(ist,k) = 0;
138 RN(ist,k) = 0;
139 end
140 case NodeType.Queue
141 if ~isempty(ARV{ind}) && iscell(ARV{ind})
142 % Compress arrival process if too large
143 if length(ARV{ind}{1}) > config.space_max
144 if options.verbose
145 line_printf('\nArrival process at node %d is now at %d states. Compressing.', ind, length(ARV{ind}{1}));
146 end
147 ARV{ind} = mmap_compress(ARV{ind});
148 end
149
150 switch sn.sched(ist)
151 case {SchedStrategy.FCFS, SchedStrategy.HOL, SchedStrategy.FCFSPRPRIO}
152 [Qret{1:K}, ~] = MMAPPH1FCFS({ARV{ind}{[1,3:end]}}, {pie{ist}{:}}, {D0{ist,:}}, 'ncMoms', 1, 'ncDistr', 2);
153 for k=1:K
154 QN(ist,k) = sum(Qret{k});
155 end
156 TN(ist,:) = mmap_lambda(ARV{ind});
157 case SchedStrategy.PS
158 TN(ist,:) = mmap_lambda(ARV{ind});
159 for k=1:K
160 UN(ist,k) = TN(ist,k) * S(ist,k);
161 end
162 Uden = min([1-GlobalConstants.FineTol, sum(UN(ist,:))]);
163 for k=1:K
164 QN(ist,k) = UN(ist,k)/(1-Uden);
165 end
166 end
167
168 for k=1:K
169 UN(ist,k) = TN(ist,k) * map_mean(PH{ist}{k});
170 % add number of jobs at the surrogate delay server
171 QN(ist,k) = QN(ist,k) + TN(ist,k)*(map_mean(PH{ist}{k})*sn.nservers(ist)) * (sn.nservers(ist)-1)/sn.nservers(ist);
172 RN(ist,k) = QN(ist,k) ./ TN(ist,k);
173 end
174 end
175 otherwise
176 switch sn.sched(ist)
177 case SchedStrategy.INF
178 if ~isempty(ARV{ind}) && iscell(ARV{ind})
179 TN(ist,:) = mmap_lambda(ARV{ind});
180 end
181 for k=1:K
182 if TN(ist,k) > 0
183 UN(ist,k) = S(ist,k)*TN(ist,k);
184 QN(ist,k) = TN(ist,k)*S(ist,k);
185 RN(ist,k) = S(ist,k);
186 end
187 end
188 case SchedStrategy.EXT
189 % Source: already set TN above
190 end
191 end
192 end
193
194 % Check convergence
195 if it >= 3 && max(abs(QN(:)-QN_1(:))./(QN_1(:)+GlobalConstants.FineTol)) < options.iter_tol
196 break;
197 end
198
199 % Update departure processes
200 for ist=1:M
201 ind = sn.stationToNode(ist);
202 switch sn.nodetype(ind)
203 case NodeType.Queue
204 if ~isempty(ARV{ind}) && iscell(ARV{ind})
205 switch sn.sched(ist)
206 case {SchedStrategy.FCFS, SchedStrategy.HOL, SchedStrategy.FCFSPRPRIO}
207 for r=1:K
208 A = mmap_hide(ARV{ind}, setdiff(1:K,r));
209 Srv = PH{ist}{r};
210 na = length(A{1});
211 ns = length(Srv{1});
212 etaqa_n = config.etaqa_trunc;
213 etaqa_sz = (etaqa_n+1)*na*ns;
214 rho = sum(UN(ist,:));
215 if etaqa_sz <= config.space_max && rho < 1-GlobalConstants.FineTol
216 try
217 DEP{ind,r} = qbd_depproc_etaqa(A, Srv, etaqa_n);
218 DEP{ind,r} = map_normalize(DEP{ind,r});
219 catch
220 DEP{ind,r} = Srv;
221 end
222 else
223 DEP{ind,r} = Srv;
224 end
225 if V(ist,r) > 0 && lambda(r) > 0
226 DEP{ind,r} = map_scale(DEP{ind,r}, 1 / (lambda(r) * V(ist,r)));
227 end
228 end
229 case SchedStrategy.PS
230 for r=1:K
231 A = mmap_hide(ARV{ind}, setdiff(1:K,r));
232 Srv = PH{ist}{r};
233 na = length(A{1});
234 ns = length(Srv{1});
235 etaqa_n = config.etaqa_trunc;
236 etaqa_sz = (etaqa_n+1)*na*ns;
237 rho = sum(UN(ist,:));
238 if V(ist,r) > 0 && lambda(r) > 0
239 if etaqa_sz <= config.space_max && rho < 1-GlobalConstants.FineTol
240 try
241 DEP{ind,r} = qbd_depproc_etaqa_ps(A, Srv, etaqa_n);
242 DEP{ind,r} = map_normalize(DEP{ind,r});
243 catch
244 DEP{ind,r} = Srv;
245 end
246 else
247 DEP{ind,r} = Srv;
248 end
249 DEP{ind,r} = map_scale(DEP{ind,r}, 1 / (lambda(r) * V(ist,r)));
250 end
251 end
252 end
253 end
254 case NodeType.Join
255 % Join: departure = synchronized arrival (pass-through)
256 % map_exponential takes MEAN interarrival time = 1/rate
257 for r=1:K
258 if TN(ist,r) > 0
259 DEP{ind,r} = map_exponential(1/TN(ist,r));
260 end
261 end
262 end
263 % Fork nodes keep their initial departure processes (pass-through)
264 end
265end
266
267totiter = it;
268if options.verbose
269 line_printf('\nMAM FJ parametric decomposition completed in %d iterations.', it);
270end
271
272% Join nodes represent synchronization delay, not service demand. Recover
273% their queue length and response time from the parallel branch means.
274for joinIdx = find(sn.nodetype == NodeType.Join)'
275 joinStat = sn.nodeToStation(joinIdx);
276 if isnan(joinStat)
277 continue;
278 end
279 syncGroups = unique(fjSyncMap.nodeSync(joinIdx, :));
280 syncGroups = syncGroups(syncGroups > 0);
281 for r = 1:K
282 if TN(joinStat, r) <= 0
283 continue;
284 end
285 syncDelay = 0;
286 joinArrivalRate = 0;
287 for gid = syncGroups
288 branchNodes = find(fjSyncMap.nodeSync(joinIdx, :) == gid);
289 branchRt = zeros(1, numel(branchNodes));
290 branchTput = zeros(1, numel(branchNodes));
291 used = 0;
292 for b = 1:numel(branchNodes)
293 branchStat = sn.nodeToStation(branchNodes(b));
294 if isnan(branchStat) || RN(branchStat, r) <= 0
295 continue;
296 end
297 used = used + 1;
298 branchRt(used) = RN(branchStat, r);
299 branchTput(used) = TN(branchStat, r);
300 end
301 branchRt = branchRt(1:used);
302 branchTput = branchTput(1:used);
303 if numel(branchRt) < 2
304 continue;
305 end
306 lambdai = 1 ./ branchRt;
307 maxBranchRt = 0;
308 for pow = 0:(numel(branchRt) - 1)
309 maxBranchRt = maxBranchRt + (-1)^pow * sum(1 ./ sum(nchoosek(lambdai, pow + 1), 2));
310 end
311 syncDelay = syncDelay + max(maxBranchRt - mean(branchRt), 0);
312 joinArrivalRate = joinArrivalRate + sum(branchTput);
313 end
314 RN(joinStat, r) = syncDelay;
315 QN(joinStat, r) = joinArrivalRate * syncDelay;
316 UN(joinStat, r) = 0;
317 end
318end
319
320% Post-processing
321CN = sum(RN,1);
322QN(isnan(QN)) = 0;
323RN(isnan(RN)) = 0;
324UN(isnan(UN)) = 0;
325TN(isnan(TN)) = 0;
326end
Definition mmt.m:124