LINE Solver
MATLAB API documentation
Loading...
Searching...
No Matches
buildLayersRecursive.m
function buildLayersRecursive(self, idx, callers, ishostlayer)
% BUILDLAYERSRECURSIVE Build the per-layer queueing network submodel.
%   idx         - LQN index of the server of this layer (task or host processor)
%   callers     - LQN indices of the tasks calling into this layer
%   ishostlayer - true if idx is a host processor layer, false if a task layer
% The constructed Network is stored in self.ensemble{idx}; the *_updmap
% tables filled below record which classes must be refreshed at each
% solver iteration.
lqn = self.lqn;
jobPosKey = zeros(lqn.nidx,1);   % per-activity job position, shared with the nested recurActGraph
curClassKey = cell(lqn.nidx,1);  % per-activity current class, shared with the nested recurActGraph
% Fan-out check: when all callers have fan-out >= nreplicas for this task,
% each replica sees the full caller traffic (fork-join semantics).
% Model a single representative replica; updateThinkTimes multiplies by K.
% For host layers: if all caller tasks have the same replication as the host,
% the host is co-replicated with the task, so also use single-replica modeling.
rawReplicas = lqn.repl(idx);
reduceFanout = false;
if rawReplicas > 1 && ~isempty(callers)
    if ~ishostlayer && isfield(lqn, 'fanout') && ~isempty(lqn.fanout)
        % Task layer: single-replica modeling applies only if EVERY caller
        % fans out to at least all replicas of this task.
        reduceFanout = true;
        for c = callers(:)'
            if lqn.fanout(c, idx) < rawReplicas
                reduceFanout = false;
                break;
            end
        end
    elseif ishostlayer
        % Host layer: single-replica modeling applies only if EVERY caller
        % task is replicated exactly as much as the host.
        reduceFanout = true;
        for c = callers(:)'
            if lqn.repl(c) ~= rawReplicas
                reduceFanout = false;
                break;
            end
        end
    end
end
if reduceFanout
    nreplicas = 1;
    if ~ishostlayer
        % Record tasks modeled with a single replica so that later layers
        % pick the matching job-population formula for their callers.
        self.singleReplicaTasks(end+1) = idx;
    end
else
    nreplicas = rawReplicas;
end
%mult = lqn.mult;
mult = lqn.maxmult; % this removes spare capacity that cannot be used
lqn.mult = mult;
callservtproc = self.callservtproc;
% Create the layer submodel; node layout is Clients (Delay, optional),
% then the replicated server stations, then optional Source/Sink.
model = Network(lqn.hashnames{idx});
model.setChecks(false); % fast mode
model.attribute = struct('hosts',[],'tasks',[],'entries',[],'activities',[],'calls',[],'serverIdx',0);
% Element-wise | is deliberate here: the any(any(...)) terms are scalars
% but may come from sparse inputs; the OR2 lint is suppressed.
if ishostlayer | any(any(lqn.issynccaller(callers, lqn.entriesof{idx}))) | any(any(lqn.isasynccaller(callers, lqn.entriesof{idx}))) %#ok<OR2>
    clientDelay = Delay(model, 'Clients');
    model.attribute.clientIdx = 1;
    model.attribute.serverIdx = 2;
    model.attribute.sourceIdx = NaN;
else
    % No client station needed: the layer is only reached asynchronously.
    model.attribute.serverIdx = 1;
    model.attribute.clientIdx = NaN;
    model.attribute.sourceIdx = NaN;
end
serverStation = cell(1,nreplicas);
isfunctionlayer = all(lqn.isfunction(callers)) && ishostlayer;
% One queueing station per modeled replica; replicas 2..K get a '.m' suffix.
for m=1:nreplicas
    if m == 1
        serverStation{m} = Queue(model,lqn.hashnames{idx}, lqn.sched(idx));
    else
        serverStation{m} = Queue(model,[lqn.hashnames{idx},'.',num2str(m)], lqn.sched(idx));
    end
    serverStation{m}.setNumberOfServers(mult(idx));
    serverStation{m}.attribute.ishost = ishostlayer;
    serverStation{m}.attribute.idx = idx;
end

iscachelayer = all(lqn.iscache(callers)) && ishostlayer;
if iscachelayer
    cacheNode = Cache(model, lqn.hashnames{callers}, lqn.nitems(callers), lqn.itemcap{callers}, lqn.replacestrat(callers));
end

% Fork/join detection over the callers' activity graphs.
actsInCaller = [lqn.actsof{callers}];
isPostAndAct = full(lqn.actposttype)==ActivityPrecedenceType.POST_AND;
isPreAndAct = full(lqn.actpretype)==ActivityPrecedenceType.PRE_AND;
hasfork = any(intersect(find(isPostAndAct),actsInCaller));

maxfanout = 1; % maximum output parallelism level of fork nodes
for aidx = actsInCaller(:)'
    successors = find(lqn.graph(aidx,:));
    if any(isPostAndAct(successors))
        maxfanout = max(maxfanout, sum(isPostAndAct(successors)));
    end
end

if hasfork
    forkNode = Fork(model, 'Fork_PostAnd');
    for f=1:maxfanout
        forkOutputRouter{f} = Router(model, ['Fork_PostAnd_',num2str(f)]);
    end
    forkClassStack = []; % stack with the entry class at the visited forks, the last visited is end of the list.
end

% NOTE(review): isPreAndAct is recomputed here identically to the line
% before hasfork above — redundant but harmless.
isPreAndAct = full(lqn.actpretype)==ActivityPrecedenceType.PRE_AND;
hasjoin = any(isPreAndAct(actsInCaller));
if hasjoin
    % NOTE(review): Join references forkNode, which is only defined when
    % hasfork is true — confirm hasjoin implies hasfork in valid models.
    joinNode = Join(model, 'Join_PreAnd', forkNode);
end

