1classdef WfCommonsLoader < handle
% WfCommonsLoader - Load WfCommons JSON workflows into LINE Workflow objects.
%
% WfCommonsLoader provides static methods to load workflow traces from
% the WfCommons format (https://wfcommons.org) and convert them into
% LINE Workflow objects for queueing analysis.
%
% Supported schema versions: 1.3, 1.4, 1.5 (other versions load with a warning)
%
% Usage:
%   wf = WfCommonsLoader.load('workflow.json');
%
% Example with options:
%   options.distributionType = 'exp';
%   options.defaultRuntime = 1.0;
%   wf = WfCommonsLoader.load('workflow.json', options);
%
% Copyright (c) 2012-2026, Imperial College London
% All rights reserved.
% SUPPORTED_SCHEMA_VERSIONS - schemaVersion strings accepted silently by
% validateSchema. Any other version still loads, but a line_warning is issued.
% The task layout (workflow.tasks vs workflow.specification.tasks) is detected
% from the decoded data itself, not from this list.
23 SUPPORTED_SCHEMA_VERSIONS = {
'1.3',
'1.4',
'1.5'};
27 function wf = load(jsonFile, options)
28 % LOAD Load a WfCommons JSON file into a Workflow
object.
30 % WF = WFCOMMONSLOADER.LOAD(JSONFILE)
31 % WF = WFCOMMONSLOADER.LOAD(JSONFILE, OPTIONS)
34 % jsonFile - Path to WfCommons JSON file
35 % options - Optional
struct with:
36 % .distributionType - 'exp' (default), 'det', '
aph', 'hyperexp'
37 % .defaultSCV - Default SCV for APH/HyperExp (default: 1.0)
38 % .defaultRuntime - Default runtime when missing (default: 1.0)
39 % .useExecutionData - Use execution data if available (default: true)
40 % .storeMetadata - Store WfCommons metadata (default: true)
43 % wf - Workflow object
% Errors are raised (via line_error in validateSchema) when the decoded
% JSON lacks schemaVersion, a workflow field, or a non-empty task list.
% NOTE(review): parseOptions calls isfield(options, ...), so OPTIONS must
% already exist here. The default for the one-argument form is not visible
% in this copy - confirm it is set to struct() before this call.
48 options = WfCommonsLoader.parseOptions(options);
% Read the whole file and decode it with MATLAB's built-in JSON decoder.
51 jsonStr = fileread(jsonFile);
52 data = jsondecode(jsonStr);
55 WfCommonsLoader.validateSchema(data);
57 % Extract workflow name
% (extractName falls back to the file's base name when the JSON has no name).
58 workflowName = WfCommonsLoader.extractName(data, jsonFile);
% Create activities and precedence relations from the decoded tasks.
61 wf = WfCommonsLoader.buildWorkflow(data, workflowName, options);
64 function wf = loadFromStruct(data, options)
65 % LOADFROMSTRUCT Load from pre-parsed struct.
67 % WF = WFCOMMONSLOADER.LOADFROMSTRUCT(DATA)
68 % WF = WFCOMMONSLOADER.LOADFROMSTRUCT(DATA, OPTIONS)
71 % data - Struct from jsondecode
72 % options - Options struct (see load method)
75 % wf - Workflow object
% Same pipeline as LOAD, minus the file read: options defaults, schema
% validation, name extraction, workflow construction.
% NOTE(review): as in LOAD, the default for a missing OPTIONS argument is not
% visible in this copy - confirm it is set before parseOptions is called.
80 options = WfCommonsLoader.parseOptions(options);
82 WfCommonsLoader.validateSchema(data);
% With no 'name' field in DATA, the workflow is named 'struct_input'.
83 workflowName = WfCommonsLoader.extractName(data, 'struct_input');
84 wf = WfCommonsLoader.buildWorkflow(data, workflowName, options);
87 function wf = loadFromUrl(urlString, options)
88 % LOADFROMURL Load WfCommons JSON from a URL.
90 % WF = WFCOMMONSLOADER.LOADFROMURL(URLSTRING)
91 % WF = WFCOMMONSLOADER.LOADFROMURL(URLSTRING, OPTIONS)
93 % Useful for loading workflows directly from repositories like
94 % wfcommons/pegasus-instances.
97 % urlString - URL pointing to WfCommons JSON file
98 % options - Options struct (see load method)
101 % wf - Workflow object
106 options = WfCommonsLoader.parseOptions(options);
108 % Fetch content from URL
% NOTE(review): webread decodes the response itself when the server reports
% a JSON content type (e.g. application/json) and returns a struct. In that
% case jsondecode below fails because its input is not text. Raw file hosts
% that serve text/plain are unaffected. Consider
% webread(urlString, weboptions('ContentType', 'text')), unless lines not
% visible here already handle a struct result - confirm.
109 jsonStr = webread(urlString);
112 data = jsondecode(jsonStr);
115 WfCommonsLoader.validateSchema(data);
117 % Extract name from URL if not in JSON
% (fileparts takes the last path segment of the URL, without its extension).
118 [~, defaultName, ~] = fileparts(urlString);
121 workflowName = WfCommonsLoader.extractName(data, defaultName);
122 wf = WfCommonsLoader.buildWorkflow(data, workflowName, options);
125 function isValid = validateFile(jsonFile)
126 % VALIDATEFILE Check if file is valid WfCommons schema.
128 % ISVALID = WFCOMMONSLOADER.VALIDATEFILE(JSONFILE)
131 % isValid - true if file is valid WfCommons format
% validateSchema reports failure by raising (line_error), not by returning
% a value.
% NOTE(review): no assignment to isValid is visible in this copy. The
% read/decode/validate calls below are presumably wrapped in try/catch that
% sets isValid to true/false - confirm. Otherwise an invalid file raises
% instead of returning false, and a valid one leaves isValid unassigned.
134 jsonStr = fileread(jsonFile);
135 data = jsondecode(jsonStr);
136 WfCommonsLoader.validateSchema(data);
144 methods (Static, Access = private)
145 function options = parseOptions(options)
146 % PARSEOPTIONS Set default option values.
% Only fields missing from OPTIONS are added. Caller-supplied values are
% kept exactly as given; they are not validated here.
% Defaults: distributionType 'exp', defaultSCV 1.0, defaultRuntime 1.0,
% useExecutionData true, storeMetadata true.
148 if ~isfield(options, 'distributionType')
149 options.distributionType = 'exp';
151 if ~isfield(options, 'defaultSCV')
152 options.defaultSCV = 1.0;
154 if ~isfield(options, 'defaultRuntime')
155 options.defaultRuntime = 1.0;
157 if ~isfield(options, 'useExecutionData')
158 options.useExecutionData = true;
160 if ~isfield(options, 'storeMetadata')
161 options.storeMetadata = true;
165 function validateSchema(data)
166 % VALIDATESCHEMA Validate required fields in WfCommons schema.
% Raises via line_error if schemaVersion or workflow is missing, or if no
% non-empty task list is found. An unlisted schemaVersion only triggers a
% line_warning.
168 % Check schemaVersion
169 if ~isfield(data, 'schemaVersion')
170 line_error(mfilename, 'Missing schemaVersion field in WfCommons JSON.');
173 version = data.schemaVersion;
% NOTE(review): if the JSON stores schemaVersion as a number (e.g. 1.5
% unquoted), jsondecode yields a double. strcmp then returns false, and the
% '%s' in the warning would not print the number as intended. Confirm
% schemaVersion is always a JSON string.
174 if ~any(strcmp(version, WfCommonsLoader.SUPPORTED_SCHEMA_VERSIONS))
175 line_warning(mfilename, 'Schema version %s may not be fully supported.', version);
178 % Check workflow structure
179 if ~isfield(data, 'workflow')
180 line_error(mfilename, 'Missing workflow field in WfCommons JSON.');
183 % Schema 1.4 and earlier use workflow.tasks directly
184 % Schema 1.5+ uses workflow.specification.tasks
% The layout is chosen by the presence of 'specification'. When that field
% exists, workflow.tasks appears not to be consulted at all.
186 if isfield(data.workflow, 'specification')
187 if isfield(data.workflow.specification, 'tasks') && ~isempty(data.workflow.specification.tasks)
190 elseif isfield(data.workflow, 'tasks') && ~isempty(data.workflow.tasks)
191 % Schema 1.4 format: tasks directly under workflow
196 line_error(mfilename, 'Workflow must have at least one task.');
200 function name = extractName(data, defaultName)
201 % EXTRACTNAME Extract workflow name from data or use default.
% Uses data.name when present and non-empty. Otherwise uses the base name
% (no folder, no extension) of DEFAULTNAME.
203 if isfield(data, 'name') && ~isempty(data.name)
206 [~, name, ~] = fileparts(defaultName);
208 % Sanitize name for MATLAB
% Every character outside [a-zA-Z0-9_] becomes '_'.
% NOTE(review): a leading digit or underscore is kept, so the result is not
% always a valid MATLAB identifier. Confirm Workflow does not require one.
209 name = regexprep(name, '[^a-zA-Z0-9_]', '_');
215 function wf = buildWorkflow(data, workflowName, options)
216 % BUILDWORKFLOW Build Workflow object from parsed data.
% Steps: (1) index per-task execution data by task ID, (2) add one activity
% per task, with a distribution fitted to its runtime, (3) derive the
% adjacency from each task's 'children', and (4) translate it into
% fork/join and serial precedences.
218 wf = Workflow(workflowName);
220 % Detect schema format: 1.5+ uses specification.tasks, 1.4 uses tasks directly
221 isLegacySchema = ~isfield(data.workflow, 'specification');
223 % Schema 1.4 format: tasks directly under workflow with embedded runtime
224 tasks = data.workflow.tasks;
226 % Schema 1.5+ format: tasks under specification
% NOTE(review): the assignment of tasks from spec (presumably spec.tasks)
% is not visible in this copy - confirm.
227 spec = data.workflow.specification;
231 % Build execution data map if available
% Keys are task IDs (char, the containers.Map default KeyType). When
% options.useExecutionData is false the map stays empty, so every task gets
% options.defaultRuntime and no execution metadata.
232 execMap = containers.Map();
233 if options.useExecutionData
235 % Schema 1.4: runtime data is embedded in task objects
% The task struct itself is stored as its own execution record. The two
% loops below presumably cover struct-array vs cell-array task lists
% (the branch lines are not visible here).
237 for i = 1:length(tasks)
239 taskId = WfCommonsLoader.getTaskId(task, i);
240 execMap(taskId) = task;
243 for i = 1:length(tasks)
245 taskId = WfCommonsLoader.getTaskId(task, i);
246 execMap(taskId) = task;
249 elseif isfield(data.workflow, 'execution')
250 % Schema 1.5+: separate execution section
% Execution records are matched to specification tasks by their 'id' field.
251 exec = data.workflow.execution;
252 if isfield(exec, 'tasks')
253 execTasks = exec.tasks;
% jsondecode returns a struct array for uniform records, a cell array otherwise.
254 if isstruct(execTasks)
255 for i = 1:length(execTasks)
256 execMap(execTasks(i).id) = execTasks(i);
258 elseif iscell(execTasks)
259 for i = 1:length(execTasks)
260 execMap(execTasks{i}.id) = execTasks{i};
267 % Phase 1: Create all activities
% taskMap: task ID -> activity; taskIdxMap: task ID -> position in tasks.
% NOTE(review): a repeated task ID overwrites earlier entries in both maps;
% whether wf.addActivity rejects duplicates is not visible here.
268 taskMap = containers.Map();
269 taskIdxMap = containers.Map();
279 % Get task ID (schema 1.4 may use 'name' instead of 'id')
280 taskId = WfCommonsLoader.getTaskId(task, i);
282 % Get runtime from execution data
% Falls back to options.defaultRuntime when there is no execution record or
% it has no runtimeInSeconds field.
283 runtime = options.defaultRuntime;
284 if isKey(execMap, taskId)
285 execData = execMap(taskId);
286 if isfield(execData, 'runtimeInSeconds')
287 runtime = execData.runtimeInSeconds;
292 dist = WfCommonsLoader.fitDistribution(runtime, options);
295 act = wf.addActivity(taskId, dist);
296 taskMap(taskId) = act;
297 taskIdxMap(taskId) = i;
299 % Store metadata if requested
300 if options.storeMetadata
301 act.metadata = WfCommonsLoader.extractMetadata(task, taskId, execMap);
305 % Phase 2: Build adjacency structures
306 [adjList, inDeg, outDeg, predList] = WfCommonsLoader.buildAdjacency(tasks, taskIdxMap);
308 % Phase 3: Detect and add precedences
309 WfCommonsLoader.addPrecedences(wf, tasks, taskMap, adjList, inDeg, outDeg, predList);
312 function dist = fitDistribution(runtime, options)
313 % FITDISTRIBUTION Fit a distribution from runtime.
% RUNTIME is used as the mean. Runtimes at or below GlobalConstants.FineTol
% map to the Immediate singleton, so zero-duration tasks add no delay.
315 if runtime <= GlobalConstants.FineTol
316 dist = Immediate.getInstance();
% distributionType is matched case-insensitively.
% NOTE(review): the case labels are not visible in this copy. The visible
% branches suggest: 'exp' -> Exp, 'det' -> (hidden lines), 'aph' -> APH
% with defaultSCV, 'hyperexp' -> HyperExp only when defaultSCV > 1 (a
% hyperexponential needs SCV > 1) and Exp otherwise, and any other value
% -> Exp. Confirm against the full source.
318 switch lower(options.distributionType)
320 dist = Exp.fitMean(runtime);
324 dist = APH.fitMeanAndSCV(runtime, options.defaultSCV);
326 if options.defaultSCV > 1.0
327 dist = HyperExp.fitMeanAndSCV(runtime, options.defaultSCV);
329 dist = Exp.fitMean(runtime);
332 dist = Exp.fitMean(runtime);
337 function taskId = getTaskId(task, idx)
338 % GETTASKID Extract task ID from task object.
339 % Schema 1.4 may use 'name' as identifier, Schema 1.5+ uses 'id'
% Precedence: 'id' field, then 'name' field, then the positional
% fallback 'task_<idx>'.
% NOTE(review): the returned ID is used as a containers.Map key elsewhere,
% so it must be char. A numeric id from jsondecode would need conversion -
% confirm what the hidden assignment lines do.
340 if isfield(task, 'id')
342 elseif isfield(task, 'name')
345 taskId = sprintf('task_%d', idx);
349 function metadata = extractMetadata(task, taskId, execMap)
350 % EXTRACTMETADATA Extract WfCommons metadata from task.
% Always sets metadata.taskId. Copies name/inputFiles/outputFiles from the
% task when present. If EXECMAP holds a record for taskId, also copies each
% listed execution field that the record has.
353 metadata.taskId = taskId;
355 if isfield(task, 'name')
356 metadata.name = task.name;
358 if isfield(task, 'inputFiles')
359 metadata.inputFiles = task.inputFiles;
361 if isfield(task, 'outputFiles')
362 metadata.outputFiles = task.outputFiles;
% execMap is empty when options.useExecutionData is false, so no execution
% fields are copied in that case.
365 if isKey(execMap, taskId)
366 exec = execMap(taskId);
367 fields = {'executedAt', 'command', 'coreCount', 'avgCPU', ...
368 'readBytes', 'writtenBytes', 'memoryInBytes', ...
369 'energyInKWh', 'avgPowerInW', 'priority', 'machines'};
370 for j = 1:length(fields)
373 metadata.(f) = exec.(f);
379 function [adjList, inDeg, outDeg, predList] = buildAdjacency(tasks, taskIdxMap)
380 % BUILDADJACENCY Build adjacency structures from tasks.
% Returns, for n tasks indexed by position:
%   adjList{i}  - row vector of successor indices of task i
%   predList{k} - row vector of predecessor indices of task k
%   outDeg/inDeg - n-by-1 edge counts
% Edges come only from each task's 'children' field; a 'parents' field is
% not read. Child IDs missing from taskIdxMap are silently dropped, and a
% repeated child ID adds a duplicate edge.
383 adjList = cell(n, 1);
384 predList = cell(n, 1);
386 outDeg = zeros(n, 1);
401 if isfield(task, 'children') && ~isempty(task.children)
402 children = task.children;
% A lone child ID is wrapped in a cell so the loop below sees a list
% (the guarding condition is not visible in this copy).
404 children = {children};
406 for j = 1:length(children)
407 childId = children{j};
408 if isKey(taskIdxMap, childId)
409 childIdx = taskIdxMap(childId);
410 adjList{i} = [adjList{i}, childIdx];
411 predList{childIdx} = [predList{childIdx}, i];
412 outDeg(i) = outDeg(i) + 1;
413 inDeg(childIdx) = inDeg(childIdx) + 1;
420 function addPrecedences(wf, tasks, taskMap, adjList, inDeg, outDeg, predList)
421 % ADDPRECEDENCES Detect patterns and add precedences to workflow.
% Step 1: when a task's children all feed directly into one common join
% task, emit an AndFork (task -> children) plus an AndJoin (children ->
% join), and mark those edges as handled.
% Step 2: every edge not yet handled becomes a Workflow.Serial precedence.
% NOTE(review): processed is a dense n-by-n logical matrix, so memory grows
% quadratically with the task count (about 100 MB for 10,000 tasks). A
% sparse matrix or a set of edge keys would scale better.
424 processed = false(n, n); % Track processed edges
426 % Step 1: Identify and process fork-join pairs
430 children = adjList{i};
432 % Find common join point
433 joinPoint = WfCommonsLoader.findCommonJoin(children, adjList, inDeg, n);
435 if ~isempty(joinPoint)
436 % Valid fork-join structure
% tasks may be a struct array or a cell array (jsondecode output), hence
% the two indexing forms.
439 postTask = tasks(joinPoint);
442 postTask = tasks{joinPoint};
445 preTaskId = WfCommonsLoader.getTaskId(preTask, i);
446 preAct = taskMap(preTaskId);
447 postActs = cell(1, length(children));
448 for j = 1:length(children)
450 childTask = tasks(children(j));
452 childTask = tasks{children(j)};
454 childTaskId = WfCommonsLoader.getTaskId(childTask, children(j));
455 postActs{j} = taskMap(childTaskId);
457 postTaskId = WfCommonsLoader.getTaskId(postTask, joinPoint);
458 postAct = taskMap(postTaskId);
461 wf.addPrecedence(Workflow.AndFork(preAct, postActs));
462 wf.addPrecedence(Workflow.AndJoin(postActs, postAct));
464 % Mark edges as processed
% NOTE(review): findCommonJoin accepts joins whose in-degree exceeds this
% fork's child count. The join's other incoming edges are not marked here,
% so Step 2 adds Serial precedences into a task that already has an
% AndJoin. Confirm Workflow handles that combination as intended.
465 for j = 1:length(children)
466 processed(i, children(j)) = true;
467 processed(children(j), joinPoint) = true;
473 % Step 2: Add remaining edges as serial connections
485 preTaskId = WfCommonsLoader.getTaskId(preTask, i);
486 postTaskId = WfCommonsLoader.getTaskId(postTask, j);
487 preAct = taskMap(preTaskId);
488 postAct = taskMap(postTaskId);
489 wf.addPrecedence(Workflow.Serial(preAct, postAct));
495 function joinPoint = findCommonJoin(children, adjList, inDeg, n)
496 % FINDCOMMONJOIN Find a common join point for all children.
% Returns the index of a task that every child in CHILDREN reaches, that
% has in-degree >= numel(CHILDREN), and that is a direct successor of every
% child. Presumably returns [] when no such task exists (the return lines
% are not visible in this copy).
% NOTE(review): the final test needs only direct successors, so intersecting
% adjList{children(k)} would give the same candidates without the full BFS.
503 % Use BFS from each child to find reachable nodes
504 reachable = cell(1, length(children));
505 for i = 1:length(children)
506 reachable{i} = WfCommonsLoader.getReachableNodes(children(i), adjList, n);
510 common = reachable{1};
511 for i = 2:length(children)
512 common = intersect(common, reachable{i});
520 % Find the lowest-index common node (intersect returns sorted values) that has all children as predecessors
521 % (i.e., in-degree is at least the number of children from this fork; other predecessors are allowed)
522 for node = common(:)'
523 if inDeg(node) >= length(children)
524 % Check that every child has a direct edge to this node
525 allDirectChild = true;
526 for i = 1:length(children)
527 if ~ismember(node, adjList{children(i)})
528 allDirectChild = false;
542 function reachable = getReachableNodes(start, adjList, n)
543 % GETREACHABLENODES BFS to find all reachable nodes.
% Breadth-first search over adjList from START. Each node is recorded at
% most once, because visited is set before a node is enqueued.
% NOTE(review): whether START itself is included in REACHABLE depends on
% initialisation lines not visible in this copy - confirm.
545 visited = false(1, n);
547 visited(start) = true;
550 while ~isempty(queue)
% adjList entries are row vectors, so this iterates over individual
% successor indices; an empty entry gives zero iterations.
554 for next = adjList{current}
556 visited(next) = true;
557 queue = [queue, next];
558 reachable = [reachable, next];