% LINE Solver - MATLAB API documentation
% buildLayersRecursive.m
1function buildLayersRecursive(self, idx, callers, ishostlayer)
2lqn = self.lqn;
3jobPosKey = zeros(lqn.nidx,1);
4curClassKey = cell(lqn.nidx,1);
5% Fan-out check: when all callers have fan-out >= nreplicas for this task,
6% each replica sees the full caller traffic (fork-join semantics).
7% Model a single representative replica; updateThinkTimes multiplies by K.
8% For host layers: if all caller tasks have the same replication as the host,
9% the host is co-replicated with the task, so also use single-replica modeling.
10rawReplicas = lqn.repl(idx);
11reduceFanout = false;
12if rawReplicas > 1 && ~isempty(callers)
13 if ~ishostlayer && isfield(lqn, 'fanout') && ~isempty(lqn.fanout)
14 reduceFanout = true;
15 for c = callers(:)'
16 if lqn.fanout(c, idx) < rawReplicas
17 reduceFanout = false;
18 break;
19 end
20 end
21 elseif ishostlayer
22 reduceFanout = true;
23 for c = callers(:)'
24 if lqn.repl(c) ~= rawReplicas
25 reduceFanout = false;
26 break;
27 end
28 end
29 end
30end
31if reduceFanout
32 nreplicas = 1;
33 if ~ishostlayer
34 self.singleReplicaTasks(end+1) = idx;
35 end
36else
37 nreplicas = rawReplicas;
38end
39%mult = lqn.mult;
40mult = lqn.maxmult; % this removes spare capacity that cannot be used
41lqn.mult = mult;
42callservtproc = self.callservtproc;
43model = Network(lqn.hashnames{idx});
44model.setChecks(false); % fast mode
45model.attribute = struct('hosts',[],'tasks',[],'entries',[],'activities',[],'calls',[],'serverIdx',0);
46if ishostlayer | any(any(lqn.issynccaller(callers, lqn.entriesof{idx}))) | any(any(lqn.isasynccaller(callers, lqn.entriesof{idx}))) %#ok<OR2>
47 clientDelay = Delay(model, 'Clients');
48 model.attribute.clientIdx = 1;
49 model.attribute.serverIdx = 2;
50 model.attribute.sourceIdx = NaN;
51else
52 model.attribute.serverIdx = 1;
53 model.attribute.clientIdx = NaN;
54 model.attribute.sourceIdx = NaN;
55end
56serverStation = cell(1,nreplicas);
57isfunctionlayer = all(lqn.isfunction(callers)) && ishostlayer;
58for m=1:nreplicas
59 if m == 1
60 serverStation{m} = Queue(model,lqn.hashnames{idx}, lqn.sched(idx));
61 else
62 serverStation{m} = Queue(model,[lqn.hashnames{idx},'.',num2str(m)], lqn.sched(idx));
63 end
64 serverStation{m}.setNumberOfServers(mult(idx));
65 serverStation{m}.attribute.ishost = ishostlayer;
66 serverStation{m}.attribute.idx = idx;
67end
68
69iscachelayer = all(lqn.iscache(callers)) && ishostlayer;
70if iscachelayer
71 cacheNode = Cache(model, lqn.hashnames{callers}, lqn.nitems(callers), lqn.itemcap{callers}, lqn.replacestrat(callers));
72end
73
74actsInCaller = [lqn.actsof{callers}];
75isPostAndAct = full(lqn.actposttype)==ActivityPrecedenceType.POST_AND;
76isPreAndAct = full(lqn.actpretype)==ActivityPrecedenceType.PRE_AND;
77hasfork = any(intersect(find(isPostAndAct),actsInCaller));
78
79maxfanout = 1; % maximum output parallelism level of fork nodes
80for aidx = actsInCaller(:)'
81 successors = find(lqn.graph(aidx,:));
82 if any(isPostAndAct(successors))
83 maxfanout = max(maxfanout, sum(isPostAndAct(successors)));
84 end
85end
86
87if hasfork
88 forkNode = Fork(model, 'Fork_PostAnd');
89 for f=1:maxfanout
90 forkOutputRouter{f} = Router(model, ['Fork_PostAnd_',num2str(f)]);
91 end
92 forkClassStack = []; % stack with the entry class at the visited forks, the last visited is end of the list.
93end
94
95isPreAndAct = full(lqn.actpretype)==ActivityPrecedenceType.PRE_AND;
96hasjoin = any(isPreAndAct(actsInCaller));
97if hasjoin
98 joinNode = Join(model, 'Join_PreAnd', forkNode);
99end
100
101aidxClass = cell(1, lqn.nidx);
102aidxThinkClass = cell(1, lqn.nidx); % auxiliary classes for activity think-time
103cidxClass = cell(1,0);
104cidxAuxClass = cell(1,0);
105
106self.servt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % server classes to update
107self.thinkt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % client classes to update
108self.actthinkt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % activity think-time classes to update
109self.arvproc_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % classes to update in the next iteration for asynch calls
110self.call_classes_updmap{idx} = zeros(0,4); % [modelidx, callidx, node, class] % calls classes to update in the next iteration (includes calls in client classes)
111self.route_prob_updmap{idx} = zeros(0,7); % [modelidx, actidxfrom, actidxto, nodefrom, nodeto, classfrom, classto] % routing probabilities to update in the next iteration
112
113if ishostlayer
114 model.attribute.hosts(end+1,:) = [NaN, model.attribute.serverIdx ];
115else
116 model.attribute.tasks(end+1,:) = [NaN, model.attribute.serverIdx ];
117end
118
119hasSource = false; % flag whether a source is needed
120openClasses = [];
121entryOpenClasses = []; % track entry-level open arrivals
122% first pass: create the classes
123for tidx_caller = callers
124 % For host layers, check if the task has any entries with sync/async callers
125 % or has open arrivals, OR if any entry is a forwarding target.
126 hasDirectCallers = false;
127 isForwardingTarget = false;
128 if ishostlayer
129 % Check if the task is a reference task (always create closed class)
130 if lqn.isref(tidx_caller)
131 hasDirectCallers = true;
132 else
133 % Check if any entry of this task has sync or async callers
134 for eidx = lqn.entriesof{tidx_caller}
135 if any(full(lqn.issynccaller(:, eidx))) || any(full(lqn.isasynccaller(:, eidx)))
136 hasDirectCallers = true;
137 break;
138 end
139 % Also check for open arrivals on this entry
140 if isfield(lqn, 'arrival') && ~isempty(lqn.arrival) && ...
141 iscell(lqn.arrival) && eidx <= length(lqn.arrival) && ...
142 ~isempty(lqn.arrival{eidx})
143 hasDirectCallers = true;
144 break;
145 end
146 % Check if this entry is a forwarding target
147 for cidx = 1:lqn.ncalls
148 if full(lqn.calltype(cidx)) == CallType.FWD && full(lqn.callpair(cidx, 2)) == eidx
149 isForwardingTarget = true;
150 break;
151 end
152 end
153 end
154 end
155 end
156 if (ishostlayer && hasDirectCallers) | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2> % if it is only an asynch caller the closed classes are not needed
157 if self.njobs(tidx_caller,idx) == 0
158 % for each entry of the calling task
159 % determine job population
160 % this block matches the corresponding calculations in
161 % updateThinkTimes
162 % Use single-replica njobs if this layer or the caller is in single-replica mode
163 callerIsSingleReplica = reduceFanout || any(self.singleReplicaTasks == tidx_caller);
164 if callerIsSingleReplica
165 njobs = mult(tidx_caller);
166 else
167 njobs = mult(tidx_caller)*lqn.repl(tidx_caller);
168 end
169 if isinf(njobs)
170 callers_of_tidx_caller = find(lqn.taskgraph(:,tidx_caller));
171 njobs = sum(mult(callers_of_tidx_caller)); %#ok<FNDSB>
172 if isinf(njobs)
173 % if also the callers of tidx_caller are inf servers, then use
174 % an heuristic
175 njobs = min(sum(mult(isfinite(mult)) .* lqn.repl(isfinite(mult))),1000); % Python parity: cap at 1000
176 end
177 end
178 self.njobs(tidx_caller,idx) = njobs;
179 else
180 njobs = self.njobs(tidx_caller,idx);
181 end
182 caller_name = lqn.hashnames{tidx_caller};
183 aidxClass{tidx_caller} = ClosedClass(model, caller_name, njobs, clientDelay);
184 clientDelay.setService(aidxClass{tidx_caller}, Disabled.getInstance());
185 for m=1:nreplicas
186 serverStation{m}.setService(aidxClass{tidx_caller}, Disabled.getInstance());
187 end
188 aidxClass{tidx_caller}.completes = false;
189 aidxClass{tidx_caller}.setReferenceClass(true); % renormalize residence times using the visits to the task
190 aidxClass{tidx_caller}.attribute = [LayeredNetworkElement.TASK, tidx_caller];
191 model.attribute.tasks(end+1,:) = [aidxClass{tidx_caller}.index, tidx_caller];
192 clientDelay.setService(aidxClass{tidx_caller}, self.thinkproc{tidx_caller});
193 if ~lqn.isref(tidx_caller)
194 self.thinkt_classes_updmap{idx}(end+1,:) = [idx, tidx_caller, 1, aidxClass{tidx_caller}.index];
195 end
196 for eidx = lqn.entriesof{tidx_caller}
197 % create a class
198 aidxClass{eidx} = ClosedClass(model, lqn.hashnames{eidx}, 0, clientDelay);
199 clientDelay.setService(aidxClass{eidx}, Disabled.getInstance());
200 for m=1:nreplicas
201 serverStation{m}.setService(aidxClass{eidx}, Disabled.getInstance());
202 end
203 aidxClass{eidx}.completes = false;
204 aidxClass{eidx}.attribute = [LayeredNetworkElement.ENTRY, eidx];
205 model.attribute.entries(end+1,:) = [aidxClass{eidx}.index, eidx];
206 [singleton, javasingleton] = Immediate.getInstance();
207 if isempty(model.obj)
208 clientDelay.setService(aidxClass{eidx}, singleton);
209 else
210 clientDelay.setService(aidxClass{eidx}, javasingleton);
211 end
212
213 % Check for open arrival distribution on this entry
214 if isfield(lqn, 'arrival') && ~isempty(lqn.arrival) && ...
215 iscell(lqn.arrival) && eidx <= length(lqn.arrival) && ...
216 ~isempty(lqn.arrival{eidx})
217
218 if ~hasSource
219 hasSource = true;
220 model.attribute.sourceIdx = length(model.nodes)+1;
221 sourceStation = Source(model,'Source');
222 sinkStation = Sink(model,'Sink');
223 end
224
225 % Create open class for this entry
226 openClassForEntry = OpenClass(model, [lqn.hashnames{eidx}, '_Open'], 0);
227 sourceStation.setArrival(openClassForEntry, lqn.arrival{eidx});
228 clientDelay.setService(openClassForEntry, Disabled.getInstance());
229
230 % Use bound activity's service time (entries themselves have Immediate service)
231 % Find activities bound to this entry via graph
232 bound_act_indices = find(lqn.graph(eidx,:) > 0);
233 if ~isempty(bound_act_indices)
234 % Use first bound activity's service time
235 bound_aidx = bound_act_indices(1);
236 for m=1:nreplicas
237 serverStation{m}.setService(openClassForEntry, self.servtproc{bound_aidx});
238 end
239 else
240 % Fallback to entry service (should not happen in well-formed models)
241 for m=1:nreplicas
242 serverStation{m}.setService(openClassForEntry, self.servtproc{eidx});
243 end
244 end
245
246 % Track for routing setup later: [class_index, entry_index]
247 entryOpenClasses(end+1,:) = [openClassForEntry.index, eidx];
248
249 % Track: Use negative entry index to distinguish from call arrivals
250 self.arvproc_classes_updmap{idx}(end+1,:) = [idx, -eidx, ...
251 model.getNodeIndex(sourceStation), openClassForEntry.index];
252
253 openClassForEntry.completes = false;
254 openClassForEntry.attribute = [LayeredNetworkElement.ENTRY, eidx];
255 end
256 end
257 end
258
259 % for each activity of the calling task
260 for aidx = lqn.actsof{tidx_caller}
261 if ishostlayer | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2>
262 % create a class
263 aidxClass{aidx} = ClosedClass(model, lqn.hashnames{aidx}, 0, clientDelay);
264 clientDelay.setService(aidxClass{aidx}, Disabled.getInstance());
265 for m=1:nreplicas
266 serverStation{m}.setService(aidxClass{aidx}, Disabled.getInstance());
267 end
268 aidxClass{aidx}.completes = false;
269 aidxClass{aidx}.attribute = [LayeredNetworkElement.ACTIVITY, aidx];
270 model.attribute.activities(end+1,:) = [aidxClass{aidx}.index, aidx];
271 hidx = lqn.parent(lqn.parent(aidx)); % index of host processor
272 if ~(ishostlayer && (hidx == idx))
273 % set the host demand for the activity
274 clientDelay.setService(aidxClass{aidx}, self.servtproc{aidx});
275 end
276 if lqn.sched(tidx_caller)~=SchedStrategy.REF % in 'ref' case the service activity is constant
277 % updmap(end+1,:) = [idx, aidx, 1, idxClass{aidx}.index];
278 end
279 if iscachelayer && full(lqn.graph(eidx,aidx))
280 clientDelay.setService(aidxClass{aidx}, self.servtproc{aidx});
281 end
282
283 % Create auxiliary think-time class if activity has think-time
284 if ~isempty(lqn.actthink{aidx}) && lqn.actthink{aidx}.getMean() > GlobalConstants.FineTol
285 aidxThinkClass{aidx} = ClosedClass(model, [lqn.hashnames{aidx},'.Think'], 0, clientDelay);
286 aidxThinkClass{aidx}.completes = false;
287 aidxThinkClass{aidx}.attribute = [LayeredNetworkElement.ACTIVITY, aidx];
288 clientDelay.setService(aidxThinkClass{aidx}, lqn.actthink{aidx});
289 for m=1:nreplicas
290 serverStation{m}.setService(aidxThinkClass{aidx}, Disabled.getInstance());
291 end
292 self.actthinkt_classes_updmap{idx}(end+1,:) = [idx, aidx, 1, aidxThinkClass{aidx}.index];
293 end
294 end
295 % add a class for each outgoing call from this activity
296 for cidx = lqn.callsof{aidx}
297 callmean(cidx) = lqn.callproc{cidx}.getMean;
298 switch lqn.calltype(cidx)
299 case CallType.ASYNC
300 if lqn.parent(lqn.callpair(cidx,2)) == idx % add only if the target is serverStation
301 if ~hasSource % we need to add source and sink to the model
302 hasSource = true;
303 model.attribute.sourceIdx = length(model.nodes)+1;
304 sourceStation = Source(model,'Source');
305 sinkStation = Sink(model,'Sink');
306 end
307 cidxClass{cidx} = OpenClass(model, lqn.callhashnames{cidx}, 0);
308 sourceStation.setArrival(cidxClass{cidx}, Immediate.getInstance());
309 clientDelay.setService(cidxClass{cidx}, Disabled.getInstance());
310 for m=1:nreplicas
311 serverStation{m}.setService(cidxClass{cidx}, Immediate.getInstance());
312 end
313 openClasses(end+1,:) = [cidxClass{cidx}.index, callmean(cidx), cidx];
314 model.attribute.calls(end+1,:) = [cidxClass{cidx}.index, cidx, lqn.callpair(cidx,1), lqn.callpair(cidx,2)];
315 cidxClass{cidx}.completes = false;
316 cidxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
317 minRespT = 0;
318 for tidx_act = lqn.actsof{idx}
319 minRespT = minRespT + lqn.hostdem{tidx_act}.getMean; % upper bound, uses all activities not just the ones reachable by this entry
320 end
321 for m=1:nreplicas
322 serverStation{m}.setService(cidxClass{cidx}, Exp.fitMean(minRespT));
323 end
324 end
325 case CallType.SYNC
326 cidxClass{cidx} = ClosedClass(model, lqn.callhashnames{cidx}, 0, clientDelay);
327 clientDelay.setService(cidxClass{cidx}, Disabled.getInstance());
328 for m=1:nreplicas
329 serverStation{m}.setService(cidxClass{cidx}, Disabled.getInstance());
330 end
331 model.attribute.calls(end+1,:) = [cidxClass{cidx}.index, cidx, lqn.callpair(cidx,1), lqn.callpair(cidx,2)];
332 cidxClass{cidx}.completes = false;
333 cidxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
334 minRespT = 0;
335 for tidx_act = lqn.actsof{idx}
336 minRespT = minRespT + lqn.hostdem{tidx_act}.getMean; % upper bound, uses all activities not just the ones reachable by this entry
337 end
338 for m=1:nreplicas
339 serverStation{m}.setService(cidxClass{cidx}, Exp.fitMean(minRespT));
340 end
341 end
342
343 if callmean(cidx) ~= nreplicas
344 switch lqn.calltype(cidx)
345 case CallType.SYNC
346 cidxAuxClass{cidx} = ClosedClass(model, [lqn.callhashnames{cidx},'.Aux'], 0, clientDelay);
347 cidxAuxClass{cidx}.completes = false;
348 cidxAuxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
349 clientDelay.setService(cidxAuxClass{cidx}, Immediate.getInstance());
350 for m=1:nreplicas
351 serverStation{m}.setService(cidxAuxClass{cidx}, Disabled.getInstance());
352 end
353 end
354 end
355 end
356 end
357end
358
359% Ensure Source's sourceClasses and arrivalProcess arrays are properly sized for all classes
360% This is needed because the Source may be created during class iteration
361% when only some classes exist, and new closed classes added afterwards
362% won't have corresponding entries in sourceClasses/arrivalProcess
363if hasSource
364 nClasses = model.getNumberOfClasses();
365 for k = 1:nClasses
366 if k > length(sourceStation.input.sourceClasses) || isempty(sourceStation.input.sourceClasses{k})
367 sourceStation.input.sourceClasses{k} = {[], ServiceStrategy.LI, Disabled.getInstance()};
368 end
369 if k > length(sourceStation.arrivalProcess) || isempty(sourceStation.arrivalProcess{k})
370 sourceStation.arrivalProcess{k} = Disabled.getInstance();
371 end
372 end
373end
374
375P = model.initRoutingMatrix;
376if hasSource
377 for o = 1:size(openClasses,1)
378 oidx = openClasses(o,1);
379 p = 1 / openClasses(o,2); % divide by mean number of calls, they go to a server at random
380 for m=1:nreplicas
381 P{model.classes{oidx}, model.classes{oidx}}(sourceStation,serverStation{m}) = 1/nreplicas;
382 for n=1:nreplicas
383 P{model.classes{oidx}, model.classes{oidx}}(serverStation{m},serverStation{n}) = (1-p)/nreplicas;
384 end
385 P{model.classes{oidx}, model.classes{oidx}}(serverStation{m},sinkStation) = p;
386 end
387 cidx = openClasses(o,3); % 3 = source
388 self.arvproc_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(sourceStation), oidx];
389 for m=1:nreplicas
390 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), oidx];
391 end
392 end
393end
394
395%% job positions are encoded as follows: 1=client, 2=any of the nreplicas server stations, 3=cache node, 4=fork node, 5=join node
396atClient = 1;
397atServer = 2;
398atCache = 3;
399
400jobPos = atClient; % start at client
401% second pass: setup the routing out of entries
402for tidx_caller = callers
403 % Use same condition as first pass - only process if closed class was created
404 hasDirectCallers = false;
405 if ishostlayer
406 if lqn.isref(tidx_caller)
407 hasDirectCallers = true;
408 else
409 for eidx_check = lqn.entriesof{tidx_caller}
410 if any(full(lqn.issynccaller(:, eidx_check))) || any(full(lqn.isasynccaller(:, eidx_check)))
411 hasDirectCallers = true;
412 break;
413 end
414 if isfield(lqn, 'arrival') && ~isempty(lqn.arrival) && ...
415 iscell(lqn.arrival) && eidx_check <= length(lqn.arrival) && ...
416 ~isempty(lqn.arrival{eidx_check})
417 hasDirectCallers = true;
418 break;
419 end
420 end
421 end
422 end
423 if (ishostlayer && hasDirectCallers) | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2>
424 % for each entry of the calling task
425 ncaller_entries = length(lqn.entriesof{tidx_caller});
426 for eidx = lqn.entriesof{tidx_caller}
427 aidxClass_eidx = aidxClass{eidx};
428 aidxClass_tidx_caller = aidxClass{tidx_caller};
429 % initialize the probability to select an entry to be identical
430 P{aidxClass_tidx_caller, aidxClass_eidx}(clientDelay, clientDelay) = 1 / ncaller_entries;
431 if ncaller_entries > 1
432 % at successive iterations make sure to replace this with throughput ratio
433 self.route_prob_updmap{idx}(end+1,:) = [idx, tidx_caller, eidx, 1, 1, aidxClass_tidx_caller.index, aidxClass_eidx.index];
434 end
435 P = recurActGraph(P, tidx_caller, eidx, aidxClass_eidx, jobPos);
436 end
437 end
438end
439
440% Setup routing for entry-level open arrivals (AFTER recurActGraph to avoid being overwritten)
441if hasSource && ~isempty(entryOpenClasses)
442 for e = 1:size(entryOpenClasses,1)
443 eoidx = entryOpenClasses(e,1); % class index
444 openClass = model.classes{eoidx};
445
446 % Explicitly set routing: ONLY Source → Server → Sink
447 % Zero out all routing for this class first
448 for node1 = 1:length(model.nodes)
449 for node2 = 1:length(model.nodes)
450 P{openClass, openClass}(node1, node2) = 0;
451 end
452 end
453
454 % Now set the correct routing
455 for m=1:nreplicas
456 % Route: Source → ServerStation → Sink
457 P{openClass, openClass}(sourceStation,serverStation{m}) = 1/nreplicas;
458 P{openClass, openClass}(serverStation{m},sinkStation) = 1.0;
459 end
460 end
461end
462
463model.link(P);
464self.ensemble{idx} = model;
465
    function [P, curClass, jobPos] = recurActGraph(P, tidx_caller, aidx, curClass, jobPos)
        % RECURACTGRAPH Recursively walk the LQN activity graph rooted at AIDX and
        % translate each precedence edge into routing-matrix entries of P for the
        % layer model under construction.
        %
        % Inputs:
        %   P           - class-switching routing matrix (from model.initRoutingMatrix)
        %   tidx_caller - index of the caller task whose activity graph is visited
        %   aidx        - current activity (or entry) index into lqn.graph
        %   curClass    - chain class the job is currently travelling in
        %   jobPos      - current job position: atClient (1), atServer (2), atCache (3)
        % Outputs:
        %   P, curClass, jobPos - updated matrix plus the class/position reached at
        %   the end of the visited branch; the recursive caller uses these to close
        %   the branch with a reply back to the caller task's reference class.
        %
        % NOTE(review): this is a nested function - it reads and writes variables of
        % the enclosing workspace (jobPosKey, curClassKey, aidxClass, forkClassStack,
        % serverStation, clientDelay, self.*_updmap, ...). Do not move it out of
        % buildLayersRecursive without capturing that shared state explicitly.

        % Record the state at which aidx was entered so sibling local successors
        % visited later in the loop below can restore it.
        jobPosKey(aidx) = jobPos;
        curClassKey{aidx} = curClass;
        nextaidxs = find(lqn.graph(aidx,:)); % these include the called entries
        if ~isempty(nextaidxs)
            isNextPrecFork(aidx) = any(isPostAndAct(nextaidxs)); % indexed on aidx to avoid losing it during the recursion
            % Save curClass/jobPos before fork branch loop so each branch
            % starts with the same pre-fork state (prevents curClass
            % corruption across parallel branches)
            if isNextPrecFork(aidx)
                forkSaveCurClass = curClass;
                forkSaveJobPos = jobPos;
            end
        end

        for nextaidx = nextaidxs % for all successor activities
            if ~isempty(nextaidx)
                % Restore pre-fork state for each branch iteration
                if isNextPrecFork(aidx)
                    curClass = forkSaveCurClass;
                    jobPos = forkSaveJobPos;
                end
                isLoop = false;
                % in the activity graph, the following if is entered only
                % by an edge that is the return from a LOOP activity
                % (lqn.dag is the acyclic version of lqn.graph, so the two
                % differ exactly on loop-return edges)
                if (lqn.graph(aidx,nextaidx) ~= lqn.dag(aidx,nextaidx))
                    isLoop = true;
                end
                if ~(lqn.parent(aidx) == lqn.parent(nextaidx)) % if different parent task
                    % if the successor activity is an entry of another task, this is a call
                    cidx = matchrow(lqn.callpair,[aidx,nextaidx]); % find the call index
                    switch lqn.calltype(cidx)
                        case CallType.ASYNC
                            % Async calls don't modify caller routing - caller continues immediately without blocking.
                            % Arrival rate at destination is handled via arvproc_classes_updmap.
                        case CallType.SYNC
                            % Synchronous call: delegate routing to the sibling
                            % nested function routeSynchCall (blocks the caller).
                            [P, jobPos, curClass] = routeSynchCall(P, jobPos, curClass);
                        case CallType.FWD
                            % Forwarding: after the source entry's activity completes,
                            % route to the forwarding target's bound activity on the same processor
                            if ishostlayer
                                target_entry = nextaidx;
                                target_bound_acts = find(lqn.graph(target_entry, :));
                                if ~isempty(target_bound_acts)
                                    target_aidx = target_bound_acts(1); % First bound activity
                                    fwd_prob = full(lqn.graph(aidx, target_entry)); % Forwarding probability

                                    % Create class for target activity if needed
                                    % (the target may not have been visited in the first pass)
                                    if isempty(aidxClass{target_aidx})
                                        aidxClass{target_aidx} = ClosedClass(model, lqn.hashnames{target_aidx}, 0, clientDelay);
                                        clientDelay.setService(aidxClass{target_aidx}, Disabled.getInstance());
                                        for m=1:nreplicas
                                            serverStation{m}.setService(aidxClass{target_aidx}, Disabled.getInstance());
                                        end
                                        aidxClass{target_aidx}.completes = false;
                                        aidxClass{target_aidx}.attribute = [LayeredNetworkElement.ACTIVITY, target_aidx];
                                        model.attribute.activities(end+1,:) = [aidxClass{target_aidx}.index, target_aidx];
                                    end

                                    % Route from current position to target activity at server;
                                    % from the client the forwarding probability is split
                                    % uniformly across the nreplicas server stations
                                    for m=1:nreplicas
                                        if jobPos == atClient
                                            P{curClass, aidxClass{target_aidx}}(clientDelay, serverStation{m}) = fwd_prob / nreplicas;
                                        else
                                            P{curClass, aidxClass{target_aidx}}(serverStation{m}, serverStation{m}) = fwd_prob;
                                        end
                                        serverStation{m}.setService(aidxClass{target_aidx}, lqn.hostdem{target_aidx});
                                    end
                                    self.servt_classes_updmap{idx}(end+1,:) = [idx, target_aidx, 2, aidxClass{target_aidx}.index];

                                    jobPos = atServer;
                                    curClass = aidxClass{target_aidx};

                                    % Check if target entry also has forwarding (chained forwarding)
                                    target_nextaidxs = find(lqn.graph(target_entry, :));
                                    for target_nextaidx = target_nextaidxs
                                        if target_nextaidx ~= target_aidx % Skip the bound activity we just processed
                                            % Check if this is another forwarding call
                                            target_cidx = matchrow(lqn.callpair, [target_entry, target_nextaidx]);
                                            if ~isempty(target_cidx) && lqn.calltype(target_cidx) == CallType.FWD
                                                % Recursively handle chained forwarding
                                                [P, curClass, jobPos] = recurActGraph(P, tidx_caller, target_entry, curClass, jobPos);
                                            end
                                        end
                                    end
                                end
                            end
                    end
                else
                    % at this point, we have processed all calls, let us do the
                    % activities local to the task next
                    if isempty(intersect(lqn.eshift+(1:lqn.nentries), nextaidxs))
                        % if next activity is not an entry: restore the state
                        % recorded when aidx was first entered
                        jobPos = jobPosKey(aidx);
                        curClass = curClassKey{aidx};
                    else
                        % NOTE(review): curClassC is assigned only when the previous
                        % element of nextaidxs is an entry - this relies on entries
                        % preceding activities in the successor ordering; confirm
                        % curClassC cannot be read before being set.
                        if ismember(nextaidxs(find(nextaidxs==nextaidx)-1), lqn.eshift+(1:lqn.nentries))
                            curClassC = curClass;
                        end
                        jobPos = atClient;
                        curClass = curClassC;
                    end
                    if jobPos == atClient % at client node
                        if ishostlayer
                            if ~iscachelayer
                                for m=1:nreplicas
                                    if isNextPrecFork(aidx)
                                        % if next activity is a post-and: route through the
                                        % fork node and the branch's output router
                                        P{curClass, curClass}(clientDelay, forkNode) = 1.0;
                                        f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                        forkClassStack(end+1) = curClass.index;
                                        P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                        P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
                                    else
                                        if isPreAndAct(aidx)
                                            % before entering the job we go back to the entry class at the last fork
                                            forkClass = model.classes{forkClassStack(end)};
                                            forkClassStack(end) = [];
                                            P{curClass, forkClass}(clientDelay,joinNode) = 1.0;
                                            P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
                                        else
                                            P{curClass, aidxClass{nextaidx}}(clientDelay,serverStation{m}) = full(lqn.graph(aidx,nextaidx));
                                        end
                                    end
                                    serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
                                    if isfunctionlayer
                                        % serverless/function layer: add setup and delay-off times
                                        serverStation{m}.setDelayOff(aidxClass{nextaidx}, lqn.setuptime{lqn.parent(nextaidx)}, lqn.delayofftime{lqn.parent(nextaidx)});
                                    end
                                end
                                jobPos = atServer;
                                curClass = aidxClass{nextaidx};
                                self.servt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 2, aidxClass{nextaidx}.index];
                            else
                                % cache layer: route the job through the cache node instead
                                P{curClass, aidxClass{nextaidx}}(clientDelay,cacheNode) = full(lqn.graph(aidx,nextaidx));

                                cacheNode.setReadItemEntry(aidxClass{nextaidx},lqn.itemproc{aidx},lqn.nitems(aidx));
                                % NOTE(review): the next three lines mutate the shared lqn
                                % struct; assumes the cache access activity has exactly two
                                % successors (hit first, miss second) - TODO confirm
                                lqn.hitmissaidx = find(lqn.graph(nextaidx,:));
                                lqn.hitaidx = lqn.hitmissaidx(1);
                                lqn.missaidx = lqn.hitmissaidx(2);

                                cacheNode.setHitClass(aidxClass{nextaidx},aidxClass{lqn.hitaidx});
                                cacheNode.setMissClass(aidxClass{nextaidx},aidxClass{lqn.missaidx});

                                jobPos = atCache; % cache
                                curClass = aidxClass{nextaidx};
                                %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, lqn.hitaidx, 3, 3, aidxClass{nextaidx}.index, aidxClass{lqn.hitaidx}.index];
                                %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, lqn.missaidx, 3, 3, aidxClass{nextaidx}.index, aidxClass{lqn.missaidx}.index];
                            end
                        else % not ishostlayer: activity executes in the client delay
                            if isNextPrecFork(aidx)
                                % if next activity is a post-and
                                P{curClass, curClass}(clientDelay, forkNode) = 1.0;
                                f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                forkClassStack(end+1) = curClass.index;
                                P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
                            else
                                if isPreAndAct(aidx)
                                    % before entering the job we go back to the entry class at the last fork
                                    forkClass = model.classes{forkClassStack(end)};
                                    forkClassStack(end) = [];
                                    P{curClass, forkClass}(clientDelay,joinNode) = 1.0;
                                    P{forkClass, aidxClass{nextaidx}}(joinNode,clientDelay) = 1.0;
                                else
                                    P{curClass, aidxClass{nextaidx}}(clientDelay,clientDelay) = full(lqn.graph(aidx,nextaidx));
                                end
                            end
                            jobPos = atClient;
                            curClass = aidxClass{nextaidx};
                            clientDelay.setService(aidxClass{nextaidx}, self.servtproc{nextaidx});
                            self.thinkt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 1, aidxClass{nextaidx}.index];
                        end
                    elseif jobPos == atServer || jobPos == atCache % at server station
                        if ishostlayer
                            if iscachelayer
                                % NOTE(review): curClass is overwritten before routing here,
                                % so the transition leaves from the successor's own class
                                curClass = aidxClass{nextaidx};
                                for m=1:nreplicas
                                    if isNextPrecFork(aidx)
                                        % if next activity is a post-and
                                        P{curClass, curClass}(cacheNode, forkNode) = 1.0;
                                        f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                        forkClassStack(end+1) = curClass.index;
                                        P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                        P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
                                    else
                                        if isPreAndAct(aidx)
                                            % before entering the job we go back to the entry class at the last fork
                                            forkClass = model.classes{forkClassStack(end)};
                                            forkClassStack(end) = [];

                                            P{curClass, forkClass}(cacheNode,joinNode) = 1.0;
                                            P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
                                        else
                                            P{curClass, aidxClass{nextaidx}}(cacheNode,serverStation{m}) = full(lqn.graph(aidx,nextaidx));
                                        end
                                    end
                                    serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
                                    %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, nextaidx, 3, 2, aidxClass{nextaidx}.index, aidxClass{nextaidx}.index];
                                end
                            else
                                for m=1:nreplicas
                                    if isNextPrecFork(aidx)
                                        % if next activity is a post-and
                                        P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
                                        f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                        forkClassStack(end+1) = curClass.index;
                                        P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                        P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
                                    else
                                        if isPreAndAct(aidx)
                                            % before entering the job we go back to the entry class at the last fork
                                            forkClass = model.classes{forkClassStack(end)};
                                            forkClassStack(end) = [];
                                            P{curClass, forkClass}(serverStation{m},joinNode) = 1.0;
                                            P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
                                        else
                                            P{curClass, aidxClass{nextaidx}}(serverStation{m},serverStation{m}) = full(lqn.graph(aidx,nextaidx));
                                        end
                                    end
                                    serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
                                end
                            end
                            jobPos = atServer;
                            curClass = aidxClass{nextaidx};
                            self.servt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 2, aidxClass{nextaidx}.index];
                        else
                            % non-host layer: the successor executes back at the client
                            for m=1:nreplicas
                                if isNextPrecFork(aidx)
                                    % if next activity is a post-and
                                    P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
                                    f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
                                    forkClassStack(end+1) = curClass.index;
                                    P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
                                    P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
                                else
                                    if isPreAndAct(aidx)
                                        % before entering the job we go back to the entry class at the last fork
                                        forkClass = model.classes{forkClassStack(end)};
                                        forkClassStack(end) = [];
                                        P{curClass, forkClass}(serverStation{m},joinNode) = 1.0;
                                        P{forkClass, aidxClass{nextaidx}}(joinNode,clientDelay) = 1.0;
                                    else
                                        P{curClass, aidxClass{nextaidx}}(serverStation{m},clientDelay) = full(lqn.graph(aidx,nextaidx));
                                    end
                                end
                            end
                            jobPos = atClient;
                            curClass = aidxClass{nextaidx};
                            clientDelay.setService(aidxClass{nextaidx}, self.servtproc{nextaidx});
                            self.thinkt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 1, aidxClass{nextaidx}.index];
                        end
                    end
                    if aidx ~= nextaidx && ~isLoop
                        %% now recursively build the rest of the routing matrix graph
                        [P, curClass, jobPos] = recurActGraph(P, tidx_caller, nextaidx, curClass, jobPos);

                        % At this point curClass is the last class in the
                        % recursive branch, which we now close with a reply
                        % back to the caller task's reference class.
                        % NOTE(review): the '.Aux' name check assumes class names
                        % are at least 4 characters long - TODO confirm
                        if jobPos == atClient
                            P{curClass, aidxClass{tidx_caller}}(clientDelay,clientDelay) = 1;
                            if ~strcmp(curClass.name(end-3:end),'.Aux')
                                curClass.completes = true;
                            end
                        else
                            for m=1:nreplicas
                                P{curClass, aidxClass{tidx_caller}}(serverStation{m},clientDelay) = 1;
                            end
                            if ~strcmp(curClass.name(end-3:end),'.Aux')
                                curClass.completes = true;
                            end
                        end
                    end
                end
            end
        end % nextaidx
    end