% Class containers, indexed by LQN element index (tasks/entries/activities)
% or by call index.
aidxClass = cell(1, lqn.nidx);
aidxThinkClass = cell(1, lqn.nidx); % auxiliary classes for activity think-time
cidxClass = cell(1,0);
cidxAuxClass = cell(1,0);

self.servt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % server classes to update
self.thinkt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % client classes to update
self.actthinkt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % activity think-time classes to update
self.arvproc_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % classes to update in the next iteration for asynch calls
self.call_classes_updmap{idx} = zeros(0,4); % [modelidx, callidx, node, class] % calls classes to update in the next iteration (includes calls in client classes)
self.route_prob_updmap{idx} = zeros(0,7); % [modelidx, actidxfrom, actidxto, nodefrom, nodeto, classfrom, classto] % routing probabilities to update in the next iteration

if ishostlayer
    model.attribute.hosts(end+1,:) = [NaN, model.attribute.serverIdx ];
else
    model.attribute.tasks(end+1,:) = [NaN, model.attribute.serverIdx ];
end
hasSource = false; % flag whether a source is needed
openClasses = [];      % rows: [class index, call mean, call index] for async-call open classes
entryOpenClasses = []; % track entry-level open arrivals
% first pass: create the classes
for tidx_caller = callers
    % For host layers, check if the task has any entries with sync/async callers
    % or has open arrivals, OR if any entry is a forwarding target.
    hasDirectCallers = false;
    isForwardingTarget = false;
    if ishostlayer
        % Check if the task is a reference task (always create closed class)
        if lqn.isref(tidx_caller)
            hasDirectCallers = true;
        else
            % Check if any entry of this task has sync or async callers
            for eidx = lqn.entriesof{tidx_caller}
                if any(full(lqn.issynccaller(:, eidx))) || any(full(lqn.isasynccaller(:, eidx)))
                    hasDirectCallers = true;
                    break;
                end
                % Also check for open arrivals on this entry
                if isfield(lqn, 'arrival') && ~isempty(lqn.arrival) && ...
                        iscell(lqn.arrival) && eidx <= length(lqn.arrival) && ...
                        ~isempty(lqn.arrival{eidx})
                    hasDirectCallers = true;
                    break;
                end
                % Check if this entry is a forwarding target
                for cidx = 1:lqn.ncalls
                    if full(lqn.calltype(cidx)) == CallType.FWD && full(lqn.callpair(cidx, 2)) == eidx
                        isForwardingTarget = true;
                        break;
                    end
                end
            end
        end
    end
    if (ishostlayer && (hasDirectCallers || isForwardingTarget)) | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2> % if it is only an asynch caller the closed classes are not needed
        if self.njobs(tidx_caller,idx) == 0
            % for each entry of the calling task
            % determine job population
            % this block matches the corresponding calculations in
            % updateThinkTimes
            % Use single-replica njobs if this layer or the caller is in single-replica mode
            callerIsSingleReplica = reduceFanout || any(self.singleReplicaTasks == tidx_caller);
            if callerIsSingleReplica
                njobs = mult(tidx_caller);
            else
                njobs = mult(tidx_caller)*lqn.repl(tidx_caller);
            end
            if isinf(njobs)
                % Infinite-server caller: estimate population from its own callers.
                callers_of_tidx_caller = find(lqn.taskgraph(:,tidx_caller));
                njobs = sum(mult(callers_of_tidx_caller)); %#ok<FNDSB>
                if isinf(njobs)
                    % if also the callers of tidx_caller are inf servers, then use
                    % an heuristic
                    njobs = min(sum(mult(isfinite(mult)) .* lqn.repl(isfinite(mult))),1000); % Python parity: cap at 1000
                end
            end
            self.njobs(tidx_caller,idx) = njobs;
        else
            njobs = self.njobs(tidx_caller,idx);
        end
        % Closed class representing the caller task's think phase.
        caller_name = lqn.hashnames{tidx_caller};
        aidxClass{tidx_caller} = ClosedClass(model, caller_name, njobs, clientDelay);
        clientDelay.setService(aidxClass{tidx_caller}, Disabled.getInstance());
        for m=1:nreplicas
            serverStation{m}.setService(aidxClass{tidx_caller}, Disabled.getInstance());
        end
        aidxClass{tidx_caller}.completes = false;
        aidxClass{tidx_caller}.setReferenceClass(true); % renormalize residence times using the visits to the task
        aidxClass{tidx_caller}.attribute = [LayeredNetworkElement.TASK, tidx_caller];
        model.attribute.tasks(end+1,:) = [aidxClass{tidx_caller}.index, tidx_caller];
        % Overrides the Disabled service set above with the caller's think process.
        clientDelay.setService(aidxClass{tidx_caller}, self.thinkproc{tidx_caller});
        if ~lqn.isref(tidx_caller)
            self.thinkt_classes_updmap{idx}(end+1,:) = [idx, tidx_caller, 1, aidxClass{tidx_caller}.index];
        end
        for eidx = lqn.entriesof{tidx_caller}
            % create a class for the entry; entries have Immediate service at the client
            aidxClass{eidx} = ClosedClass(model, lqn.hashnames{eidx}, 0, clientDelay);
            clientDelay.setService(aidxClass{eidx}, Disabled.getInstance());
            for m=1:nreplicas
                serverStation{m}.setService(aidxClass{eidx}, Disabled.getInstance());
            end
            aidxClass{eidx}.completes = false;
            aidxClass{eidx}.attribute = [LayeredNetworkElement.ENTRY, eidx];
            model.attribute.entries(end+1,:) = [aidxClass{eidx}.index, eidx];
            % Pick the Java-side singleton when a JLINE backing object exists.
            [singleton, javasingleton] = Immediate.getInstance();
            if isempty(model.obj)
                clientDelay.setService(aidxClass{eidx}, singleton);
            else
                clientDelay.setService(aidxClass{eidx}, javasingleton);
            end

            % Check for open arrival distribution on this entry
            if isfield(lqn, 'arrival') && ~isempty(lqn.arrival) && ...
                    iscell(lqn.arrival) && eidx <= length(lqn.arrival) && ...
                    ~isempty(lqn.arrival{eidx})

                if ~hasSource
                    hasSource = true;
                    model.attribute.sourceIdx = length(model.nodes)+1;
                    sourceStation = Source(model,'Source');
                    sinkStation = Sink(model,'Sink');
                end

                % Create open class for this entry
                openClassForEntry = OpenClass(model, [lqn.hashnames{eidx}, '_Open'], 0);
                sourceStation.setArrival(openClassForEntry, lqn.arrival{eidx});
                clientDelay.setService(openClassForEntry, Disabled.getInstance());

                % Use bound activity's service time (entries themselves have Immediate service)
                % Find activities bound to this entry via graph
                bound_act_indices = find(lqn.graph(eidx,:) > 0);
                if ~isempty(bound_act_indices)
                    % Use first bound activity's service time
                    bound_aidx = bound_act_indices(1);
                    for m=1:nreplicas
                        serverStation{m}.setService(openClassForEntry, self.servtproc{bound_aidx});
                    end
                else
                    % Fallback to entry service (should not happen in well-formed models)
                    for m=1:nreplicas
                        serverStation{m}.setService(openClassForEntry, self.servtproc{eidx});
                    end
                end

                % Track for routing setup later: [class_index, entry_index]
                entryOpenClasses(end+1,:) = [openClassForEntry.index, eidx];

                % Track: Use negative entry index to distinguish from call arrivals
                self.arvproc_classes_updmap{idx}(end+1,:) = [idx, -eidx, ...
                    model.getNodeIndex(sourceStation), openClassForEntry.index];

                openClassForEntry.completes = false;
                openClassForEntry.attribute = [LayeredNetworkElement.ENTRY, eidx];
            end
        end
    end

    % for each activity of the calling task
    for aidx = lqn.actsof{tidx_caller}
        if ishostlayer | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2>
            % create a class
            aidxClass{aidx} = ClosedClass(model, lqn.hashnames{aidx}, 0, clientDelay);
            clientDelay.setService(aidxClass{aidx}, Disabled.getInstance());
            for m=1:nreplicas
                serverStation{m}.setService(aidxClass{aidx}, Disabled.getInstance());
            end
            aidxClass{aidx}.completes = false;
            aidxClass{aidx}.attribute = [LayeredNetworkElement.ACTIVITY, aidx];
            model.attribute.activities(end+1,:) = [aidxClass{aidx}.index, aidx];
            hidx = lqn.parent(lqn.parent(aidx)); % index of host processor
            if ~(ishostlayer && (hidx == idx))
                % set the host demand for the activity
                clientDelay.setService(aidxClass{aidx}, self.servtproc{aidx});
            end
            if lqn.sched(tidx_caller)~=SchedStrategy.REF % in 'ref' case the service activity is constant
                % updmap(end+1,:) = [idx, aidx, 1, idxClass{aidx}.index];
            end
            % NOTE(review): eidx here is the loop variable left over from the
            % entries loop above, i.e. the LAST entry of tidx_caller — confirm
            % this restriction to the last entry is intentional for cache layers.
            if iscachelayer && full(lqn.graph(eidx,aidx))
                clientDelay.setService(aidxClass{aidx}, self.servtproc{aidx});
            end

            % Create auxiliary think-time class if activity has think-time
            if ~isempty(lqn.actthink{aidx}) && lqn.actthink{aidx}.getMean() > GlobalConstants.FineTol
                aidxThinkClass{aidx} = ClosedClass(model, [lqn.hashnames{aidx},'.Think'], 0, clientDelay);
                aidxThinkClass{aidx}.completes = false;
                aidxThinkClass{aidx}.attribute = [LayeredNetworkElement.ACTIVITY, aidx];
                clientDelay.setService(aidxThinkClass{aidx}, lqn.actthink{aidx});
                for m=1:nreplicas
                    serverStation{m}.setService(aidxThinkClass{aidx}, Disabled.getInstance());
                end
                self.actthinkt_classes_updmap{idx}(end+1,:) = [idx, aidx, 1, aidxThinkClass{aidx}.index];
            end
        end
        % add a class for each outgoing call from this activity
        for cidx = lqn.callsof{aidx}
            % callmean grows as calls are visited; it is also read by the
            % nested recurActGraph/routeSynchCall via the shared workspace.
            callmean(cidx) = lqn.callproc{cidx}.getMean;
            switch lqn.calltype(cidx)
                case CallType.ASYNC
                    if lqn.parent(lqn.callpair(cidx,2)) == idx % add only if the target is serverStation
                        if ~hasSource % we need to add source and sink to the model
                            hasSource = true;
                            model.attribute.sourceIdx = length(model.nodes)+1;
                            sourceStation = Source(model,'Source');
                            sinkStation = Sink(model,'Sink');
                        end
                        cidxClass{cidx} = OpenClass(model, lqn.callhashnames{cidx}, 0);
                        sourceStation.setArrival(cidxClass{cidx}, Immediate.getInstance());
                        clientDelay.setService(cidxClass{cidx}, Disabled.getInstance());
                        for m=1:nreplicas
                            serverStation{m}.setService(cidxClass{cidx}, Immediate.getInstance());
                        end
                        openClasses(end+1,:) = [cidxClass{cidx}.index, callmean(cidx), cidx];
                        model.attribute.calls(end+1,:) = [cidxClass{cidx}.index, cidx, lqn.callpair(cidx,1), lqn.callpair(cidx,2)];
                        cidxClass{cidx}.completes = false;
                        cidxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
                        minRespT = 0;
                        for tidx_act = lqn.actsof{idx}
                            minRespT = minRespT + lqn.hostdem{tidx_act}.getMean; % upper bound, uses all activities not just the ones reachable by this entry
                        end
                        for m=1:nreplicas
                            serverStation{m}.setService(cidxClass{cidx}, Exp.fitMean(minRespT));
                        end
                    end
                case CallType.SYNC
                    cidxClass{cidx} = ClosedClass(model, lqn.callhashnames{cidx}, 0, clientDelay);
                    clientDelay.setService(cidxClass{cidx}, Disabled.getInstance());
                    for m=1:nreplicas
                        serverStation{m}.setService(cidxClass{cidx}, Disabled.getInstance());
                    end
                    model.attribute.calls(end+1,:) = [cidxClass{cidx}.index, cidx, lqn.callpair(cidx,1), lqn.callpair(cidx,2)];
                    cidxClass{cidx}.completes = false;
                    cidxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
                    minRespT = 0;
                    for tidx_act = lqn.actsof{idx}
                        minRespT = minRespT + lqn.hostdem{tidx_act}.getMean; % upper bound, uses all activities not just the ones reachable by this entry
                    end
                    for m=1:nreplicas
                        serverStation{m}.setService(cidxClass{cidx}, Exp.fitMean(minRespT));
                    end
            end

            % Auxiliary class used when the mean number of calls differs
            % from the replica count (probabilistic call skipping).
            if callmean(cidx) ~= nreplicas
                switch lqn.calltype(cidx)
                    case CallType.SYNC
                        cidxAuxClass{cidx} = ClosedClass(model, [lqn.callhashnames{cidx},'.Aux'], 0, clientDelay);
                        cidxAuxClass{cidx}.completes = false;
                        cidxAuxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
                        clientDelay.setService(cidxAuxClass{cidx}, Immediate.getInstance());
                        for m=1:nreplicas
                            serverStation{m}.setService(cidxAuxClass{cidx}, Disabled.getInstance());
                        end
                end
            end

            % For SYNC calls in task layers, create classes for forwarding
            % calls from the target entry. This implements synthetic
            % synchronization: the caller blocks until the forwarding
            % target completes, modeling contention correctly.
            % FWD calls have the source ENTRY (not activity) in callpair(:,1),
            % so we scan all calls to find FWD calls from the target entry.
            if ~ishostlayer && lqn.calltype(cidx) == CallType.SYNC
                target_eidx = lqn.callpair(cidx, 2);
                for fwd_cidx_iter = 1:lqn.ncalls
                    if lqn.calltype(fwd_cidx_iter) == CallType.FWD && lqn.callpair(fwd_cidx_iter, 1) == target_eidx
                        callmean(fwd_cidx_iter) = lqn.callproc{fwd_cidx_iter}.getMean;
                        cidxClass{fwd_cidx_iter} = ClosedClass(model, lqn.callhashnames{fwd_cidx_iter}, 0, clientDelay);
                        cidxClass{fwd_cidx_iter}.completes = false;
                        cidxClass{fwd_cidx_iter}.attribute = [LayeredNetworkElement.CALL, fwd_cidx_iter];
                        clientDelay.setService(cidxClass{fwd_cidx_iter}, Disabled.getInstance());
                        for m=1:nreplicas
                            serverStation{m}.setService(cidxClass{fwd_cidx_iter}, Disabled.getInstance());
                        end
                        model.attribute.calls(end+1,:) = [cidxClass{fwd_cidx_iter}.index, fwd_cidx_iter, lqn.callpair(fwd_cidx_iter,1), lqn.callpair(fwd_cidx_iter,2)];
                    end
                end
            end
        end
    end
