LINE Solver
MATLAB API documentation
Loading...
Searching...
No Matches
WfCommonsLoader.m
classdef WfCommonsLoader < handle
    % WfCommonsLoader - Load WfCommons JSON workflows into LINE Workflow objects.
    %
    % WfCommonsLoader provides static methods to load workflow traces from
    % the WfCommons format (https://github.com/wfcommons/workflow-schema) into
    % LINE Workflow objects for queueing analysis.
    %
    % Supported schema versions: 1.3, 1.4, 1.5
    % (any other version only triggers a warning; loading is still attempted)
    %
    % Two layouts are recognised:
    %   - workflow.tasks                 (tasks carry their own runtime data)
    %   - workflow.specification.tasks   (runtime data in workflow.execution.tasks)
    %
    % Example:
    %   wf = WfCommonsLoader.load('workflow.json');
    %   ph = wf.toPH();
    %
    % Example with options:
    %   options.distributionType = 'exp';
    %   options.defaultRuntime = 1.0;
    %   wf = WfCommonsLoader.load('workflow.json', options);
    %
    % Copyright (c) 2012-2026, Imperial College London
    % All rights reserved.

    properties (Constant)
        % Schema versions accepted without a warning by validateSchema.
        SUPPORTED_SCHEMA_VERSIONS = {'1.3', '1.4', '1.5'};
    end

    methods (Static)
        function wf = load(jsonFile, options)
            % LOAD Load a WfCommons JSON file into a Workflow object.
            %
            % WF = WFCOMMONSLOADER.LOAD(JSONFILE)
            % WF = WFCOMMONSLOADER.LOAD(JSONFILE, OPTIONS)
            %
            % Parameters:
            %   jsonFile - Path to WfCommons JSON file
            %   options  - Optional struct with:
            %     .distributionType - 'exp' (default), 'det', 'aph', 'hyperexp'
            %     .defaultSCV       - Default SCV for APH/HyperExp (default: 1.0)
            %     .defaultRuntime   - Default runtime when missing (default: 1.0)
            %     .useExecutionData - Use execution data if available (default: true)
            %     .storeMetadata    - Store WfCommons metadata (default: true)
            %
            % Returns:
            %   wf - Workflow object

            if nargin < 2
                options = struct();
            end
            options = WfCommonsLoader.parseOptions(options);

            % Read and parse JSON
            data = jsondecode(fileread(jsonFile));

            % The file name is the fallback workflow name.
            wf = WfCommonsLoader.buildValidated(data, jsonFile, options);
        end

        function wf = loadFromStruct(data, options)
            % LOADFROMSTRUCT Load from pre-parsed struct.
            %
            % WF = WFCOMMONSLOADER.LOADFROMSTRUCT(DATA)
            % WF = WFCOMMONSLOADER.LOADFROMSTRUCT(DATA, OPTIONS)
            %
            % Parameters:
            %   data    - Struct from jsondecode
            %   options - Options struct (see load method)
            %
            % Returns:
            %   wf - Workflow object

            if nargin < 2
                options = struct();
            end
            options = WfCommonsLoader.parseOptions(options);

            wf = WfCommonsLoader.buildValidated(data, 'struct_input', options);
        end

        function wf = loadFromUrl(urlString, options)
            % LOADFROMURL Load WfCommons JSON from a URL.
            %
            % WF = WFCOMMONSLOADER.LOADFROMURL(URLSTRING)
            % WF = WFCOMMONSLOADER.LOADFROMURL(URLSTRING, OPTIONS)
            %
            % Useful for loading workflows directly from repositories like
            % wfcommons/pegasus-instances.
            %
            % Parameters:
            %   urlString - URL pointing to WfCommons JSON file
            %   options   - Options struct (see load method)
            %
            % Returns:
            %   wf - Workflow object

            if nargin < 2
                options = struct();
            end
            options = WfCommonsLoader.parseOptions(options);

            % Ask for the raw text: with automatic content-type detection
            % webread may hand back an already-decoded struct for JSON
            % responses, which jsondecode would then reject.
            raw = webread(urlString, weboptions('ContentType', 'text'));
            if isstruct(raw)
                % Defensive: payload was decoded for us after all.
                data = raw;
            else
                data = jsondecode(char(raw(:).'));
            end

            % Fallback workflow name: last path component of the URL.
            [~, defaultName, ~] = fileparts(urlString);

            wf = WfCommonsLoader.buildValidated(data, defaultName, options);
        end

        function isValid = validateFile(jsonFile)
            % VALIDATEFILE Check if file is valid WfCommons schema.
            %
            % ISVALID = WFCOMMONSLOADER.VALIDATEFILE(JSONFILE)
            %
            % Any failure (unreadable file, malformed JSON, schema check
            % raising an error) is reported as false rather than rethrown.
            %
            % Returns:
            %   isValid - true if file is valid WfCommons format

            try
                data = jsondecode(fileread(jsonFile));
                WfCommonsLoader.validateSchema(data);
                isValid = true;
            catch
                isValid = false;
            end
        end
    end

    methods (Static, Access = private)
        function wf = buildValidated(data, defaultName, options)
            % BUILDVALIDATED Shared tail of all loaders: validate, name, build.
            %
            % OPTIONS must already have been completed by parseOptions.

            WfCommonsLoader.validateSchema(data);
            workflowName = WfCommonsLoader.extractName(data, defaultName);
            wf = WfCommonsLoader.buildWorkflow(data, workflowName, options);
        end

        function options = parseOptions(options)
            % PARSEOPTIONS Set default option values.
            %
            % Fields already present are kept, except that an unrecognised
            % distributionType is replaced by 'exp' with a single warning
            % (instead of silently falling back once per task).

            if ~isfield(options, 'distributionType')
                options.distributionType = 'exp';
            end
            if ~isfield(options, 'defaultSCV')
                options.defaultSCV = 1.0;
            end
            if ~isfield(options, 'defaultRuntime')
                options.defaultRuntime = 1.0;
            end
            if ~isfield(options, 'useExecutionData')
                options.useExecutionData = true;
            end
            if ~isfield(options, 'storeMetadata')
                options.storeMetadata = true;
            end

            % Must stay in sync with the switch in fitDistribution.
            knownTypes = {'exp', 'det', 'aph', 'hyperexp'};
            distType = options.distributionType;
            isText = ischar(distType) || (isstring(distType) && isscalar(distType));
            if isText
                shown = char(distType);
            else
                shown = class(distType);
            end
            if ~isText || ~any(strcmpi(shown, knownTypes))
                line_warning(mfilename, 'Unsupported distributionType (%s); falling back to exp.', shown);
                options.distributionType = 'exp';
            end
        end

        function validateSchema(data)
            % VALIDATESCHEMA Validate required fields in WfCommons schema.
            %
            % Raises (via line_error) when schemaVersion or workflow is
            % missing, or when no non-empty task list is found. An
            % unlisted schema version only produces a warning.

            % Check schemaVersion
            if ~isfield(data, 'schemaVersion')
                line_error(mfilename, 'Missing schemaVersion field in WfCommons JSON.');
            end

            % A JSON number (e.g. 1.5 without quotes) decodes to a double;
            % turn it into text so it can be compared and printed with %s.
            schemaVersion = data.schemaVersion;
            if isnumeric(schemaVersion)
                schemaVersion = num2str(schemaVersion);
            elseif isstring(schemaVersion) && isscalar(schemaVersion)
                schemaVersion = char(schemaVersion);
            end
            if ~any(strcmp(schemaVersion, WfCommonsLoader.SUPPORTED_SCHEMA_VERSIONS))
                line_warning(mfilename, 'Schema version %s may not be fully supported.', schemaVersion);
            end

            % Check workflow structure
            if ~isfield(data, 'workflow')
                line_error(mfilename, 'Missing workflow field in WfCommons JSON.');
            end

            % Schema 1.4 and earlier use workflow.tasks directly
            % Schema 1.5+ uses workflow.specification.tasks
            % (when 'specification' exists, workflow.tasks is not consulted)
            if isfield(data.workflow, 'specification')
                spec = data.workflow.specification;
                hasTasks = isfield(spec, 'tasks') && ~isempty(spec.tasks);
            else
                hasTasks = isfield(data.workflow, 'tasks') && ~isempty(data.workflow.tasks);
            end

            if ~hasTasks
                line_error(mfilename, 'Workflow must have at least one task.');
            end
        end

        function name = extractName(data, defaultName)
            % EXTRACTNAME Extract workflow name from data or use default.
            %
            % Uses the top-level 'name' field when it is non-empty text;
            % otherwise the file-name part of DEFAULTNAME. Every character
            % outside [a-zA-Z0-9_] is replaced by '_'.

            name = '';
            if isfield(data, 'name')
                candidate = data.name;
                if ischar(candidate) || (isstring(candidate) && isscalar(candidate))
                    name = char(candidate);
                end
            end
            if isempty(name)
                [~, name, ~] = fileparts(defaultName);
            end

            % Sanitize name for MATLAB
            name = regexprep(name, '[^a-zA-Z0-9_]', '_');
            if isempty(name)
                name = 'Workflow';
            end
        end

        function wf = buildWorkflow(data, workflowName, options)
            % BUILDWORKFLOW Build Workflow object from parsed data.
            %
            % Creates one activity per task (service time fitted from the
            % task runtime), then derives precedences from the 'children'
            % lists of the tasks.

            wf = Workflow(workflowName);

            % Detect schema format: 1.5+ uses specification.tasks, earlier
            % versions keep tasks (with embedded runtime) under workflow.
            isLegacySchema = ~isfield(data.workflow, 'specification');
            if isLegacySchema
                tasks = data.workflow.tasks;
            else
                tasks = data.workflow.specification.tasks;
            end

            % jsondecode yields a struct array for homogeneous tasks and a
            % cell array otherwise; work on a cell column from here on.
            tasks = WfCommonsLoader.toCellColumn(tasks);
            n = numel(tasks);
            if n == 0
                line_error(mfilename, 'Workflow must have at least one task.');
            end

            % Resolve every task id once (schema 1.4 may use 'name').
            taskIds = cell(n, 1);
            for i = 1:n
                taskIds{i} = WfCommonsLoader.getTaskId(tasks{i}, i);
            end

            % Build execution data map if available
            execMap = containers.Map();
            if options.useExecutionData
                if isLegacySchema
                    % Legacy layout: runtime data is embedded in the task itself
                    for i = 1:n
                        execMap(taskIds{i}) = tasks{i};
                    end
                elseif isfield(data.workflow, 'execution') && ...
                        isfield(data.workflow.execution, 'tasks')
                    % Schema 1.5+: separate execution section keyed by task id
                    execTasks = WfCommonsLoader.toCellColumn(data.workflow.execution.tasks);
                    for i = 1:numel(execTasks)
                        execMap(execTasks{i}.id) = execTasks{i};
                    end
                end
            end

            % Phase 1: Create all activities
            taskMap = containers.Map();     % task id -> activity
            taskIdxMap = containers.Map();  % task id -> position in tasks
            for i = 1:n
                taskId = taskIds{i};

                % Runtime from execution data, else the configured default
                runtime = options.defaultRuntime;
                if isKey(execMap, taskId)
                    runtime = WfCommonsLoader.extractRuntime(execMap(taskId), runtime);
                end

                dist = WfCommonsLoader.fitDistribution(runtime, options);

                act = wf.addActivity(taskId, dist);
                taskMap(taskId) = act;
                taskIdxMap(taskId) = i;

                % Store metadata if requested
                if options.storeMetadata
                    act.metadata = WfCommonsLoader.extractMetadata(tasks{i}, taskId, execMap);
                end
            end

            % Phase 2: Build adjacency structures
            [adjList, inDeg, outDeg, predList] = WfCommonsLoader.buildAdjacency(tasks, taskIdxMap);

            % Phase 3: Detect and add precedences
            WfCommonsLoader.addPrecedences(wf, tasks, taskMap, adjList, inDeg, outDeg, predList);
        end

        function runtime = extractRuntime(execData, fallback)
            % EXTRACTRUNTIME Read a task runtime, or return FALLBACK.
            %
            % 'runtimeInSeconds' is preferred. 'runtime' is tried next;
            % NOTE(review): this assumes schema 1.3 traces store the
            % runtime (in seconds) under that name - confirm against the
            % WfCommons schema.
            % A value is accepted only if it is a numeric, non-NaN scalar:
            % a JSON null decodes to [] (or NaN inside numeric arrays) and
            % must not reach the distribution fitting.

            runtime = fallback;
            candidates = {'runtimeInSeconds', 'runtime'};
            for k = 1:numel(candidates)
                fieldName = candidates{k};
                if isfield(execData, fieldName)
                    value = execData.(fieldName);
                    if isnumeric(value) && isscalar(value) && ~isnan(value)
                        runtime = double(value);
                        return;
                    end
                end
            end
        end

        function items = toCellColumn(items)
            % TOCELLCOLUMN Normalise a jsondecode array to a cell column.
            %
            % Struct arrays become one cell per element, cell arrays are
            % reshaped to a column, anything else yields an empty cell.

            if iscell(items)
                items = items(:);
            elseif isstruct(items)
                items = num2cell(items(:));
            else
                items = {};
            end
        end

        function dist = fitDistribution(runtime, options)
            % FITDISTRIBUTION Fit a distribution from runtime.
            %
            % Runtimes not above GlobalConstants.FineTol map to Immediate.
            % 'hyperexp' needs SCV > 1 and degrades to Exp otherwise.

            if runtime <= GlobalConstants.FineTol
                dist = Immediate.getInstance();
                return;
            end

            switch lower(options.distributionType)
                case 'exp'
                    dist = Exp.fitMean(runtime);
                case 'det'
                    dist = Det(runtime);
                case 'aph'
                    dist = APH.fitMeanAndSCV(runtime, options.defaultSCV);
                case 'hyperexp'
                    if options.defaultSCV > 1.0
                        dist = HyperExp.fitMeanAndSCV(runtime, options.defaultSCV);
                    else
                        dist = Exp.fitMean(runtime);
                    end
                otherwise
                    % Unknown types are already reported by parseOptions.
                    dist = Exp.fitMean(runtime);
            end
        end

        function taskId = getTaskId(task, idx)
            % GETTASKID Extract task ID from task object.
            % Schema 1.4 may use 'name' as identifier, Schema 1.5+ uses 'id'.
            % Without either field a positional id 'task_<idx>' is generated.
            if isfield(task, 'id')
                taskId = task.id;
            elseif isfield(task, 'name')
                taskId = task.name;
            else
                taskId = sprintf('task_%d', idx);
            end
        end

        function metadata = extractMetadata(task, taskId, execMap)
            % EXTRACTMETADATA Extract WfCommons metadata from task.
            %
            % Copies name/inputFiles/outputFiles from the task and, when an
            % execution record exists for TASKID, the execution fields
            % listed below. Absent fields are simply omitted.

            metadata = struct();
            metadata.taskId = taskId;

            taskFields = {'name', 'inputFiles', 'outputFiles'};
            for j = 1:numel(taskFields)
                f = taskFields{j};
                if isfield(task, f)
                    metadata.(f) = task.(f);
                end
            end

            if isKey(execMap, taskId)
                exec = execMap(taskId);
                execFields = {'executedAt', 'command', 'coreCount', 'avgCPU', ...
                    'readBytes', 'writtenBytes', 'memoryInBytes', ...
                    'energyInKWh', 'avgPowerInW', 'priority', 'machines'};
                for j = 1:numel(execFields)
                    f = execFields{j};
                    if isfield(exec, f)
                        metadata.(f) = exec.(f);
                    end
                end
            end
        end

        function [adjList, inDeg, outDeg, predList] = buildAdjacency(tasks, taskIdxMap)
            % BUILDADJACENCY Build adjacency structures from tasks.
            %
            % Edges come from each task's 'children' list only. Children
            % whose id is not a key of TASKIDXMAP are ignored, and an edge
            % listed more than once is counted a single time.
            %
            % Returns:
            %   adjList  - adjList{i}: row vector of successor indices of task i
            %   inDeg    - number of distinct predecessors per task
            %   outDeg   - number of distinct successors per task
            %   predList - predList{i}: row vector of predecessor indices of task i

            tasks = WfCommonsLoader.toCellColumn(tasks);
            n = numel(tasks);
            adjList = cell(n, 1);
            predList = cell(n, 1);
            inDeg = zeros(n, 1);
            outDeg = zeros(n, 1);

            for i = 1:n
                task = tasks{i};
                if ~isfield(task, 'children') || isempty(task.children)
                    continue;
                end

                children = task.children;
                if ischar(children)
                    % A lone id given as plain text
                    children = {children};
                end

                for j = 1:length(children)
                    childId = children{j};
                    if ~isKey(taskIdxMap, childId)
                        continue;
                    end
                    childIdx = taskIdxMap(childId);
                    if any(adjList{i} == childIdx)
                        % Duplicate entry: would inflate the degrees and
                        % produce a fork with a repeated branch.
                        continue;
                    end
                    adjList{i} = [adjList{i}, childIdx];
                    predList{childIdx} = [predList{childIdx}, i];
                    outDeg(i) = outDeg(i) + 1;
                    inDeg(childIdx) = inDeg(childIdx) + 1;
                end
            end
        end

        function addPrecedences(wf, tasks, taskMap, adjList, inDeg, outDeg, predList) %#ok<INUSD>
            % ADDPRECEDENCES Detect patterns and add precedences to workflow.
            %
            % Step 1 emits an AndFork/AndJoin pair for every task with more
            % than one successor whose successors all point directly at a
            % common task. Step 2 emits a Serial precedence for every edge
            % not covered by step 1.
            %
            % NOTE(review): step 1 does not check that the branches have no
            % other successors or that the join has no other predecessors;
            % such extra edges are emitted as Serial in step 2. Confirm this
            % is what Workflow expects.

            tasks = WfCommonsLoader.toCellColumn(tasks);
            n = numel(tasks);

            % handled{i} lists the targets j whose edge i->j is already
            % covered by a fork/join. Per-node lists keep memory linear in
            % the number of edges (a dense n-by-n mask is quadratic).
            handled = cell(n, 1);

            % Step 1: Identify and process fork-join pairs
            for i = 1:n
                if outDeg(i) <= 1
                    continue;
                end

                % Potential AND-fork
                children = adjList{i};
                joinPoint = WfCommonsLoader.findCommonJoin(children, adjList, inDeg, n);
                if isempty(joinPoint)
                    continue;
                end

                preAct = taskMap(WfCommonsLoader.getTaskId(tasks{i}, i));
                postActs = cell(1, numel(children));
                for j = 1:numel(children)
                    c = children(j);
                    postActs{j} = taskMap(WfCommonsLoader.getTaskId(tasks{c}, c));
                end
                postAct = taskMap(WfCommonsLoader.getTaskId(tasks{joinPoint}, joinPoint));

                % Add fork and join
                wf.addPrecedence(Workflow.AndFork(preAct, postActs));
                wf.addPrecedence(Workflow.AndJoin(postActs, postAct));

                % Mark edges as processed
                handled{i} = [handled{i}, children];
                for j = 1:numel(children)
                    c = children(j);
                    handled{c} = [handled{c}, joinPoint];
                end
            end

            % Step 2: Add remaining edges as serial connections
            for i = 1:n
                targets = adjList{i};
                for j = targets(:).'
                    if any(handled{i} == j)
                        continue;
                    end
                    preAct = taskMap(WfCommonsLoader.getTaskId(tasks{i}, i));
                    postAct = taskMap(WfCommonsLoader.getTaskId(tasks{j}, j));
                    wf.addPrecedence(Workflow.Serial(preAct, postAct));
                end
            end
        end

        function joinPoint = findCommonJoin(children, adjList, inDeg, ~)
            % FINDCOMMONJOIN Find a common join point for all children.
            %
            % Returns the lowest-index node that is a direct successor of
            % every entry of CHILDREN, is not itself one of CHILDREN, and
            % has in-degree >= numel(CHILDREN); [] when there is none.
            %
            % Only direct successors can qualify, so intersecting the
            % successor lists is sufficient; no graph traversal is needed.
            % The fourth argument (node count) is accepted for call
            % compatibility and ignored.

            joinPoint = [];
            if isempty(children)
                return;
            end

            candidates = adjList{children(1)};
            for k = 2:numel(children)
                if isempty(candidates)
                    return;
                end
                candidates = intersect(candidates, adjList{children(k)});
            end

            % setdiff also sorts, giving a deterministic ascending scan.
            candidates = setdiff(candidates, children);

            for node = candidates(:).'
                if inDeg(node) >= numel(children)
                    joinPoint = node;
                    return;
                end
            end
        end
    end
end