742
743 function [P, jobPos, curClass] = routeSynchCall(P, jobPos, curClass)
% routeSynchCall  Wire the routing entries of one synchronous call into P.
%
% Inputs/outputs:
%   P        - cell routing matrix, indexed P{classFrom, classTo}(nodeFrom, nodeTo)
%   jobPos   - where the job currently is (atClient / atServer); updated to
%              where it resides after the call completes
%   curClass - class the job occupies before the call; on return, the class
%              it occupies after the call (either cidxClass{cidx} or the
%              auxiliary class cidxAuxClass{cidx})
%
% NOTE(review): this is a nested function that relies on the enclosing
% buildLayersRecursive workspace for: lqn, idx, cidx, callmean, nreplicas,
% clientDelay, serverStation, cidxClass, cidxAuxClass, callservtproc,
% model, self, aidxThinkClass, atClient, atServer.
%
% Branch structure: (job at client vs. at server) x (call target is an
% entry of this layer's server vs. hosted elsewhere) x (callmean < / == / >
% nreplicas). When callmean exceeds nreplicas, the auxiliary class models
% repeated visits: the job returns to the server with probability
% 1 - 1/callmean and leaves with probability 1/callmean, i.e. a geometric
% number of visits with mean callmean.
744 switch jobPos
745 case atClient
746 if lqn.parent(lqn.callpair(cidx,2)) == idx
747 % if a call to an entry of the server in this layer
748 if callmean(cidx) < nreplicas
% Fractional mean call count: with prob. 1-callmean skip the call
% (route straight to the aux class), otherwise visit one of the
% replicas, split evenly (callmean/nreplicas each).
749 P{curClass, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1 - callmean(cidx); % note that callmean(cidx) < nreplicas
750 for m=1:nreplicas
751 % if isNextPrecFork(aidx)
752 % end
753 % % if next activity is a post-and
754 % P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
755 % forkStackClass(end+1) = curClass.index;
756 % f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
757 % P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
758 % P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
759 % else
760 P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = callmean(cidx) / nreplicas;
761 P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},clientDelay) = 1.0; % not needed, just to avoid leaving the Aux class disconnected
762 end
763 P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,clientDelay) = 1.0; % not needed, just to avoid leaving the Aux class disconnected
764 elseif callmean(cidx) == nreplicas
% Exactly one visit per replica on average: uniform split across
% replicas, then return to the client delay.
765 for m=1:nreplicas
766 P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1 / nreplicas;
767 P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},clientDelay) = 1.0;
768 end
769 else % callmean(cidx) > nreplicas
% More than one visit per replica: after each service completion the
% aux class re-enters a replica with prob. 1 - nreplicas/callmean and
% finishes with prob. 1/callmean (geometric revisits, mean callmean).
770 for m=1:nreplicas
771 P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1 / nreplicas;
772 P{cidxClass{cidx}, cidxAuxClass{cidx}}(serverStation{m},clientDelay) = 1.0 ;
773 P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1.0 - 1.0 / (callmean(cidx) / nreplicas);
774 end
775 P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,clientDelay) = 1.0 / (callmean(cidx));
776 end
777 jobPos = atClient;
% Call class consumes no time at the client; service happens at the
% replicas, which are registered in call_classes_updmap for later
% think-time/service updates by the solver iteration.
778 clientDelay.setService(cidxClass{cidx}, Immediate.getInstance());
779 for m=1:nreplicas
780 serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
781 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), cidxClass{cidx}.index];
782 end
783 curClass = cidxClass{cidx};
784 else
785 % if it is not a call to an entry of the server
% Remote call: modeled entirely at the client delay (the callee's
% response time is embedded in callservtproc, set on line 800);
% node index 1 (clientDelay) is recorded in call_classes_updmap.
786 if callmean(cidx) < nreplicas
787 P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = callmean(cidx)/nreplicas; % the mean number of calls is now embedded in the demand
788 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1; % the mean number of calls is now embedded in the demand
789 P{curClass, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1 - callmean(cidx)/nreplicas; % the mean number of calls is now embedded in the demand
790 curClass = cidxAuxClass{cidx};
791 elseif callmean(cidx) == nreplicas
792 P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = 1;
793 curClass = cidxClass{cidx};
794 else % callmean(cidx) > nreplicas
795 P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = 1; % the mean number of calls is now embedded in the demand
796 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;% / (callmean(cidx)/nreplicas); % the mean number of calls is now embedded in the demand
797 curClass = cidxAuxClass{cidx};
798 end
799 jobPos = atClient;
800 clientDelay.setService(cidxClass{cidx}, callservtproc{cidx});
801 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, 1, cidxClass{cidx}.index];
802 end
803 case atServer % job at server
804 if lqn.parent(lqn.callpair(cidx,2)) == idx
805 % if it is a call to an entry of the server
806 if callmean(cidx) < nreplicas
% With prob. 1-callmean return to the client, otherwise stay at the
% same replica for the call's service.
807 for m=1:nreplicas
808 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1 - callmean(cidx);
809 P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = callmean(cidx);
810 serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
811 end
812 jobPos = atClient;
813 curClass = cidxAuxClass{cidx};
814 elseif callmean(cidx) == nreplicas
% Single deterministic visit: the job stays at its replica and
% remains at the server afterwards (only branch that ends atServer).
815 for m=1:nreplicas
816 P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1;
817 end
818 jobPos = atServer;
819 curClass = cidxClass{cidx};
820 else % callmean(cidx) > nreplicas
% Geometric revisits at the same replica: loop back with prob.
% 1 - 1/callmean, exit to the client with prob. 1/callmean.
821 for m=1:nreplicas
822 P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1;
823 P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1 - 1 / (callmean(cidx));
824 P{cidxClass{cidx}, cidxAuxClass{cidx}}(serverStation{m},clientDelay) = 1 / (callmean(cidx));
825 end
826 jobPos = atClient;
827 curClass = cidxAuxClass{cidx};
828 end
829 for m=1:nreplicas
830 serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
831 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), cidxClass{cidx}.index];
832 end
833 else
834 % if it is not a call to an entry of the server
835 % callmean not needed since we switched
836 % to ResidT to model service time at client
% Remote call made while at the server: first route back to the
% client delay, where the call's residence time is served (line 856).
837 if callmean(cidx) < nreplicas
838 for m=1:nreplicas
839 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
840 end
841 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;
842 curClass = cidxAuxClass{cidx};
843 elseif callmean(cidx) == nreplicas
844 for m=1:nreplicas
845 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
846 end
847 curClass = cidxClass{cidx};
848 else % callmean(cidx) > nreplicas
849 for m=1:nreplicas
850 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
851 end
852 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;
853 curClass = cidxAuxClass{cidx};
854 end
855 jobPos = atClient;
856 clientDelay.setService(cidxClass{cidx}, callservtproc{cidx});
857 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, 1, cidxClass{cidx}.index];
858 end
859 end
860
861 % After synch call returns, route through activity think-time class if applicable
862 callingAidx = lqn.callpair(cidx, 1); % source activity of the call
863 if ~isempty(aidxThinkClass{callingAidx})
864 % Route from current class to think-time class at client
865 P{curClass, aidxThinkClass{callingAidx}}(clientDelay, clientDelay) = 1.0;
866 curClass = aidxThinkClass{callingAidx};
867 jobPos = atClient;
868 end
869 end
870end
Definition mmt.m:93