end
381
% Ensure Source's sourceClasses and arrivalProcess arrays are properly sized for all classes
% This is needed because the Source may be created during class iteration
% when only some classes exist, and new closed classes added afterwards
% won't have corresponding entries in sourceClasses/arrivalProcess
if hasSource
    nClasses = model.getNumberOfClasses();
    for k = 1:nClasses
        % Pad missing slots with Disabled so every class index is defined.
        if k > length(sourceStation.input.sourceClasses) || isempty(sourceStation.input.sourceClasses{k})
            sourceStation.input.sourceClasses{k} = {[], ServiceStrategy.LI, Disabled.getInstance()};
        end
        if k > length(sourceStation.arrivalProcess) || isempty(sourceStation.arrivalProcess{k})
            sourceStation.arrivalProcess{k} = Disabled.getInstance();
        end
    end
end
397
P = model.initRoutingMatrix;
% Routing for async-call open classes: Source -> random replica; a job
% revisits replicas with prob (1-p) and leaves to the Sink with prob p,
% so the mean number of visits equals the call mean.
if hasSource
    for o = 1:size(openClasses,1)
        oidx = openClasses(o,1);
        p = 1 / openClasses(o,2); % divide by mean number of calls, they go to a server at random
        for m=1:nreplicas
            P{model.classes{oidx}, model.classes{oidx}}(sourceStation,serverStation{m}) = 1/nreplicas;
            for n=1:nreplicas
                P{model.classes{oidx}, model.classes{oidx}}(serverStation{m},serverStation{n}) = (1-p)/nreplicas;
            end
            P{model.classes{oidx}, model.classes{oidx}}(serverStation{m},sinkStation) = p;
        end
        cidx = openClasses(o,3); % 3 = source
        self.arvproc_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(sourceStation), oidx];
        for m=1:nreplicas
            self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), oidx];
        end
    end
