1function buildLayersRecursive(self, idx, callers, ishostlayer)
3jobPosKey = zeros(lqn.nidx,1);
4curClassKey = cell(lqn.nidx,1);
5% Fan-out check: when all callers have fan-out >= nreplicas for this task,
6% each replica sees the full caller traffic (fork-join semantics).
7% Model a single representative replica; updateThinkTimes multiplies by K.
8% For host layers: if all caller tasks have the same replication as the host,
9% the host is co-replicated with the task, so also use single-replica modeling.
10rawReplicas = lqn.repl(idx);
12if rawReplicas > 1 && ~isempty(callers)
13 if ~ishostlayer && isfield(lqn,
'fanout') && ~isempty(lqn.fanout)
16 if lqn.fanout(c, idx) < rawReplicas
24 if lqn.repl(c) ~= rawReplicas
34 self.singleReplicaTasks(end+1) = idx;
37 nreplicas = rawReplicas;
40mult = lqn.maxmult; % this removes spare capacity that cannot be used
42callservtproc = self.callservtproc;
43model = Network(lqn.hashnames{idx});
44model.setChecks(
false); % fast mode
45model.attribute =
struct(
'hosts',[],
'tasks',[],
'entries',[],
'activities',[],
'calls',[],
'serverIdx',0);
46if ishostlayer | any(any(lqn.issynccaller(callers, lqn.entriesof{idx}))) | any(any(lqn.isasynccaller(callers, lqn.entriesof{idx}))) %#ok<OR2>
47 clientDelay = Delay(model,
'Clients');
48 model.attribute.clientIdx = 1;
49 model.attribute.serverIdx = 2;
50 model.attribute.sourceIdx = NaN;
52 model.attribute.serverIdx = 1;
53 model.attribute.clientIdx = NaN;
54 model.attribute.sourceIdx = NaN;
56serverStation = cell(1,nreplicas);
57isfunctionlayer = all(lqn.isfunction(callers)) && ishostlayer;
60 serverStation{m} = Queue(model,lqn.hashnames{idx}, lqn.sched(idx));
62 serverStation{m} = Queue(model,[lqn.hashnames{idx},
'.',num2str(m)], lqn.sched(idx));
64 serverStation{m}.setNumberOfServers(mult(idx));
65 serverStation{m}.attribute.ishost = ishostlayer;
66 serverStation{m}.attribute.idx = idx;
67 % LQN successive activities on the same host retain the server: model
68 % the resulting class-switching self-loops as immediate feedback so the
69 % simulator does not re-queue the job behind other waiting jobs.
70 serverStation{m}.setImmediateFeedback(
true);
73iscachelayer = all(lqn.iscache(callers)) && ishostlayer;
75 cacheNode = Cache(model, lqn.hashnames{callers}, lqn.nitems(callers), lqn.itemcap{callers}, lqn.replacestrat(callers));
78actsInCaller = [lqn.actsof{callers}];
79isPostAndAct = full(lqn.actposttype)==ActivityPrecedenceType.POST_AND;
80isPreAndAct = full(lqn.actpretype)==ActivityPrecedenceType.PRE_AND;
81hasfork = any(intersect(find(isPostAndAct),actsInCaller));
83maxfanout = 1; % maximum output parallelism level of fork
nodes
84for aidx = actsInCaller(:)
'
85 successors = find(lqn.graph(aidx,:));
86 if any(isPostAndAct(successors))
87 maxfanout = max(maxfanout, sum(isPostAndAct(successors)));
92 forkNode = Fork(model, 'Fork_PostAnd
');
94 forkOutputRouter{f} = Router(model, ['Fork_PostAnd_
',num2str(f)]);
96 forkClassStack = []; % stack with the entry class at the visited forks, the last visited is end of the list.
99isPreAndAct = full(lqn.actpretype)==ActivityPrecedenceType.PRE_AND;
100hasjoin = any(isPreAndAct(actsInCaller));
102 joinNode = Join(model, 'Join_PreAnd
', forkNode);
105aidxClass = cell(1, lqn.nidx);
106aidxThinkClass = cell(1, lqn.nidx); % auxiliary classes for activity think-time
107cidxClass = cell(1,0);
108cidxAuxClass = cell(1,0);
110self.servt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % server classes to update
111self.thinkt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % client classes to update
112self.actthinkt_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % activity think-time classes to update
113self.arvproc_classes_updmap{idx} = zeros(0,4); % [modelidx, actidx, node, class] % classes to update in the next iteration for asynch calls
114self.call_classes_updmap{idx} = zeros(0,4); % [modelidx, callidx, node, class] % calls classes to update in the next iteration (includes calls in client classes)
115self.route_prob_updmap{idx} = zeros(0,7); % [modelidx, actidxfrom, actidxto, nodefrom, nodeto, classfrom, classto] % routing probabilities to update in the next iteration
118 model.attribute.hosts(end+1,:) = [NaN, model.attribute.serverIdx ];
120 model.attribute.tasks(end+1,:) = [NaN, model.attribute.serverIdx ];
123hasSource = false; % flag whether a source is needed
125entryOpenClasses = []; % track entry-level open arrivals
126% first pass: create the classes
127for tidx_caller = callers
128 % For host layers, check if the task has any entries with sync/async callers
129 % or has open arrivals, OR if any entry is a forwarding target.
130 hasDirectCallers = false;
131 isForwardingTarget = false;
133 % Check if the task is a reference task (always create closed class)
134 if lqn.isref(tidx_caller)
135 hasDirectCallers = true;
137 % Check if any entry of this task has sync or async callers
138 for eidx = lqn.entriesof{tidx_caller}
139 if any(full(lqn.issynccaller(:, eidx))) || any(full(lqn.isasynccaller(:, eidx)))
140 hasDirectCallers = true;
143 % Also check for open arrivals on this entry
144 if isfield(lqn, 'arrival
') && ~isempty(lqn.arrival) && ...
145 iscell(lqn.arrival) && eidx <= length(lqn.arrival) && ...
146 ~isempty(lqn.arrival{eidx})
147 hasDirectCallers = true;
150 % Check if this entry is a forwarding target
151 for cidx = 1:lqn.ncalls
152 if full(lqn.calltype(cidx)) == CallType.FWD && full(lqn.callpair(cidx, 2)) == eidx
153 isForwardingTarget = true;
160 if (ishostlayer && (hasDirectCallers || isForwardingTarget)) | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2> % if it is only an asynch caller the closed classes are not needed
161 if self.njobs(tidx_caller,idx) == 0
162 % for each entry of the calling task
163 % determine job population
164 % this block matches the corresponding calculations in
166 % Use single-replica njobs if this layer or the caller is in single-replica mode
167 callerIsSingleReplica = reduceFanout || any(self.singleReplicaTasks == tidx_caller);
168 if callerIsSingleReplica
169 njobs = mult(tidx_caller);
171 njobs = mult(tidx_caller)*lqn.repl(tidx_caller);
174 callers_of_tidx_caller = find(lqn.taskgraph(:,tidx_caller));
175 njobs = sum(mult(callers_of_tidx_caller)); %#ok<FNDSB>
177 % if also the callers of tidx_caller are inf servers, then use
179 njobs = min(sum(mult(isfinite(mult)) .* lqn.repl(isfinite(mult))),1000); % Python parity: cap at 1000
182 self.njobs(tidx_caller,idx) = njobs;
184 njobs = self.njobs(tidx_caller,idx);
186 caller_name = lqn.hashnames{tidx_caller};
187 aidxClass{tidx_caller} = ClosedClass(model, caller_name, njobs, clientDelay);
188 clientDelay.setService(aidxClass{tidx_caller}, Disabled.getInstance());
190 serverStation{m}.setService(aidxClass{tidx_caller}, Disabled.getInstance());
192 aidxClass{tidx_caller}.completes = false;
193 aidxClass{tidx_caller}.setReferenceClass(true); % renormalize residence times using the visits to the task
194 aidxClass{tidx_caller}.attribute = [LayeredNetworkElement.TASK, tidx_caller];
195 model.attribute.tasks(end+1,:) = [aidxClass{tidx_caller}.index, tidx_caller];
196 clientDelay.setService(aidxClass{tidx_caller}, self.thinkproc{tidx_caller});
197 if ~lqn.isref(tidx_caller)
198 self.thinkt_classes_updmap{idx}(end+1,:) = [idx, tidx_caller, 1, aidxClass{tidx_caller}.index];
200 for eidx = lqn.entriesof{tidx_caller}
202 aidxClass{eidx} = ClosedClass(model, lqn.hashnames{eidx}, 0, clientDelay);
203 clientDelay.setService(aidxClass{eidx}, Disabled.getInstance());
205 serverStation{m}.setService(aidxClass{eidx}, Disabled.getInstance());
207 aidxClass{eidx}.completes = false;
208 aidxClass{eidx}.attribute = [LayeredNetworkElement.ENTRY, eidx];
209 model.attribute.entries(end+1,:) = [aidxClass{eidx}.index, eidx];
210 [singleton, javasingleton] = Immediate.getInstance();
211 if isempty(model.obj)
212 clientDelay.setService(aidxClass{eidx}, singleton);
214 clientDelay.setService(aidxClass{eidx}, javasingleton);
217 % Check for open arrival distribution on this entry
218 if isfield(lqn, 'arrival
') && ~isempty(lqn.arrival) && ...
219 iscell(lqn.arrival) && eidx <= length(lqn.arrival) && ...
220 ~isempty(lqn.arrival{eidx})
224 model.attribute.sourceIdx = length(model.nodes)+1;
225 sourceStation = Source(model,'Source
');
226 sinkStation = Sink(model,'Sink
');
229 % Create open class for this entry
230 openClassForEntry = OpenClass(model, [lqn.hashnames{eidx}, '_Open
'], 0);
231 sourceStation.setArrival(openClassForEntry, lqn.arrival{eidx});
232 clientDelay.setService(openClassForEntry, Disabled.getInstance());
234 % Use bound activity's service time (entries themselves have Immediate service)
235 % Find activities bound to this entry via graph
236 bound_act_indices = find(lqn.graph(eidx,:) > 0);
237 if ~isempty(bound_act_indices)
238 % Use first bound activity
's service time
239 bound_aidx = bound_act_indices(1);
241 serverStation{m}.setService(openClassForEntry, self.servtproc{bound_aidx});
244 % Fallback to entry service (should not happen in well-formed models)
246 serverStation{m}.setService(openClassForEntry, self.servtproc{eidx});
250 % Track for routing setup later: [class_index, entry_index]
251 entryOpenClasses(end+1,:) = [openClassForEntry.index, eidx];
253 % Track: Use negative entry index to distinguish from call arrivals
254 self.arvproc_classes_updmap{idx}(end+1,:) = [idx, -eidx, ...
255 model.getNodeIndex(sourceStation), openClassForEntry.index];
257 openClassForEntry.completes = false;
258 openClassForEntry.attribute = [LayeredNetworkElement.ENTRY, eidx];
263 % for each activity of the calling task
264 for aidx = lqn.actsof{tidx_caller}
265 if ishostlayer | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2>
267 aidxClass{aidx} = ClosedClass(model, lqn.hashnames{aidx}, 0, clientDelay);
268 clientDelay.setService(aidxClass{aidx}, Disabled.getInstance());
270 serverStation{m}.setService(aidxClass{aidx}, Disabled.getInstance());
272 aidxClass{aidx}.completes = false;
273 aidxClass{aidx}.attribute = [LayeredNetworkElement.ACTIVITY, aidx];
274 model.attribute.activities(end+1,:) = [aidxClass{aidx}.index, aidx];
275 hidx = lqn.parent(lqn.parent(aidx)); % index of host processor
276 if ~(ishostlayer && (hidx == idx))
277 % set the host demand for the activity
278 clientDelay.setService(aidxClass{aidx}, self.servtproc{aidx});
280 if lqn.sched(tidx_caller)~=SchedStrategy.REF % in 'ref
' case the service activity is constant
281 % updmap(end+1,:) = [idx, aidx, 1, idxClass{aidx}.index];
283 if iscachelayer && full(lqn.graph(eidx,aidx))
284 clientDelay.setService(aidxClass{aidx}, self.servtproc{aidx});
287 % Create auxiliary think-time class if activity has think-time
288 if ~isempty(lqn.actthink{aidx}) && lqn.actthink{aidx}.getMean() > GlobalConstants.FineTol
289 aidxThinkClass{aidx} = ClosedClass(model, [lqn.hashnames{aidx},'.Think
'], 0, clientDelay);
290 aidxThinkClass{aidx}.completes = false;
291 aidxThinkClass{aidx}.attribute = [LayeredNetworkElement.ACTIVITY, aidx];
292 clientDelay.setService(aidxThinkClass{aidx}, lqn.actthink{aidx});
294 serverStation{m}.setService(aidxThinkClass{aidx}, Disabled.getInstance());
296 self.actthinkt_classes_updmap{idx}(end+1,:) = [idx, aidx, 1, aidxThinkClass{aidx}.index];
299 % add a class for each outgoing call from this activity
300 for cidx = lqn.callsof{aidx}
301 callmean(cidx) = lqn.callproc{cidx}.getMean;
302 switch lqn.calltype(cidx)
304 if lqn.parent(lqn.callpair(cidx,2)) == idx % add only if the target is serverStation
305 if ~hasSource % we need to add source and sink to the model
307 model.attribute.sourceIdx = length(model.nodes)+1;
308 sourceStation = Source(model,'Source
');
309 sinkStation = Sink(model,'Sink
');
311 cidxClass{cidx} = OpenClass(model, lqn.callhashnames{cidx}, 0);
312 sourceStation.setArrival(cidxClass{cidx}, Immediate.getInstance());
313 clientDelay.setService(cidxClass{cidx}, Disabled.getInstance());
315 serverStation{m}.setService(cidxClass{cidx}, Immediate.getInstance());
317 openClasses(end+1,:) = [cidxClass{cidx}.index, callmean(cidx), cidx];
318 model.attribute.calls(end+1,:) = [cidxClass{cidx}.index, cidx, lqn.callpair(cidx,1), lqn.callpair(cidx,2)];
319 cidxClass{cidx}.completes = false;
320 cidxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
322 for tidx_act = lqn.actsof{idx}
323 minRespT = minRespT + lqn.hostdem{tidx_act}.getMean; % upper bound, uses all activities not just the ones reachable by this entry
326 serverStation{m}.setService(cidxClass{cidx}, Exp.fitMean(minRespT));
330 cidxClass{cidx} = ClosedClass(model, lqn.callhashnames{cidx}, 0, clientDelay);
331 clientDelay.setService(cidxClass{cidx}, Disabled.getInstance());
333 serverStation{m}.setService(cidxClass{cidx}, Disabled.getInstance());
335 model.attribute.calls(end+1,:) = [cidxClass{cidx}.index, cidx, lqn.callpair(cidx,1), lqn.callpair(cidx,2)];
336 cidxClass{cidx}.completes = false;
337 cidxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
339 for tidx_act = lqn.actsof{idx}
340 minRespT = minRespT + lqn.hostdem{tidx_act}.getMean; % upper bound, uses all activities not just the ones reachable by this entry
343 serverStation{m}.setService(cidxClass{cidx}, Exp.fitMean(minRespT));
347 if callmean(cidx) ~= nreplicas
348 switch lqn.calltype(cidx)
350 cidxAuxClass{cidx} = ClosedClass(model, [lqn.callhashnames{cidx},'.Aux
'], 0, clientDelay);
351 cidxAuxClass{cidx}.completes = false;
352 cidxAuxClass{cidx}.attribute = [LayeredNetworkElement.CALL, cidx];
353 clientDelay.setService(cidxAuxClass{cidx}, Immediate.getInstance());
355 serverStation{m}.setService(cidxAuxClass{cidx}, Disabled.getInstance());
360 % For SYNC calls in task layers, create classes for forwarding
361 % calls from the target entry. This implements synthetic
362 % synchronization: the caller blocks until the forwarding
363 % target completes, modeling contention correctly.
364 % FWD calls have the source ENTRY (not activity) in callpair(:,1),
365 % so we scan all calls to find FWD calls from the target entry.
366 % For chain forwarding (e0->e1->e2), recursively follow the chain.
367 if ~ishostlayer && lqn.calltype(cidx) == CallType.SYNC
368 % Collect all FWD calls reachable through forwarding chains
369 entries_to_scan = lqn.callpair(cidx, 2); % start with sync target
370 scanned_entries = [];
371 while ~isempty(entries_to_scan)
372 target_eidx = entries_to_scan(1);
373 entries_to_scan(1) = [];
374 if ismember(target_eidx, scanned_entries)
375 continue; % avoid cycles
377 scanned_entries(end+1) = target_eidx; %#ok<AGROW>
378 for fwd_cidx_iter = 1:lqn.ncalls
379 if lqn.calltype(fwd_cidx_iter) == CallType.FWD && lqn.callpair(fwd_cidx_iter, 1) == target_eidx
380 callmean(fwd_cidx_iter) = lqn.callproc{fwd_cidx_iter}.getMean;
381 cidxClass{fwd_cidx_iter} = ClosedClass(model, lqn.callhashnames{fwd_cidx_iter}, 0, clientDelay);
382 cidxClass{fwd_cidx_iter}.completes = false;
383 cidxClass{fwd_cidx_iter}.attribute = [LayeredNetworkElement.CALL, fwd_cidx_iter];
384 clientDelay.setService(cidxClass{fwd_cidx_iter}, Disabled.getInstance());
386 serverStation{m}.setService(cidxClass{fwd_cidx_iter}, Disabled.getInstance());
388 model.attribute.calls(end+1,:) = [cidxClass{fwd_cidx_iter}.index, fwd_cidx_iter, lqn.callpair(fwd_cidx_iter,1), lqn.callpair(fwd_cidx_iter,2)];
389 % Follow the chain: scan the FWD target for further FWD calls
390 fwd_target_eidx = lqn.callpair(fwd_cidx_iter, 2);
391 if ~ismember(fwd_target_eidx, scanned_entries) && ~ismember(fwd_target_eidx, entries_to_scan)
392 entries_to_scan(end+1) = fwd_target_eidx; %#ok<AGROW>
402% Ensure Source's sourceClasses and arrivalProcess arrays are properly sized for all classes
403% This is needed because the Source may be created during class iteration
404% when only some classes exist, and new closed classes added afterwards
405% won't have corresponding entries in sourceClasses/arrivalProcess
407 nClasses = model.getNumberOfClasses();
409 if k > length(sourceStation.input.sourceClasses) || isempty(sourceStation.input.sourceClasses{k})
410 sourceStation.input.sourceClasses{k} = {[], ServiceStrategy.LI, Disabled.getInstance()};
412 if k > length(sourceStation.arrivalProcess) || isempty(sourceStation.arrivalProcess{k})
413 sourceStation.arrivalProcess{k} = Disabled.getInstance();
418P = model.initRoutingMatrix;
420 for o = 1:size(openClasses,1)
421 oidx = openClasses(o,1);
422 p = 1 / openClasses(o,2); % divide by mean number of calls, they go to a server at random
424 P{model.classes{oidx}, model.classes{oidx}}(sourceStation,serverStation{m}) = 1/nreplicas;
426 P{model.classes{oidx}, model.classes{oidx}}(serverStation{m},serverStation{n}) = (1-p)/nreplicas;
428 P{model.classes{oidx}, model.classes{oidx}}(serverStation{m},sinkStation) = p;
430 cidx = openClasses(o,3); % 3 = source
431 self.arvproc_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(sourceStation), oidx];
433 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), oidx];
438%% job positions are encoded as follows: 1=client, 2=any of the nreplicas server stations, 3=cache node, 4=fork node, 5=join node
443jobPos = atClient; % start at client
444% second pass: setup the routing out of entries
445for tidx_caller = callers
446 % Use same condition as first pass - only process if closed class was created
447 hasDirectCallers = false;
448 isForwardingTarget = false;
450 if lqn.isref(tidx_caller)
451 hasDirectCallers = true;
453 for eidx_check = lqn.entriesof{tidx_caller}
454 if any(full(lqn.issynccaller(:, eidx_check))) || any(full(lqn.isasynccaller(:, eidx_check)))
455 hasDirectCallers = true;
458 if isfield(lqn, 'arrival
') && ~isempty(lqn.arrival) && ...
459 iscell(lqn.arrival) && eidx_check <= length(lqn.arrival) && ...
460 ~isempty(lqn.arrival{eidx_check})
461 hasDirectCallers = true;
464 % Check if this entry is a forwarding target
465 for cidx_fwd = 1:lqn.ncalls
466 if full(lqn.calltype(cidx_fwd)) == CallType.FWD && full(lqn.callpair(cidx_fwd, 2)) == eidx_check
467 isForwardingTarget = true;
474 if (ishostlayer && (hasDirectCallers || isForwardingTarget)) | any(any(lqn.issynccaller(tidx_caller, lqn.entriesof{idx}))) %#ok<OR2>
475 % for each entry of the calling task
476 ncaller_entries = length(lqn.entriesof{tidx_caller});
477 for eidx = lqn.entriesof{tidx_caller}
478 aidxClass_eidx = aidxClass{eidx};
479 aidxClass_tidx_caller = aidxClass{tidx_caller};
480 % initialize the probability to select an entry to be identical
481 P{aidxClass_tidx_caller, aidxClass_eidx}(clientDelay, clientDelay) = 1 / ncaller_entries;
482 if ncaller_entries > 1
483 % at successive iterations make sure to replace this with throughput ratio
484 self.route_prob_updmap{idx}(end+1,:) = [idx, tidx_caller, eidx, 1, 1, aidxClass_tidx_caller.index, aidxClass_eidx.index];
486 P = recurActGraph(P, tidx_caller, eidx, aidxClass_eidx, jobPos);
491% Setup routing for entry-level open arrivals (AFTER recurActGraph to avoid being overwritten)
492if hasSource && ~isempty(entryOpenClasses)
493 for e = 1:size(entryOpenClasses,1)
494 eoidx = entryOpenClasses(e,1); % class index
495 openClass = model.classes{eoidx};
497 % Explicitly set routing: ONLY Source → Server → Sink
498 % Zero out all routing for this class first
499 for node1 = 1:length(model.nodes)
500 for node2 = 1:length(model.nodes)
501 P{openClass, openClass}(node1, node2) = 0;
505 % Now set the correct routing
507 % Route: Source → ServerStation → Sink
508 P{openClass, openClass}(sourceStation,serverStation{m}) = 1/nreplicas;
509 P{openClass, openClass}(serverStation{m},sinkStation) = 1.0;
515self.ensemble{idx} = model;
517 function [P, curClass, jobPos] = recurActGraph(P, tidx_caller, aidx, curClass, jobPos)
518 jobPosKey(aidx) = jobPos;
519 curClassKey{aidx} = curClass;
520 nextaidxs = find(lqn.graph(aidx,:)); % these include the called entries
521 if ~isempty(nextaidxs)
522 isNextPrecFork(aidx) = any(isPostAndAct(nextaidxs)); % indexed on aidx to avoid losing it during the recursion
523 % Save curClass/jobPos before fork branch loop so each branch
524 % starts with the same pre-fork state (prevents curClass
525 % corruption across parallel branches)
526 if isNextPrecFork(aidx)
527 forkSaveCurClass = curClass;
528 forkSaveJobPos = jobPos;
532 for nextaidx = nextaidxs % for all successor activities
533 if ~isempty(nextaidx)
534 % Restore pre-fork state for each branch iteration
535 if isNextPrecFork(aidx)
536 curClass = forkSaveCurClass;
537 jobPos = forkSaveJobPos;
540 % in the activity graph, the following if is entered only
541 % by an edge that is the return from a LOOP activity
542 if (lqn.graph(aidx,nextaidx) ~= lqn.dag(aidx,nextaidx))
545 if ~(lqn.parent(aidx) == lqn.parent(nextaidx)) % if different parent task
546 % if the successor activity is an entry of another task, this is a call
547 cidx = matchrow(lqn.callpair,[aidx,nextaidx]); % find the call index
548 switch lqn.calltype(cidx)
550 % Async calls don't modify caller routing - caller continues immediately without blocking.
551 % Arrival rate at destination is handled via arvproc_classes_updmap, which is filled where the open classes are created and where their routing is set up.
553 [
P, jobPos, curClass] = routeSynchCall(
P, jobPos, curClass);
554 % Synthetic forwarding: model blocking through
555 % forwarding chains. Following LQNS, the caller
556 % blocks through each hop in the chain. Process
557 % forwarding calls level-by-level (grouped by
558 % source entry) to handle both chains and
559 % multi-destination forwarding.
561 sync_target_eidx = lqn.callpair(cidx, 2);
562 % Process forwarding levels in BFS order
563 cur_level_entries = sync_target_eidx;
564 visited_entries = [];
566 while ~isempty(cur_level_entries)
567 next_level_entries = [];
568 for le = 1:length(cur_level_entries)
569 src_eidx = cur_level_entries(le);
570 if ismember(src_eidx, visited_entries),
continue; end
571 visited_entries(end+1) = src_eidx; %#ok<AGROW>
572 % Collect FWD calls from this entry
574 for fc_iter = 1:lqn.ncalls
575 if lqn.calltype(fc_iter) == CallType.FWD && lqn.callpair(fc_iter, 1) == src_eidx ...
576 && length(cidxClass) >= fc_iter && ~isempty(cidxClass{fc_iter})
577 level_calls(end+1) = fc_iter; %#ok<AGROW>
580 if isempty(level_calls),
continue; end
582 total_prob = sum(callmean(level_calls));
583 need_merge = (length(level_calls) > 1) || (total_prob < 1.0 - GlobalConstants.FineTol);
585 fwdMergeClass = ClosedClass(model, [
'FwdMerge_', lqn.hashnames{src_eidx}], 0, clientDelay);
586 fwdMergeClass.completes =
false;
587 fwdMergeClass.attribute = [-1, -1]; % synthetic, skipped in getEnsembleAvg
588 clientDelay.setService(fwdMergeClass, Immediate.getInstance());
590 serverStation{m}.setService(fwdMergeClass, Disabled.getInstance());
592 for fk = 1:length(level_calls)
593 lc = level_calls(fk);
594 P{curClass, cidxClass{lc}}(clientDelay, clientDelay) = callmean(lc);
595 P{cidxClass{lc}, fwdMergeClass}(clientDelay, clientDelay) = 1.0;
596 clientDelay.setService(cidxClass{lc}, callservtproc{lc});
597 self.call_classes_updmap{idx}(end+1,:) = [idx, lc, 1, cidxClass{lc}.index];
599 fwd_tgt = lqn.callpair(lc, 2);
600 if ~ismember(fwd_tgt, visited_entries) && ~ismember(fwd_tgt, next_level_entries)
601 next_level_entries(end+1) = fwd_tgt; %#ok<AGROW>
604 if total_prob < 1.0 - GlobalConstants.FineTol
605 P{curClass, fwdMergeClass}(clientDelay, clientDelay) = 1.0 - total_prob;
607 curClass = fwdMergeClass;
609 % Single FWD with prob 1.0
610 fwd_cidx_iter = level_calls(1);
611 P{curClass, cidxClass{fwd_cidx_iter}}(clientDelay, clientDelay) = 1.0;
612 clientDelay.setService(cidxClass{fwd_cidx_iter}, callservtproc{fwd_cidx_iter});
613 self.call_classes_updmap{idx}(end+1,:) = [idx, fwd_cidx_iter, 1, cidxClass{fwd_cidx_iter}.index];
614 curClass = cidxClass{fwd_cidx_iter};
616 fwd_tgt = lqn.callpair(fwd_cidx_iter, 2);
617 if ~ismember(fwd_tgt, visited_entries) && ~ismember(fwd_tgt, next_level_entries)
618 next_level_entries(end+1) = fwd_tgt; %#ok<AGROW>
622 cur_level_entries = next_level_entries;
629 % In the host layer, forwarding targets have their own
630 % jobs competing for the processor independently.
631 % The source task's job returns to think time after
632 % completing its own activities; no routing through
633 % the forwarding target's activities is needed.
634 % curClass and jobPos remain unchanged.
637 % at this point, we have processed all calls, let us do the
638 % activities local to the task next
639 if isempty(intersect(lqn.eshift+(1:lqn.nentries), nextaidxs))
640 % if next activity is not an entry
641 jobPos = jobPosKey(aidx);
642 curClass = curClassKey{aidx};
644 if ismember(nextaidxs(find(nextaidxs==nextaidx)-1), lqn.eshift+(1:lqn.nentries))
645 curClassC = curClass;
648 curClass = curClassC;
650 if jobPos == atClient % at client node
654 if isNextPrecFork(aidx)
655 % if next activity is a post-and
656 P{curClass, curClass}(clientDelay, forkNode) = 1.0;
657 f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
658 forkClassStack(end+1) = curClass.index;
659 P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
660 P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
663 % before entering the job we go back to the entry class at the last fork
664 forkClass = model.classes{forkClassStack(end)};
665 forkClassStack(end) = [];
666 P{curClass, forkClass}(clientDelay,joinNode) = 1.0;
667 P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
669 P{curClass, aidxClass{nextaidx}}(clientDelay,serverStation{m}) = full(lqn.graph(aidx,nextaidx));
672 serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
674 serverStation{m}.setDelayOff(aidxClass{nextaidx}, lqn.setuptime{lqn.parent(nextaidx)}, lqn.delayofftime{lqn.parent(nextaidx)});
678 curClass = aidxClass{nextaidx};
679 self.servt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 2, aidxClass{nextaidx}.index];
681 P{curClass, aidxClass{nextaidx}}(clientDelay,cacheNode) = full(lqn.graph(aidx,nextaidx));
683 cacheNode.setReadItemEntry(aidxClass{nextaidx},lqn.itemproc{aidx},lqn.nitems(aidx));
684 lqn.hitmissaidx = find(lqn.graph(nextaidx,:));
685 lqn.hitaidx = lqn.hitmissaidx(1);
686 lqn.missaidx = lqn.hitmissaidx(2);
688 cacheNode.setHitClass(aidxClass{nextaidx},aidxClass{lqn.hitaidx});
689 cacheNode.setMissClass(aidxClass{nextaidx},aidxClass{lqn.missaidx});
691 jobPos = atCache; % cache
692 curClass = aidxClass{nextaidx};
693 %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, lqn.hitaidx, 3, 3, aidxClass{nextaidx}.index, aidxClass{lqn.hitaidx}.index];
694 %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, lqn.missaidx, 3, 3, aidxClass{nextaidx}.index, aidxClass{lqn.missaidx}.index];
696 else % not ishostlayer
697 if isNextPrecFork(aidx)
698 % if next activity is a post-and
699 P{curClass, curClass}(clientDelay, forkNode) = 1.0;
700 f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
701 forkClassStack(end+1) = curClass.index;
702 P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
703 P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
706 % before entering the job we go back to the entry class at the last fork
707 forkClass = model.classes{forkClassStack(end)};
708 forkClassStack(end) = [];
709 P{curClass, forkClass}(clientDelay,joinNode) = 1.0;
710 P{forkClass, aidxClass{nextaidx}}(joinNode,clientDelay) = 1.0;
712 P{curClass, aidxClass{nextaidx}}(clientDelay,clientDelay) = full(lqn.graph(aidx,nextaidx));
716 curClass = aidxClass{nextaidx};
717 clientDelay.setService(aidxClass{nextaidx}, self.servtproc{nextaidx});
718 self.thinkt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 1, aidxClass{nextaidx}.index];
720 elseif jobPos == atServer || jobPos == atCache % at server station
723 curClass = aidxClass{nextaidx};
725 if isNextPrecFork(aidx)
726 % if next activity is a post-and
727 P{curClass, curClass}(cacheNode, forkNode) = 1.0;
728 f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
729 forkClassStack(end+1) = curClass.index;
730 P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
731 P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
734 % before entering the job we go back to the entry class at the last fork
735 forkClass = model.classes{forkClassStack(end)};
736 forkClassStack(end) = [];
738 P{curClass, forkClass}(cacheNode,joinNode) = 1.0;
739 P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
741 P{curClass, aidxClass{nextaidx}}(cacheNode,serverStation{m}) = full(lqn.graph(aidx,nextaidx));
744 serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
745 %self.route_prob_updmap{idx}(end+1,:) = [idx, nextaidx, nextaidx, 3, 2, aidxClass{nextaidx}.index, aidxClass{nextaidx}.index];
749 if isNextPrecFork(aidx)
750 % if next activity is a post-and
751 P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
752 f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
753 forkClassStack(end+1) = curClass.index;
754 P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
755 P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, serverStation{m}) = 1.0;
758 % before entering the job we go back to the entry class at the last fork
759 forkClass = model.classes{forkClassStack(end)};
760 forkClassStack(end) = [];
761 P{curClass, forkClass}(serverStation{m},joinNode) = 1.0;
762 P{forkClass, aidxClass{nextaidx}}(joinNode,serverStation{m}) = 1.0;
764 P{curClass, aidxClass{nextaidx}}(serverStation{m},serverStation{m}) = full(lqn.graph(aidx,nextaidx));
767 serverStation{m}.setService(aidxClass{nextaidx}, lqn.hostdem{nextaidx});
771 curClass = aidxClass{nextaidx};
772 self.servt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 2, aidxClass{nextaidx}.index];
775 if isNextPrecFork(aidx)
776 % if next activity is a post-and
777 P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
778 f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
779 forkClassStack(end+1) = curClass.index;
780 P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
781 P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
784 % before entering the job we go back to the entry class at the last fork
785 forkClass = model.classes{forkClassStack(end)};
786 forkClassStack(end) = [];
787 P{curClass, forkClass}(serverStation{m},joinNode) = 1.0;
788 P{forkClass, aidxClass{nextaidx}}(joinNode,clientDelay) = 1.0;
790 P{curClass, aidxClass{nextaidx}}(serverStation{m},clientDelay) = full(lqn.graph(aidx,nextaidx));
795 curClass = aidxClass{nextaidx};
796 clientDelay.setService(aidxClass{nextaidx}, self.servtproc{nextaidx});
797 self.thinkt_classes_updmap{idx}(end+1,:) = [idx, nextaidx, 1, aidxClass{nextaidx}.index];
800 if aidx ~= nextaidx && ~isLoop
801 %% now recursively build the rest of the routing matrix graph
802 [
P, curClass, jobPos] = recurActGraph(
P, tidx_caller, nextaidx, curClass, jobPos);
804 % At this point curClass is the last class in the
805 % recursive branch, which we now close with a reply
806 if jobPos == atClient
807 P{curClass, aidxClass{tidx_caller}}(clientDelay,clientDelay) = 1;
808 if ~strcmp(curClass.name(end-3:end),
'.Aux')
809 curClass.completes = true;
813 P{curClass, aidxClass{tidx_caller}}(serverStation{m},clientDelay) = 1;
815 if ~strcmp(curClass.name(end-3:end),
'.Aux')
816 curClass.completes = true;
825 function [
P, jobPos, curClass] = routeSynchCall(
P, jobPos, curClass)
828 if lqn.parent(lqn.callpair(cidx,2)) == idx
829 % if a call to an entry of the server in this layer
830 if callmean(cidx) < nreplicas
831 P{curClass, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1 - callmean(cidx); % note that callmean(cidx) < nreplicas
833 % if isNextPrecFork(aidx)
835 % % if next activity is a post-and
836 % P{curClass, curClass}(serverStation{m}, forkNode) = 1.0;
837 % forkStackClass(end+1) = curClass.index;
838 % f = find(nextaidx == nextaidxs(isPostAndAct(nextaidxs)));
839 % P{curClass, curClass}(forkNode, forkOutputRouter{f}) = 1.0;
840 % P{curClass, aidxClass{nextaidx}}(forkOutputRouter{f}, clientDelay) = 1.0;
842 P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = callmean(cidx) / nreplicas;
843 P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},clientDelay) = 1.0; % not needed, just to avoid leaving the Aux
class disconnected
845 P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,clientDelay) = 1.0; % not needed, just to avoid leaving the Aux
class disconnected
846 elseif callmean(cidx) == nreplicas
848 P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1 / nreplicas;
849 P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},clientDelay) = 1.0;
851 else % callmean(cidx) > nreplicas
853 P{curClass, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1 / nreplicas;
854 P{cidxClass{cidx}, cidxAuxClass{cidx}}(serverStation{m},clientDelay) = 1.0 ;
855 P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,serverStation{m}) = 1.0 - 1.0 / (callmean(cidx) / nreplicas);
857 P{cidxAuxClass{cidx}, cidxClass{cidx}}(clientDelay,clientDelay) = 1.0 / (callmean(cidx));
860 clientDelay.setService(cidxClass{cidx}, Immediate.getInstance());
862 serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
863 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), cidxClass{cidx}.index];
865 curClass = cidxClass{cidx};
867 % if it is not a call to an entry of the server
868 if callmean(cidx) < nreplicas
869 P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = callmean(cidx)/nreplicas; % the mean number of calls
is now embedded in the demand
870 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1; % the mean number of calls
is now embedded in the demand
871 P{curClass, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1 - callmean(cidx)/nreplicas; % the mean number of calls
is now embedded in the demand
872 curClass = cidxAuxClass{cidx};
873 elseif callmean(cidx) == nreplicas
874 P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = 1;
875 curClass = cidxClass{cidx};
876 else % callmean(cidx) > nreplicas
877 P{curClass, cidxClass{cidx}}(clientDelay,clientDelay) = 1; % the mean number of calls
is now embedded in the demand
878 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;% / (callmean(cidx)/nreplicas); % the mean number of calls
is now embedded in the demand
879 curClass = cidxAuxClass{cidx};
882 clientDelay.setService(cidxClass{cidx}, callservtproc{cidx});
883 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, 1, cidxClass{cidx}.index];
885 case atServer % job at server
886 if lqn.parent(lqn.callpair(cidx,2)) == idx
887 % if it is a call to an entry of the server
888 if callmean(cidx) < nreplicas
890 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1 - callmean(cidx);
891 P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = callmean(cidx);
892 serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
895 curClass = cidxAuxClass{cidx};
896 elseif callmean(cidx) == nreplicas
898 P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1;
901 curClass = cidxClass{cidx};
902 else % callmean(cidx) > nreplicas
904 P{curClass, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1;
905 P{cidxClass{cidx}, cidxClass{cidx}}(serverStation{m},serverStation{m}) = 1 - 1 / (callmean(cidx));
906 P{cidxClass{cidx}, cidxAuxClass{cidx}}(serverStation{m},clientDelay) = 1 / (callmean(cidx));
909 curClass = cidxAuxClass{cidx};
912 serverStation{m}.setService(cidxClass{cidx}, callservtproc{cidx});
913 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, model.getNodeIndex(serverStation{m}), cidxClass{cidx}.index];
916 % if it is not a call to an entry of the server
917 % callmean is not needed since we switched
918 % to ResidT to model service time at client
919 if callmean(cidx) < nreplicas
921 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
923 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;
924 curClass = cidxAuxClass{cidx};
925 elseif callmean(cidx) == nreplicas
927 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
929 curClass = cidxClass{cidx};
930 else % callmean(cidx) > nreplicas
932 P{curClass, cidxClass{cidx}}(serverStation{m},clientDelay) = 1;
934 P{cidxClass{cidx}, cidxAuxClass{cidx}}(clientDelay,clientDelay) = 1;
935 curClass = cidxAuxClass{cidx};
938 clientDelay.setService(cidxClass{cidx}, callservtproc{cidx});
939 self.call_classes_updmap{idx}(end+1,:) = [idx, cidx, 1, cidxClass{cidx}.index];
943 % After the synch call returns, route through the activity think-time class if applicable
944 callingAidx = lqn.callpair(cidx, 1); % source activity of the call
945 if ~isempty(aidxThinkClass{callingAidx})
946 % Route from the current class to the think-time class at the client
947 P{curClass, aidxThinkClass{callingAidx}}(clientDelay, clientDelay) = 1.0;
948 curClass = aidxThinkClass{callingAidx};