LINE Solver
MATLAB API documentation
Loading...
Searching...
No Matches
solver_mam_basic_fj.m
1function [QN,UN,RN,TN,CN,XN,totiter] = solver_mam_basic_fj(sn, options)
2% [QN,UN,RN,TN,CN,XN,TOTITER] = SOLVER_MAM_BASIC_FJ(SN, OPTIONS)
3%
4% MAM decomposition solver with fork-join synchronization via mmap_max.
5% Follows the solver_mam.m (dec.mmap) iteration pattern with FJ-aware
6% traffic analysis using solver_mam_traffic_fj.
7%
8% Supports open networks with general FJ topologies and heterogeneous
9% service distributions at parallel queues.
10%
11% Copyright (c) 2012-2026, Imperial College London
12% All rights reserved.
13
14config = options.config;
15if ~isfield(config, 'fj_sync_q_len')
16 config.fj_sync_q_len = 2;
17end
18
19PH = sn.proc;
20I = sn.nnodes;
21M = sn.nstations;
22K = sn.nclasses;
23C = sn.nchains;
24N = sn.njobs';
25V = cellsum(sn.visits);
26S = 1./sn.rates;
27
28QN = zeros(M,K);
29UN = zeros(M,K);
30RN = zeros(M,K);
31TN = zeros(M,K);
32CN = zeros(1,K);
33XN = zeros(1,K);
34
35% Build FJ synchronization map
36fjSyncMap = sn_build_fj_sync_map(sn);
37
38% Determine per-chain arrival rates
39lambda = zeros(1,K);
40for c=1:C
41 inchain = sn.inchain{c};
42 lambdas_inchain = sn.rates(sn.refstat(inchain(1)),inchain);
43 lambdas_inchain = lambdas_inchain(isfinite(lambdas_inchain));
44 lambda(inchain) = sum(lambdas_inchain);
45end
46
47% Prepare PH service distributions
48pie = {};
49D0 = {};
50for ist=1:M
51 switch sn.sched(ist)
52 case SchedStrategy.EXT
53 TN(ist,:) = sn.rates(ist,:);
54 TN(ist,isnan(TN(ist,:))) = 0;
55 case {SchedStrategy.FCFS, SchedStrategy.HOL, SchedStrategy.FCFSPRPRIO}
56 for k=1:K
57 % divide service time by number of servers and put
58 % later a surrogate delay server in tandem to compensate
59 PH{ist}{k} = map_scale(PH{ist}{k}, map_mean(PH{ist}{k})/sn.nservers(ist));
60 pie{ist}{k} = map_pie(PH{ist}{k});
61 D0{ist,k} = PH{ist}{k}{1};
62 if any(isnan(D0{ist,k}))
63 D0{ist,k} = -GlobalConstants.Immediate;
64 pie{ist}{k} = 1;
65 PH{ist}{k} = map_exponential(GlobalConstants.Immediate);
66 end
67 end
68 case SchedStrategy.INF
69 for k=1:K
70 pie{ist}{k} = map_pie(PH{ist}{k});
71 D0{ist,k} = PH{ist}{k}{1};
72 if any(isnan(D0{ist,k}))
73 D0{ist,k} = -GlobalConstants.Immediate;
74 pie{ist}{k} = 1;
75 PH{ist}{k} = map_exponential(GlobalConstants.Immediate);
76 end
77 end
78 case SchedStrategy.PS
79 for k=1:K
80 PH{ist}{k} = map_scale(PH{ist}{k}, map_mean(PH{ist}{k})/sn.nservers(ist));
81 pie{ist}{k} = map_pie(PH{ist}{k});
82 D0{ist,k} = PH{ist}{k}{1};
83 if any(isnan(D0{ist,k}))
84 D0{ist,k} = -GlobalConstants.Immediate;
85 pie{ist}{k} = 1;
86 PH{ist}{k} = map_exponential(GlobalConstants.Immediate);
87 end
88 end
89 end
90end
91
92it_max = options.iter_max;
93for it=1:it_max
94 % Initialize departure processes (node-indexed: DEP{ind,r})
95 if it == 1
96 DEP = cell(I,K);
97 for ind=1:I
98 isForkJoin = (sn.nodetype(ind) == NodeType.Fork || sn.nodetype(ind) == NodeType.Join);
99 if sn.isstation(ind) && ~isForkJoin
100 ist = sn.nodeToStation(ind);
101 for r=1:K
102 if V(ist,r) > 0 && lambda(r) > 0
103 DEP{ind,r} = map_scale(PH{ist}{r}, 1 / (lambda(r) * V(ist,r)));
104 else
105 DEP{ind,r} = PH{ist}{r};
106 end
107 end
108 else
109 % Non-station nodes or Fork/Join: pass-through
110 % map_exponential takes MEAN interarrival time = 1/rate
111 for r=1:K
112 if lambda(r) > 0
113 DEP{ind,r} = map_exponential(1/lambda(r));
114 else
115 DEP{ind,r} = map_exponential(1/GlobalConstants.Immediate);
116 end
117 end
118 end
119 end
120 end
121
122 % Compute arrival processes with FJ synchronization
123 ARV = solver_mam_traffic_fj(sn, DEP, config, fjSyncMap);
124
125 QN_1 = QN;
126 for ist=1:M
127 ind = sn.stationToNode(ist);
128 switch sn.nodetype(ind)
129 case NodeType.Join
130 % Join: throughput from synced arrival rate, no queueing
131 if ~isempty(ARV{ind}) && iscell(ARV{ind})
132 arrRates = mmap_lambda(ARV{ind});
133 for k=1:K
134 TN(ist,k) = arrRates(k);
135 UN(ist,k) = 0;
136 QN(ist,k) = 0;
137 RN(ist,k) = 0;
138 end
139 end
140 case NodeType.Queue
141 if ~isempty(ARV{ind}) && iscell(ARV{ind})
142 % Compress arrival process if too large
143 if length(ARV{ind}{1}) > config.space_max
144 if options.verbose
145 line_printf('\nArrival process at node %d is now at %d states. Compressing.', ind, length(ARV{ind}{1}));
146 end
147 ARV{ind} = mmap_compress(ARV{ind});
148 end
149
150 switch sn.sched(ist)
151 case {SchedStrategy.FCFS, SchedStrategy.HOL, SchedStrategy.FCFSPRPRIO}
152 [Qret{1:K}, ~] = MMAPPH1FCFS({ARV{ind}{[1,3:end]}}, {pie{ist}{:}}, {D0{ist,:}}, 'ncMoms', 1, 'ncDistr', 2);
153 for k=1:K
154 QN(ist,k) = sum(Qret{k});
155 end
156 TN(ist,:) = mmap_lambda(ARV{ind});
157 case SchedStrategy.PS
158 TN(ist,:) = mmap_lambda(ARV{ind});
159 for k=1:K
160 UN(ist,k) = TN(ist,k) * S(ist,k);
161 end
162 Uden = min([1-GlobalConstants.FineTol, sum(UN(ist,:))]);
163 for k=1:K
164 QN(ist,k) = UN(ist,k)/(1-Uden);
165 end
166 end
167
168 for k=1:K
169 UN(ist,k) = TN(ist,k) * map_mean(PH{ist}{k});
170 % add number of jobs at the surrogate delay server
171 QN(ist,k) = QN(ist,k) + TN(ist,k)*(map_mean(PH{ist}{k})*sn.nservers(ist)) * (sn.nservers(ist)-1)/sn.nservers(ist);
172 RN(ist,k) = QN(ist,k) ./ TN(ist,k);
173 end
174 end
175 otherwise
176 switch sn.sched(ist)
177 case SchedStrategy.INF
178 if ~isempty(ARV{ind}) && iscell(ARV{ind})
179 TN(ist,:) = mmap_lambda(ARV{ind});
180 end
181 for k=1:K
182 if TN(ist,k) > 0
183 UN(ist,k) = S(ist,k)*TN(ist,k);
184 QN(ist,k) = TN(ist,k)*S(ist,k);
185 RN(ist,k) = S(ist,k);
186 end
187 end
188 case SchedStrategy.EXT
189 % Source: already set TN above
190 end
191 end
192 end
193
194 % Check convergence
195 if it >= 3 && max(abs(QN(:)-QN_1(:))./(QN_1(:)+GlobalConstants.FineTol)) < options.iter_tol
196 break;
197 end
198
199 % Update departure processes
200 for ist=1:M
201 ind = sn.stationToNode(ist);
202 switch sn.nodetype(ind)
203 case NodeType.Queue
204 if ~isempty(ARV{ind}) && iscell(ARV{ind})
205 switch sn.sched(ist)
206 case {SchedStrategy.FCFS, SchedStrategy.HOL, SchedStrategy.FCFSPRPRIO}
207 [Ret{1:2*K}] = MMAPPH1FCFS({ARV{ind}{[1,3:end]}}, {pie{ist}{:}}, {D0{ist,:}}, 'stDistrPH');
208 for r=1:K
209 % define a ph for the arrival process of class r
210 A = mmap_hide(ARV{ind}, setdiff(1:K,r));
211 tA = sum(A{2},2);
212 pieA = map_pie(A);
213
214 % define a ph for the service process of class r
215 Srv = PH{ist}{r};
216 pieS = map_pie(Srv);
217 tS = sum(Srv{2},2);
218
219 rho = sum(UN(ist,:));
220 AQ = sum(QN(ist,:));
221 Afull = AQ/max(rho, GlobalConstants.FineTol);
222 pfullonarrival = 1 - (Afull)/(1+Afull);
223
224 A = mmap_scale(A, map_mean(A)-map_mean(Srv));
225
226 zAS = 0*tA*pieS;
227 zSA = 0*tS*pieA;
228 zA = 0*A{2};
229
230 DEP0ir = [Srv{1}, sum(rho)*tS*pieA;
231 zAS, A{1}];
232
233 DEP1ir = [(1-sum(rho))*Srv{2}, zSA;
234 tA*pieS, zA];
235
236 DEP{ind,r} = map_normalize({DEP0ir, DEP1ir});
237 if V(ist,r) > 0 && lambda(r) > 0
238 DEP{ind,r} = map_scale(DEP{ind,r}, 1 / (lambda(r) * V(ist,r)));
239 end
240 end
241 case SchedStrategy.PS
242 % For PS, use simpler departure process update
243 for r=1:K
244 if V(ist,r) > 0 && lambda(r) > 0
245 DEP{ind,r} = map_scale(PH{ist}{r}, 1 / (lambda(r) * V(ist,r)));
246 end
247 end
248 end
249 end
250 case NodeType.Join
251 % Join: departure = synchronized arrival (pass-through)
252 % map_exponential takes MEAN interarrival time = 1/rate
253 for r=1:K
254 if TN(ist,r) > 0
255 DEP{ind,r} = map_exponential(1/TN(ist,r));
256 end
257 end
258 end
259 % Fork nodes keep their initial departure processes (pass-through)
260 end
261end
262
263totiter = it;
264if options.verbose
265 line_printf('\nMAM FJ parametric decomposition completed in %d iterations.', it);
266end
267
268% Post-processing
269CN = sum(RN,1);
270QN(isnan(QN)) = 0;
271RN(isnan(RN)) = 0;
272UN(isnan(UN)) = 0;
273TN(isnan(TN)) = 0;
274end
Definition mmt.m:93