end

%% job positions are encoded as follows: 1=client, 2=any of the nreplicas server stations, 3=cache node, 4=fork node, 5=join node
atClient = 1;
atServer = 2;
atCache = 3;

jobPos = atClient; % start at client
% second pass: setup the routing out of entries
for tidx_caller = callers
    % Use same condition as first pass - only process if closed class was created
    hasDirectCallers = false;
    isForwardingTarget = false;
    if ishostlayer
        if lqn.isref(tidx_caller)
            hasDirectCallers = true;
        else
            for eidx_check = lqn.entriesof{tidx_caller}
                if any(full(lqn.issynccaller(:, eidx_check))) || any(full(lqn.isasynccaller(:, eidx_check)))
                    hasDirectCallers = true;
                    break;
                end
                if isfield(lqn, 'arrival') && ~isempty(lqn.arrival) && ...
                        iscell(lqn.arrival) && eidx_check <= length(lqn.arrival) && ...
                        ~isempty(lqn.arrival{eidx_check})
                    hasDirectCallers = true;
                    break;
                end
                % Check if this entry is a forwarding target
                for cidx_fwd = 1:lqn.ncalls
                    if full(lqn.calltype(cidx_fwd)) == CallType.FWD && full(lqn.callpair(cidx_fwd, 2)) == eidx_check
                        isForwardingTarget = true;
                        break;
                    end
                end
            end
        end
    end
    if (ishostlayer && (hasDirectCallers || isForwardingTarget)) | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2>
        % for each entry of the calling task
        ncaller_entries = length(lqn.entriesof{tidx_caller});
        for eidx = lqn.entriesof{tidx_caller}
            aidxClass_eidx = aidxClass{eidx};
            aidxClass_tidx_caller = aidxClass{tidx_caller};
            % initialize the probability to select an entry to be identical
            P{aidxClass_tidx_caller, aidxClass_eidx}(clientDelay, clientDelay) = 1 / ncaller_entries;
            if ncaller_entries > 1
                % at successive iterations make sure to replace this with throughput ratio
                self.route_prob_updmap{idx}(end+1,:) = [idx, tidx_caller, eidx, 1, 1, aidxClass_tidx_caller.index, aidxClass_eidx.index];
            end
            % Walk the entry's activity graph and fill in the routing.
            P = recurActGraph(P, tidx_caller, eidx, aidxClass_eidx, jobPos);
        end
    end
end

% Setup routing for entry-level open arrivals (AFTER recurActGraph to avoid being overwritten)
if hasSource && ~isempty(entryOpenClasses)
    for e = 1:size(entryOpenClasses,1)
        eoidx = entryOpenClasses(e,1); % class index
        openClass = model.classes{eoidx};

        % Explicitly set routing: ONLY Source → Server → Sink
        % Zero out all routing for this class first
        for node1 = 1:length(model.nodes)
            for node2 = 1:length(model.nodes)
                P{openClass, openClass}(node1, node2) = 0;
            end
        end

        % Now set the correct routing
        for m=1:nreplicas
            % Route: Source → ServerStation → Sink
            P{openClass, openClass}(sourceStation,serverStation{m}) = 1/nreplicas;
            P{openClass, openClass}(serverStation{m},sinkStation) = 1.0;
        end
    end
end

model.link(P);
self.ensemble{idx} = model;
496
    function [P, curClass, jobPos] = recurActGraph(P, tidx_caller, aidx, curClass, jobPos)
        % RECURACTGRAPH Recursively translate the activity graph rooted at
        % aidx into routing-matrix entries for the caller task tidx_caller.
        %   P        - cell routing matrix being filled (returned updated)
        %   aidx     - current activity/entry index in the LQN graph
        %   curClass - class the job currently travels in
        %   jobPos   - current node position (atClient/atServer/atCache)
        % Nested function: shares the parent workspace (lqn, model, aidxClass,
        % cidxClass, cidxAuxClass, callmean, forkClassStack, updmaps, ...).
        jobPosKey(aidx) = jobPos;
        curClassKey{aidx} = curClass;
        nextaidxs = find(lqn.graph(aidx,:)); % these include the called entries
        if ~isempty(nextaidxs)
            isNextPrecFork(aidx) = any(isPostAndAct(nextaidxs)); % indexed on aidx to avoid losing it during the recursion
            % Save curClass/jobPos before fork branch loop so each branch
            % starts with the same pre-fork state (prevents curClass
            % corruption across parallel branches)
            if isNextPrecFork(aidx)
                forkSaveCurClass = curClass;
                forkSaveJobPos = jobPos;
            end
        end

        for nextaidx = nextaidxs % for all successor activities
            if ~isempty(nextaidx)
                % Restore pre-fork state for each branch iteration
                if isNextPrecFork(aidx)
                    curClass = forkSaveCurClass;
                    jobPos = forkSaveJobPos;
                end
                isLoop = false;
                % in the activity graph, the following if is entered only
                % by an edge that is the return from a LOOP activity
                if (lqn.graph(aidx,nextaidx) ~= lqn.dag(aidx,nextaidx))
                    isLoop = true;
                end
                if ~(lqn.parent(aidx) == lqn.parent(nextaidx)) % if different parent task
                    % if the successor activity is an entry of another task, this is a call
                    cidx = matchrow(lqn.callpair,[aidx,nextaidx]); % find the call index
                    switch lqn.calltype(cidx)
                        case CallType.ASYNC
                            % Async calls don't modify caller routing - caller continues immediately without blocking.
                            % Arrival rate at destination is handled via arvproc_classes_updmap (lines 170-195, 230-248).
                        case CallType.SYNC
                            [P, jobPos, curClass] = routeSynchCall(P, jobPos, curClass);
                            % Synthetic forwarding: if the target entry has
                            % FWD calls, route to their classes in the task
                            % layer. This models the additional blocking time
                            % the caller sees while the forwarding target
                            % processes the request.
                            if ~ishostlayer
                                sync_target_eidx = lqn.callpair(cidx, 2);
                                fwd_calls = [];
                                for fc_iter = 1:lqn.ncalls
                                    if lqn.calltype(fc_iter) == CallType.FWD && lqn.callpair(fc_iter, 1) == sync_target_eidx ...
                                            && length(cidxClass) >= fc_iter && ~isempty(cidxClass{fc_iter})
                                        fwd_calls(end+1) = fc_iter; %#ok<AGROW>
                                    end
                                end
                                if ~isempty(fwd_calls)
                                    total_fwd_prob = sum(callmean(fwd_calls));
                                    need_merge = (length(fwd_calls) > 1) || (total_fwd_prob < 1.0 - GlobalConstants.FineTol);
                                    if need_merge
                                        % Create merge class for after forwarding
                                        fwdMergeClass = ClosedClass(model, ['FwdMerge_', lqn.hashnames{sync_target_eidx}], 0, clientDelay);
                                        fwdMergeClass.completes = false;
                                        clientDelay.setService(fwdMergeClass, Immediate.getInstance());
                                        for m=1:nreplicas
                                            serverStation{m}.setService(fwdMergeClass, Disabled.getInstance());
                                        end
                                        for fi = 1:length(fwd_calls)
                                            fwd_cidx_iter = fwd_calls(fi);
                                            P{curClass, cidxClass{fwd_cidx_iter}}(clientDelay, clientDelay) = callmean(fwd_cidx_iter);
                                            P{cidxClass{fwd_cidx_iter}, fwdMergeClass}(clientDelay, clientDelay) = 1.0;
                                            clientDelay.setService(cidxClass{fwd_cidx_iter}, callservtproc{fwd_cidx_iter});
                                            self.call_classes_updmap{idx}(end+1,:) = [idx, fwd_cidx_iter, 1, cidxClass{fwd_cidx_iter}.index];
                                        end
                                        if total_fwd_prob < 1.0 - GlobalConstants.FineTol
                                            % Complement: request not forwarded, go straight to merge.
                                            P{curClass, fwdMergeClass}(clientDelay, clientDelay) = 1.0 - total_fwd_prob;
                                        end
                                        curClass = fwdMergeClass;
                                    else
                                        % Single FWD with prob 1.0
                                        fwd_cidx_iter = fwd_calls(1);
                                        P{curClass, cidxClass{fwd_cidx_iter}}(clientDelay, clientDelay) = 1.0;
                                        clientDelay.setService(cidxClass{fwd_cidx_iter}, callservtproc{fwd_cidx_iter});
                                        self.call_classes_updmap{idx}(end+1,:) = [idx, fwd_cidx_iter, 1, cidxClass{fwd_cidx_iter}.index];
                                        curClass = cidxClass{fwd_cidx_iter};
                                    end
                                    jobPos = atClient;
                                end
                            end
                        case CallType.FWD
                            % In the host layer, forwarding targets have their own
                            % jobs competing for the processor independently.
                            % The source task's job returns to think time after
                            % completing its own activities; no routing through
                            % the forwarding target's activities is needed.
                            % curClass and jobPos remain unchanged.
                    end
                else
                    % at this point, we have processed all calls, let us do the
                    % activities local to the task next
                    if isempty(intersect(lqn.eshift+(1:lqn.nentries), nextaidxs))
                        % if next activity is not an entry
                        jobPos = jobPosKey(aidx);
                        curClass = curClassKey{aidx};
                    else
                        % NOTE(review): curClassC is assigned only when the
                        % preceding sibling in nextaidxs is an entry — confirm
                        % it cannot be read before assignment here.
                        if ismember(nextaidxs(find(nextaidxs==nextaidx)-1), lqn.eshift+(1:lqn.nentries))
                            curClassC = curClass;
                        end
                        jobPos = atClient;
                        curClass = curClassC;
                    end
                    if jobPos == atClient % at client node
                        if ishostlayer
                            if ~iscachelayer
                                for m=1:nreplicas
                                    if isNextPrecFork(aidx)
                                        % if next activity is a post-and
                                        P{curClass, curClass}(clientDelay, forkNode) = 1.0;
                                        f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                        forkClassStack(end+1) = curClass.index;
                                        P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                        P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
                                    else
                                        if isPreAndAct(aidx)
                                            % before entering the job we go back to the entry class at the last fork
                                            forkClass = model.classes{forkClassStack(end)};
                                            forkClassStack(end) = [];
                                            P{curClass, forkClass}(clientDelay,joinNode) = 1.0;
                                            P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
                                        else
                                            P{curClass, aidxClass{nextaidx}}(clientDelay,serverStation{m}) = full(lqn.graph(aidx,nextaidx));
                                        end
                                    end
                                    serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
                                    if isfunctionlayer
                                        serverStation{m}.setDelayOff(aidxClass{nextaidx}, lqn.setuptime{lqn.parent(nextaidx)}, lqn.delayofftime{lqn.parent(nextaidx)});
                                    end
                                end
                                jobPos = atServer;
                                curClass = aidxClass{nextaidx};
                                self.servt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 2, aidxClass{nextaidx}.index];
                            else
                                P{curClass, aidxClass{nextaidx}}(clientDelay,cacheNode) = full(lqn.graph(aidx,nextaidx));

                                cacheNode.setReadItemEntry(aidxClass{nextaidx},lqn.itemproc{aidx},lqn.nitems(aidx));
                                % First successor is the hit activity, second the miss activity.
                                lqn.hitmissaidx = find(lqn.graph(nextaidx,:));
                                lqn.hitaidx = lqn.hitmissaidx(1);
                                lqn.missaidx = lqn.hitmissaidx(2);

                                cacheNode.setHitClass(aidxClass{nextaidx},aidxClass{lqn.hitaidx});
                                cacheNode.setMissClass(aidxClass{nextaidx},aidxClass{lqn.missaidx});

                                jobPos = atCache; % cache
                                curClass = aidxClass{nextaidx};
                                %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, lqn.hitaidx, 3, 3, aidxClass{nextaidx}.index, aidxClass{lqn.hitaidx}.index];
                                %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, lqn.missaidx, 3, 3, aidxClass{nextaidx}.index, aidxClass{lqn.missaidx}.index];
                            end
                        else % not ishostlayer
                            if isNextPrecFork(aidx)
                                % if next activity is a post-and
                                P{curClass, curClass}(clientDelay, forkNode) = 1.0;
                                f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                forkClassStack(end+1) = curClass.index;
                                P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
                            else
                                if isPreAndAct(aidx)
                                    % before entering the job we go back to the entry class at the last fork
                                    forkClass = model.classes{forkClassStack(end)};
                                    forkClassStack(end) = [];
                                    P{curClass, forkClass}(clientDelay,joinNode) = 1.0;
                                    P{forkClass, aidxClass{nextaidx}}(joinNode,clientDelay) = 1.0;
                                else
                                    P{curClass, aidxClass{nextaidx}}(clientDelay,clientDelay) = full(lqn.graph(aidx,nextaidx));
                                end
                            end
                            jobPos = atClient;
                            curClass = aidxClass{nextaidx};
                            clientDelay.setService(aidxClass{nextaidx}, self.servtproc{nextaidx});
                            self.thinkt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 1, aidxClass{nextaidx}.index];
                        end
                    elseif jobPos == atServer || jobPos == atCache % at server station
                        if ishostlayer
                            if iscachelayer
                                curClass = aidxClass{nextaidx};
                                for m=1:nreplicas
                                    if isNextPrecFork(aidx)
                                        % if next activity is a post-and
                                        P{curClass, curClass}(cacheNode, forkNode) = 1.0;
                                        f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                        forkClassStack(end+1) = curClass.index;
                                        P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                        P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
                                    else
                                        if isPreAndAct(aidx)
                                            % before entering the job we go back to the entry class at the last fork
                                            forkClass = model.classes{forkClassStack(end)};
                                            forkClassStack(end) = [];

                                            P{curClass, forkClass}(cacheNode,joinNode) = 1.0;
                                            P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
                                        else
                                            P{curClass, aidxClass{nextaidx}}(cacheNode,serverStation{m}) = full(lqn.graph(aidx,nextaidx));
                                        end
                                    end
                                    serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
                                    %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, nextaidx, 3, 2, aidxClass{nextaidx}.index, aidxClass{nextaidx}.index];
                                end
                            else
                                for m=1:nreplicas
                                    if isNextPrecFork(aidx)
                                        % if next activity is a post-and
                                        P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
                                        f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                        forkClassStack(end+1) = curClass.index;
                                        P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                        P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
                                    else
                                        if isPreAndAct(aidx)
                                            % before entering the job we go back to the entry class at the last fork
                                            forkClass = model.classes{forkClassStack(end)};
                                            forkClassStack(end) = [];
                                            P{curClass, forkClass}(serverStation{m},joinNode) = 1.0;
                                            P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
                                        else
                                            P{curClass, aidxClass{nextaidx}}(serverStation{m},serverStation{m}) = full(lqn.graph(aidx,nextaidx));
                                        end
                                    end
                                    serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
                                end
                            end
                            jobPos = atServer;
                            curClass = aidxClass{nextaidx};
                            self.servt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 2, aidxClass{nextaidx}.index];
                        else
                            for m=1:nreplicas
                                if isNextPrecFork(aidx)
                                    % if next activity is a post-and
                                    P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
                                    f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                    forkClassStack(end+1) = curClass.index;
                                    P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                    P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
                                else
                                    if isPreAndAct(aidx)
                                        % before entering the job we go back to the entry class at the last fork
                                        forkClass = model.classes{forkClassStack(end)};
                                        forkClassStack(end) = [];
                                        P{curClass, forkClass}(serverStation{m},joinNode) = 1.0;
                                        P{forkClass, aidxClass{nextaidx}}(joinNode,clientDelay) = 1.0;
                                    else
                                        P{curClass, aidxClass{nextaidx}}(serverStation{m},clientDelay) = full(lqn.graph(aidx,nextaidx));
                                    end
                                end
                            end
                            jobPos = atClient;
                            curClass = aidxClass{nextaidx};
                            clientDelay.setService(aidxClass{nextaidx}, self.servtproc{nextaidx});
                            self.thinkt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 1, aidxClass{nextaidx}.index];
                        end
                    end
                    if aidx ~= nextaidx && ~isLoop
                        %% now recursively build the rest of the routing matrix graph
                        [P, curClass, jobPos] = recurActGraph(P, tidx_caller, nextaidx, curClass, jobPos);

                        % At this point curClassRec is the last class in the
                        % recursive branch, which we now close with a reply
                        % NOTE(review): the '.Aux' suffix check indexes
                        % name(end-3:end) and assumes the class name has at
                        % least 4 characters — confirm hash names guarantee this.
                        if jobPos == atClient
                            P{curClass, aidxClass{tidx_caller}}(clientDelay,clientDelay) = 1;
                            if ~strcmp(curClass.name(end-3:end),'.Aux')
                                curClass.completes = true;
                            end
                        else
                            for m=1:nreplicas
                                P{curClass, aidxClass{tidx_caller}}(serverStation{m},clientDelay) = 1;
                            end
                            if ~strcmp(curClass.name(end-3:end),'.Aux')
                                curClass.completes = true;
                            end
                        end
                    end
                end
            end
        end % nextaidx
    end
777
function [P, jobPos, curClass] = routeSynchCall(P, jobPos, curClass)
% Embed the synchronous call cidx (read from the enclosing workspace) into
% the layer's class-switching routing matrices.
%
% Inputs/outputs:
%   P        - cell array of per-(class,class) node routing matrices; entries
%              are probabilities P{fromClass,toClass}(fromNode,toNode)
%   jobPos   - position of the job before the call (atClient or atServer);
%              on return, the position after the call completes
%   curClass - class the job is in before the call; on return, the class it
%              is left in after the call (the call class or its ".Aux" class)
%
% Read from the enclosing buildLayersRecursive workspace: cidx, idx, lqn,
% nreplicas, callmean, clientDelay, serverStation, cidxClass, cidxAuxClass,
% callservtproc, model, self, atClient, atServer, aidxThinkClass.
%
% The mean call count callmean(cidx) is realized probabilistically:
%   callmean <  nreplicas : the call is issued with probability < 1
%   callmean == nreplicas : one visit (per replica, uniformly split)
%   callmean >  nreplicas : repeat visits via the auxiliary class, with
%                           repeat probability 1 - 1/(callmean/nreplicas)
switch jobPos
    case atClient
        if lqn.parent(lqn.callpair(cidx,2)) == idx
            % the call targets an entry of the server modeled in this layer:
            % route the job from the client delay into the server stations
            if callmean(cidx) < nreplicas
                % with prob. 1-callmean the call is skipped (stay at client in Aux class)
                P{curClass, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1 - callmean(cidx); % note that callmean(cidx) < nreplicas
                for m=1:nreplicas
                    % commented-out legacy fork handling kept for reference:
                    % if isNextPrecFork(aidx)
                    % end
                    % % if next activity is a post-and
                    % P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
                    % forkStackClass(end+1) = curClass.index;
                    % f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                    % P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                    % P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
                    % else
                    % call issued with prob. callmean, split uniformly across replicas
                    P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = callmean(cidx) / nreplicas;
                    P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},clientDelay) = 1.0; % not needed, just to avoid leaving the Aux class disconnected
                end
                P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,clientDelay) = 1.0; % not needed, just to avoid leaving the Aux class disconnected
            elseif callmean(cidx) == nreplicas
                % exactly one visit on average: uniform split across replicas
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1 / nreplicas;
                    P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},clientDelay) = 1.0;
                end
            else % callmean(cidx) > nreplicas
                % repeat visits: after each service, switch to the Aux class at
                % the client and re-enter the server with the repeat probability
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1 / nreplicas;
                    P{cidxClass{cidx}, cidxAuxClass{cidx}}(serverStation{m},clientDelay) = 1.0 ;
                    P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1.0 - 1.0 / (callmean(cidx) / nreplicas);
                end
                % NOTE(review): exit probability uses 1/callmean while the repeat
                % branch above uses callmean/nreplicas — confirm the missing
                % /nreplicas here is intentional.
                P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,clientDelay) = 1.0 / (callmean(cidx));
            end
            jobPos = atClient;
            % the client spends no time in the call class; service happens at the server
            clientDelay.setService(cidxClass{cidx}, Immediate.getInstance());
            for m=1:nreplicas
                serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
                % register [layer, call, node, class] so solver iterations can refresh this service process
                self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), cidxClass{cidx}.index];
            end
            curClass = cidxClass{cidx};
        else
            % the call targets an entry served in another layer: model it as a
            % think-time at the client delay (no visit to this layer's server)
            if callmean(cidx) < nreplicas
                P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = callmean(cidx)/nreplicas; % the mean number of calls is now embedded in the demand
                P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1; % the mean number of calls is now embedded in the demand
                P{curClass, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1 - callmean(cidx)/nreplicas; % the mean number of calls is now embedded in the demand
                curClass = cidxAuxClass{cidx};
            elseif callmean(cidx) == nreplicas
                P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = 1;
                curClass = cidxClass{cidx};
            else % callmean(cidx) > nreplicas
                P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = 1; % the mean number of calls is now embedded in the demand
                P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;% / (callmean(cidx)/nreplicas); % the mean number of calls is now embedded in the demand
                curClass = cidxAuxClass{cidx};
            end
            jobPos = atClient;
            clientDelay.setService(cidxClass{cidx}, callservtproc{cidx});
            % node index 1 is the client delay (see clientIdx in model.attribute)
            self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, 1, cidxClass{cidx}.index];
        end
    case atServer % job at server
        if lqn.parent(lqn.callpair(cidx,2)) == idx
            % if it is a call to an entry of the server
            if callmean(cidx) < nreplicas
                for m=1:nreplicas
                    % with prob. 1-callmean return to the client; otherwise
                    % stay at the same replica for the call's service
                    P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1 - callmean(cidx);
                    P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = callmean(cidx);
                    serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
                end
                jobPos = atClient;
                % NOTE(review): curClass becomes the Aux class here, but no
                % transition into cidxAuxClass is added above — verify that a
                % later routing step supplies the inbound probability.
                curClass = cidxAuxClass{cidx};
            elseif callmean(cidx) == nreplicas
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1;
                end
                jobPos = atServer;
                curClass = cidxClass{cidx};
            else % callmean(cidx) > nreplicas
                % repeat visits at the same replica, leaving with prob. 1/callmean
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1;
                    P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1 - 1 / (callmean(cidx));
                    P{cidxClass{cidx}, cidxAuxClass{cidx}}(serverStation{m},clientDelay) = 1 / (callmean(cidx));
                end
                jobPos = atClient;
                curClass = cidxAuxClass{cidx};
            end
            for m=1:nreplicas
                serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
                % register [layer, call, node, class] so solver iterations can refresh this service process
                self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), cidxClass{cidx}.index];
            end
        else
            % if it is not a call to an entry of the server
            % callmean not needed since we switched
            % to ResidT to model service time at client
            if callmean(cidx) < nreplicas
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
                end
                P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;
                curClass = cidxAuxClass{cidx};
            elseif callmean(cidx) == nreplicas
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
                end
                curClass = cidxClass{cidx};
            else % callmean(cidx) > nreplicas
                for m=1:nreplicas
                    P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
                end
                P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;
                curClass = cidxAuxClass{cidx};
            end
            jobPos = atClient;
            clientDelay.setService(cidxClass{cidx}, callservtproc{cidx});
            % node index 1 is the client delay (see clientIdx in model.attribute)
            self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, 1, cidxClass{cidx}.index];
        end
end

% After synch call returns, route through activity think-time class if applicable
callingAidx = lqn.callpair(cidx, 1); % source activity of the call
if ~isempty(aidxThinkClass{callingAidx})
    % Route from current class to think-time class at client
    P{curClass, aidxThinkClass{callingAidx}}(clientDelay, clientDelay) = 1.0;
    curClass = aidxThinkClass{callingAidx};
    jobPos = atClient;
end
end
905end
Definition mmt.m:124