Implementation of a DataFlow Manager based on the original SKA SDP architecture.


The DALiuGE resource manager uses the requested logical graphs, the available resources and the profiling information to produce the partitioned physical graph, which is then deployed and monitored by the Physical Graph Manager.

dlg.dropmake.pg_generator.fill(lg, params)

Logical Graph + params -> Filled Logical Graph

dlg.dropmake.pg_generator.partition(pgt, algo, num_partitions=1, num_islands=1, partition_label='partition', show_gojs=False, **algo_params)

Partitions a Physical Graph Template

dlg.dropmake.pg_generator.resource_map(pgt, nodes, num_islands=1, co_host_dim=True)

Maps a Physical Graph Template pgt to nodes

dlg.dropmake.pg_generator.unroll(lg, oid_prefix=None, zerorun=False, app=None)

Unrolls a logical graph


class dlg.dropmake.scheduler.DAGUtil

Helper functions dealing with DAGs

static build_dag_from_drops(drop_list, embed_drop=True, fake_super_root=False)

Return a networkx DiGraph (DAG).

:param: fake_super_root whether to create a fake super root node in the DAG. If set to True, it enables edge-zeroing-based scheduling algorithms to merge more aggressively.
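To picture what fake_super_root adds, here is a minimal plain-Python sketch. It assumes a DAG stored as a dict mapping each node to its list of successors (a stand-in for the networkx DiGraph the method really returns), and the node name __super_root__ is hypothetical. One plausible reading of the docstring is that a common parent gives every original root an incoming edge, which an edge-zeroing scheduler can then zero like any other edge, so the roots become merge candidates too.

```python
from collections import defaultdict


def add_fake_super_root(succ, root="__super_root__"):
    """Return a copy of the DAG `succ` (node -> list of successors) with an
    extra root node pointing at every original root (in-degree 0)."""
    if root in succ:
        raise ValueError(f"node name {root!r} already in use")
    indegree = defaultdict(int)
    for vs in succ.values():
        for v in vs:
            indegree[v] += 1
    roots = [n for n in succ if indegree[n] == 0]
    new_succ = {u: list(vs) for u, vs in succ.items()}  # leave the input untouched
    new_succ[root] = roots
    return new_succ


dag = {"a": ["c"], "b": ["c"], "c": []}
print(add_fake_super_root(dag)["__super_root__"])  # ['a', 'b']
```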

static ganttchart_matrix(G, topo_sort=None)

Return an M by N matrix, where M is the number of DROPs and N is the longest path length.

static get_longest_path(G, weight='weight', default_weight=1, show_path=True, topo_sort=None)

Ported from https://github.com/networkx/networkx/blob/master/networkx/algorithms/dag.py, with node weights added.

Returns the longest path in a DAG. If G has edges with a ‘weight’ attribute, the edge data are used as weight values.

:param: G Graph (NetworkX DiGraph)

:param: weight Edge data key to use for weight (string)

:param: default_weight The weight of edges that do not have a weight attribute (integer)

:return: a tuple with two elements: path (list), the longest path, and path_length (float), the length of the longest path.
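The same idea can be sketched without networkx. This version assumes a dict-based graph and hypothetical parameter names (node_weight, edge_weight); it is not DAGUtil's signature. A path's length counts the weight of every node and every edge on it, with default_weight filling in for edges that carry no weight.

```python
def longest_path(succ, node_weight, edge_weight, default_weight=1):
    """Longest path in a DAG whose nodes and edges both carry weights.

    succ: node -> list of successors (every node must be a key)
    node_weight: node -> weight (missing nodes count as 0)
    edge_weight: (u, v) -> weight (missing edges use default_weight)
    Returns (path, path_length).
    """
    # Kahn's algorithm gives a topological order and detects cycles.
    indegree = {n: 0 for n in succ}
    for vs in succ.values():
        for v in vs:
            indegree[v] += 1
    ready = [n for n, d in indegree.items() if d == 0]
    order = []
    while ready:
        u = ready.pop()
        order.append(u)
        for v in succ[u]:
            indegree[v] -= 1
            if indegree[v] == 0:
                ready.append(v)
    if len(order) != len(succ):
        raise ValueError("graph contains a cycle")

    # dist[v] = heaviest path ending at v, including v's own weight.
    dist = {n: node_weight.get(n, 0) for n in succ}
    best_pred = {n: None for n in succ}
    for u in order:
        for v in succ[u]:
            cand = dist[u] + edge_weight.get((u, v), default_weight) + node_weight.get(v, 0)
            if cand > dist[v]:
                dist[v], best_pred[v] = cand, u

    # Walk back from the heaviest endpoint to recover the path.
    node = max(dist, key=dist.get)
    path = []
    while node is not None:
        path.append(node)
        node = best_pred[node]
    path.reverse()
    return path, dist[path[-1]]
```

For example, with succ = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}, node weights {"a": 1, "b": 5, "c": 2, "d": 1} and edge weights a→b = 1, a→c = 3, b→d = 1, c→d = 1, the result is (["a", "b", "d"], 9).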

static get_max_antichains(G)

Return a list of antichains with the top-2 lengths.

static get_max_dop(G)

Get the maximum degree of parallelism of this DAG.

Return: int
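Here the maximum degree of parallelism is taken to be the size of the largest antichain, i.e. the largest set of mutually independent DROPs. A sketch that avoids enumerating antichains uses Dilworth's theorem: the largest antichain equals the number of nodes minus a maximum matching in the bipartite "reachability" graph. This is a swapped-in technique under a dict-based graph assumption, not necessarily how get_max_dop works, and its recursive helpers suit only graphs of modest depth.

```python
def transitive_closure(succ):
    """node -> set of all nodes reachable from it (memoised DFS)."""
    reach = {}

    def visit(u):
        if u not in reach:
            r = set()
            for v in succ[u]:
                r.add(v)
                r |= visit(v)
            reach[u] = r
        return reach[u]

    for n in succ:
        visit(n)
    return reach


def max_dop(succ):
    """Size of a maximum antichain. By Dilworth's theorem this equals
    (#nodes - size of a maximum matching) in the bipartite graph linking
    u (left copy) to v (right copy) whenever v is reachable from u."""
    reach = transitive_closure(succ)
    match_of_right = {}  # right-hand node -> left-hand node matched to it

    def augment(u, seen):
        # Kuhn's algorithm: search for an augmenting path starting at u.
        for v in reach[u]:
            if v in seen:
                continue
            seen.add(v)
            if v not in match_of_right or augment(match_of_right[v], seen):
                match_of_right[v] = u
                return True
        return False

    matching = sum(1 for u in succ if augment(u, set()))
    return len(succ) - matching
```

For a diamond (a→b, a→c, b→d, c→d) this gives 2, since only b and c can run concurrently.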

static get_max_width(G, weight='weight', default_weight=1)

Get the antichain with the maximum “weighted” width of this DAG.

weight: float (for example, it could be RAM consumption in GB)

Return: float

static label_schedule(G, weight='weight', topo_sort=None)

For each node, label its start and end time.
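label_schedule and the M by N matrix returned by ganttchart_matrix fit together naturally. The sketch below assumes every node starts as soon as all its predecessors end, ignores edge (communication) weights, uses node weights as integer durations, and marks a busy time unit with 1; the function names and the 0/1 encoding are illustrative assumptions.

```python
def label_schedule(order, preds, weight):
    """Earliest start/end time of each node; `order` must be topological."""
    start, end = {}, {}
    for v in order:
        # A node can start once every predecessor has ended.
        start[v] = max((end[u] for u in preds[v]), default=0)
        end[v] = start[v] + weight[v]
    return start, end


def gantt_matrix(order, start, end):
    """M rows (one per node) by N columns (one per time unit up to the
    longest path length); 1 marks the time units in which a node runs."""
    lpl = max(end.values())
    return [[1 if start[v] <= t < end[v] else 0 for t in range(lpl)] for v in order]


order = ["a", "b", "c", "d"]
preds = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
weight = {"a": 1, "b": 2, "c": 1, "d": 1}
start, end = label_schedule(order, preds, weight)
print(gantt_matrix(order, start, end))
```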

static metis_part(G, num_partitions)

Use the METIS binary executable (instead of the library). This is used only for testing, when libmetis halts unexpectedly.

static prune_antichains(antichains)

Prune a list of antichains to keep those with the top-2 lengths. Note that antichains is a generator (not a list!).
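A single-pass sketch of that pruning, reading "top-2 lengths" as the two largest distinct antichain lengths (an interpretation, not confirmed by the source). It accepts any iterable, so a generator is consumed once and never converted to a full list.

```python
def prune_antichains(antichains):
    """Keep only the antichains whose length is one of the two largest
    distinct lengths seen, consuming the input in a single pass."""
    top = {}  # length -> antichains of that length (at most two lengths kept)
    for ac in antichains:
        n = len(ac)
        if n in top:
            top[n].append(ac)
        elif len(top) < 2:
            top[n] = [ac]
        elif n > min(top):
            del top[min(top)]  # drop the shortest length kept so far
            top[n] = [ac]
    # Longest antichains first.
    return [ac for n in sorted(top, reverse=True) for ac in top[n]]


gen = (ac for ac in [[1], [1, 2], [3], [1, 2, 3], [2, 3], [4, 5, 6]])
print(prune_antichains(gen))
```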

class dlg.dropmake.scheduler.KFamilyPartition(gid, max_dop, global_dag=None)

A special case (K = 1) of the Maximum Weighted K-families, based on Theorem 3.1 in http://fmdb.cs.ucla.edu/Treports/930014.pdf


Add a single node u to the partition

class dlg.dropmake.scheduler.MinNumPartsScheduler(drop_list, deadline, max_dop=8, dag=None, optimistic_factor=0.5)

A special type of partition that aims to schedule the DAG on time but at minimum cost. In this particular case, the cost is the number of partitions generated, on the assumption that the number of partitions (each with a certain DoP) roughly represents the resource footprint.

is_time_critical(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)

This is called ONLY IF, at a given iteration, either can_add on the partition has returned “False” or the new critical path is longer than the old one.


u - node u, v - node v

uw - weight of node u, vw - weight of node v

curr_lpl - current longest path length

ow - original (current) edge weight, i.e. the edge length

rem_el - remaining edges to be zeroed



It looks ahead to compute the probability of time being critical and compares it with _optimistic_factor, where probability = (number of edges that need to be zeroed to meet the deadline) / (number of remaining unzeroed edges).


Whether this scheduler will override the False result from Partition.can_add()
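The look-ahead in is_time_critical can be sketched as follows. How DALiuGE counts the edges that must still be zeroed is not stated here, so this version assumes an optimistic greedy estimate (zero the heaviest remaining edges first, each shortening the critical path by its full weight), and it assumes the step is time-critical when the resulting probability exceeds the optimistic factor. Both function names are hypothetical.

```python
def edges_needed(curr_lpl, deadline, rem_edge_weights):
    """Optimistic estimate of how many remaining edges must be zeroed,
    heaviest first, for the longest path length to fit the deadline
    (assumes each zeroed edge shortens the critical path by its weight)."""
    excess = curr_lpl - deadline
    count = 0
    for w in sorted(rem_edge_weights, reverse=True):
        if excess <= 0:
            break
        excess -= w
        count += 1
    return count


def looks_time_critical(curr_lpl, deadline, rem_edge_weights, optimistic_factor=0.5):
    """Time-critical when the estimated fraction of remaining edges that
    still need zeroing exceeds optimistic_factor (assumed comparison)."""
    if not rem_edge_weights:
        return curr_lpl > deadline
    probability = edges_needed(curr_lpl, deadline, rem_edge_weights) / len(rem_edge_weights)
    return probability > optimistic_factor
```

With curr_lpl = 20, a deadline of 12 and remaining edge weights [5, 4, 3, 1], two edges are needed, giving a probability of 0.5; that is not critical at the default factor of 0.5 but is critical at 0.4. A lower factor therefore makes the scheduler more cautious.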

class dlg.dropmake.scheduler.MySarkarScheduler(drop_list, max_dop=8, dag=None, dump_progress=False)

Based on “V. Sarkar, Partitioning and Scheduling Parallel Programs for Execution on Multiprocessors. Cambridge, MA: MIT Press, 1989.”

Main change: we do not order independent tasks within the same cluster. Since this could overload the cluster, we allow a cost constraint on the number of concurrent tasks (e.g. the number of cores) within each cluster.

Why:
  1. we only need to topologically sort the DAG once, since we do not add new edges in the cluster

  2. closer to requirements

  3. adjustable for local schedulers

Similar ideas: http://stackoverflow.com/questions/3974731
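A compact sketch of Sarkar-style edge zeroing with the change described above: tasks merged into one cluster are not ordered, so independent tasks may overlap, and a max_dop cap on concurrent tasks per cluster acts as the cost constraint. The dict-based graph, the earliest-start timing model and all function names are assumptions for illustration; this is not MySarkarScheduler's code. The topological order is computed once and reused, matching the first point above.

```python
def schedule_times(order, preds, node_w, edge_w, cluster):
    """Earliest start/finish times. Edges inside a cluster cost nothing;
    independent tasks in one cluster are NOT serialised."""
    start, finish = {}, {}
    for v in order:
        s = 0
        for u in preds[v]:
            comm = 0 if cluster[u] == cluster[v] else edge_w[(u, v)]
            s = max(s, finish[u] + comm)
        start[v], finish[v] = s, s + node_w[v]
    return start, finish


def max_concurrency(nodes, start, finish):
    """Peak number of tasks in `nodes` running at the same time."""
    # (t, -1) sorts before (t, 1): a task ending at t frees its slot first.
    events = sorted([(start[n], 1) for n in nodes] + [(finish[n], -1) for n in nodes])
    running = peak = 0
    for _, delta in events:
        running += delta
        peak = max(peak, running)
    return peak


def within_dop(cluster, start, finish, max_dop):
    groups = {}
    for n, c in cluster.items():
        groups.setdefault(c, []).append(n)
    return all(max_concurrency(g, start, finish) <= max_dop for g in groups.values())


def sarkar_edge_zeroing(order, preds, node_w, edge_w, max_dop):
    """`order` is topological; returns (node -> cluster id, parallel time)."""
    cluster = {v: v for v in order}  # every task starts in its own cluster
    _, finish = schedule_times(order, preds, node_w, edge_w, cluster)
    parallel_time = max(finish.values())
    # Visit edges from heaviest to lightest.
    for u, v in sorted(edge_w, key=edge_w.get, reverse=True):
        cu, cv = cluster[u], cluster[v]
        if cu == cv:
            continue
        trial = {n: cu if c == cv else c for n, c in cluster.items()}  # merge cv into cu
        start, finish = schedule_times(order, preds, node_w, edge_w, trial)
        new_time = max(finish.values())
        # Zero the edge only if parallel time does not grow and no cluster
        # runs more than max_dop tasks at once.
        if new_time <= parallel_time and within_dop(trial, start, finish, max_dop):
            cluster, parallel_time = trial, new_time
    return cluster, parallel_time
```

On a unit-weight diamond (a→b, a→c, b→d, c→d) with edge weights 5, 4, 3, 2, max_dop=2 merges everything into one cluster with parallel time 3, while max_dop=1 keeps c apart (b and c would overlap) and ends with parallel time 9.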

is_time_critical(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)


MySarkarScheduler always returns False


Whether this scheduler will override the False result from Partition.can_add()

Return a tuple of
  1. the # of partitions formed (int)

  2. the parallel time (longest path, int)

  3. partition time (seconds, float)

reduce_partitions(parts, g_dict, G)

Further reduce the number of partitions by merging partitions whose max_dop is less than capacity:

  1. sort the partition list based on their _max_dop (num_cpus by default)

  2. enumerate each partition p to see whether merging p with its neighbour is feasible
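A sketch of the two steps, assuming partitions are represented only by their max DoP values, that "neighbour" means the next partition in the sorted order, and that the DoP of a merged partition is bounded by the sum of its members' DoPs (a conservative stand-in for recomputing the real antichain width). The function name and signature are hypothetical.

```python
def reduce_partitions(dops, capacity):
    """dops[i] is the max DoP of partition i. Returns groups of partition
    indices; keeping each group's DoP sum <= capacity guarantees that
    every merge is feasible, because the sum bounds the merged DoP."""
    order = sorted(range(len(dops)), key=lambda i: dops[i])  # step 1
    groups, current, current_dop = [], [], 0
    for i in order:                                          # step 2
        if current and current_dop + dops[i] <= capacity:
            current.append(i)  # merging with the neighbour is feasible
            current_dop += dops[i]
        else:
            if current:
                groups.append(current)
            current, current_dop = [i], dops[i]
    if current:
        groups.append(current)
    return groups


print(reduce_partitions([3, 1, 2, 4, 1], capacity=4))  # [[1, 4, 2], [0], [3]]
```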

class dlg.dropmake.scheduler.PSOScheduler(drop_list, max_dop=8, dag=None, deadline=None, topk=30, swarm_size=40)

Use the Particle Swarm Optimisation to guide the Sarkar algorithm https://en.wikipedia.org/wiki/Particle_swarm_optimization

The idea is to let “edge zeroing” become the search variable X. The number of dimensions of X is the number of edges in the DAG. The possible values for each dimension form the discrete set {1, 2, 3}, where:
  * 10 - no zero (2 in base 10) + 1
  * 00 - zero w/o linearisation (0 in base 10) + 1
  * 01 - zero with linearisation (1 in base 10) + 1

If a deadline is present:
the objective function sets up a partition scheme such that
  1. DoP constraints for each partition are satisfied: based on the X[i] value, reject or linearise

  2. returns num_of_partitions

the constraint function:
  1. makespan < deadline

Otherwise (no deadline):
the objective function sets up a partition scheme such that
  1. DoP constraints for each partition are satisfied: based on the X[i] value, reject or linearise

  2. returns makespan


Deadline - critical_path >= 0


x is a list of values, one per edge, each taking one of the three integers 0, 1, 2. The indices of x are identical to the indices in G.edges().sort(key='weight').
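To make the encoding of X concrete, here is a sketch of how a continuous particle position could be mapped onto the discrete set {1, 2, 3} and then shifted to the 0, 1, 2 values that x takes in the objective function. Rounding and clamping is an assumed discretisation (PSO itself works on real-valued positions), and the ACTIONS table and helper names are illustrative.

```python
# Meaning of each discrete search value (binary code + 1), per the class docs.
ACTIONS = {
    1: "zero without linearisation",  # 00 (0 in base 10) + 1
    2: "zero with linearisation",     # 01 (1 in base 10) + 1
    3: "no zero",                     # 10 (2 in base 10) + 1
}


def discretise(position):
    """Round and clamp a continuous particle position (one dimension per
    edge) onto the discrete set {1, 2, 3}."""
    return [min(3, max(1, int(round(v)))) for v in position]


def to_objective_input(position):
    """Shift to the 0, 1, 2 values that x takes in the objective function."""
    return [code - 1 for code in discretise(position)]


position = [0.7, 2.2, 3.9, 1.4]
print([ACTIONS[c] for c in discretise(position)])
print(to_objective_input(position))  # [0, 1, 2, 0]
```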

Returns a tuple of:
  1. the # of partitions formed (int)

  2. the parallel time (longest path, int)

  3. partition time (seconds, float)

  4. a list of partitions (Partition)

class dlg.dropmake.scheduler.Partition(gid, max_dop)

Logical partition; multiple (1 ~ N) of these can be placed onto a single physical resource unit.

Logical partitions can be nested, and they somewhat resemble dlg.manager.drop_manager.

add(u, v, gu, gv, sequential=False, global_dag=None)

Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains.
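Breaking an antichain into a sequential chain amounts to adding edges between its members so that they run one after another. A minimal sketch, assuming a dict-based DAG and a caller-chosen order for the members; the function name is hypothetical.

```python
def linearise(antichain, succ):
    """Return a copy of the DAG `succ` (node -> successors) with edges
    antichain[0] -> antichain[1] -> ... added. Members of an antichain are
    mutually unreachable, so the new edges cannot create a cycle."""
    new_succ = {u: list(vs) for u, vs in succ.items()}
    for a, b in zip(antichain, antichain[1:]):
        if b not in new_succ[a]:
            new_succ[a].append(b)
    return new_succ


dag = {"s": ["a", "b", "c"], "a": [], "b": [], "c": []}
print(linearise(["a", "b", "c"], dag))
```

After linearisation the three siblings of s form the chain a → b → c, so their degree of parallelism drops from 3 to 1.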

add_node(u, weight)

Add a single node u to the partition

can_add(u, v, gu, gv)

Check whether nodes u and/or v can join this partition. A node may be rejected for reasons such as DoP overflow or an overdue completion-time deadline.

probe_max_dop(u, v, unew, vnew, update=False)

An incremental antichain (which appears significantly more efficient than the networkx antichains), but it only works for DoP, not for weighted width.


Remove node n from the partition

property schedule

Get the schedule associated with this partition

class dlg.dropmake.scheduler.Schedule(dag, max_dop)

The scheduling solution with schedule-related properties

property efficiency

resource usage percentage (integer)

property schedule_matrix
Return: a self._lpl x self._max_dop matrix

(X - time, Y - resource unit / parallel lane)

property workload
Return: (integer)

the mean # of resource units per time unit consumed by the graph/partition
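The three properties can be illustrated together. This sketch assumes integer start/end times, fills a time-by-lane matrix by placing tasks in start order into the first free lane, and reports efficiency as an integer percentage of busy cells and workload as busy cells per time step rounded to an integer; these formulas are assumptions based on the descriptions above, not Schedule's code.

```python
def schedule_matrix(tasks, lpl, max_dop):
    """tasks: name -> (start, end), integer times with `end` exclusive.
    Rows are time steps (lpl of them), columns are parallel lanes (max_dop)."""
    mat = [[None] * max_dop for _ in range(lpl)]
    # Placing tasks in start order into the first free lane (interval
    # partitioning) never needs more lanes than the peak overlap.
    for name, (s, e) in sorted(tasks.items(), key=lambda kv: kv[1]):
        for lane in range(max_dop):
            if all(mat[t][lane] is None for t in range(s, e)):
                for t in range(s, e):
                    mat[t][lane] = name
                break
        else:
            raise ValueError(f"task {name!r} does not fit in {max_dop} lanes")
    return mat


def efficiency(mat):
    """Busy cells as an integer percentage of all cells."""
    busy = sum(cell is not None for row in mat for cell in row)
    return 100 * busy // (len(mat) * len(mat[0]))


def workload(mat):
    """Mean number of busy lanes per time step, rounded to an integer."""
    busy = sum(cell is not None for row in mat for cell in row)
    return round(busy / len(mat))


tasks = {"a": (0, 2), "b": (0, 1), "c": (1, 3), "d": (2, 4)}
mat = schedule_matrix(tasks, lpl=4, max_dop=2)
print(mat, efficiency(mat), workload(mat))
```

Here 7 of the 8 cells are busy, so efficiency is 87 and workload is 2.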

class dlg.dropmake.scheduler.Scheduler(drop_list, max_dop=8, dag=None)

Static scheduling consists of three steps:
  1. partition the DAG into an optimal number (M) of partitions; goal: minimise execution time while maintaining intra-partition DoP

  2. merge partitions into a given number (N) of partitions (if M > N); goal: minimise logical communication cost while maintaining load balancing

  3. map each merged partition to a resource unit; goal: minimise physical communication cost amongst resource units


map logical partitions to physical resources

merge_partitions(num_partitions, bal_cond=1)
Merge M partitions into N partitions where N < M

implemented using METIS for now

bal_cond: load balance condition (integer):

0 - workload, 1 - CPU count (faster to evaluate than workload)
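METIS partitions a graph while also minimising the edge cut, which requires the METIS library. As a lightweight stand-in that only illustrates the bal_cond choice, the sketch below uses the greedy longest-processing-time-first heuristic and ignores communication cost entirely; the dict keys workload and cpus are hypothetical.

```python
import heapq


def merge_partitions(parts, num_partitions, bal_cond=1):
    """Greedily merge M partitions into `num_partitions` groups.

    parts: list of dicts with hypothetical 'workload' and 'cpus' keys.
    bal_cond: 0 balances on workload, 1 on CPU count.
    Returns one list of original partition indices per merged partition.
    """
    key = "workload" if bal_cond == 0 else "cpus"
    heap = [(0, k) for k in range(num_partitions)]  # (current load, group index)
    groups = [[] for _ in range(num_partitions)]
    # Largest partitions first, each into the currently lightest group.
    for i in sorted(range(len(parts)), key=lambda i: parts[i][key], reverse=True):
        load, k = heapq.heappop(heap)
        groups[k].append(i)
        heapq.heappush(heap, (load + parts[i][key], k))
    return groups


parts = [{"workload": 1, "cpus": c} for c in [4, 3, 3, 2, 2]]
print(merge_partitions(parts, 2))  # [[0, 3, 4], [1, 2]]
```

Balancing on CPU count here yields loads of 8 and 6; LPT is fast but not optimal (7 and 7 is possible), which is one reason a real partitioner such as METIS is preferred.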

exception dlg.dropmake.scheduler.SchedulerException


Refer to https://confluence.ska-sdp.org/display/PRODUCTTREE/C.

class dlg.dropmake.pg_manager.PGManager(root_dir)

Physical Graph Manager

add_pgt(pgt, lg_name)

Dummy implementation using the file system for now (thread-safe). TODO: use proper graph databases to manage all PGTs.


A unique PGT id (handle)
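A minimal sketch of a thread-safe, file-system-backed store in the spirit of that dummy implementation. It assumes the PGT can be serialised as JSON and builds the handle from the logical graph name plus a random UUID; the class, file layout and id format are all hypothetical.

```python
import json
import os
import tempfile
import threading
import uuid


class FilePGTStore:
    """Hypothetical file-system store: one JSON file per PGT under root_dir."""

    def __init__(self, root_dir):
        self._root = root_dir
        self._lock = threading.Lock()  # serialise file access across threads
        os.makedirs(root_dir, exist_ok=True)

    def _path(self, pgt_id):
        return os.path.join(self._root, pgt_id + ".json")

    def add_pgt(self, pgt, lg_name):
        """Persist a JSON-serialisable PGT and return its unique id (handle)."""
        pgt_id = f"{os.path.basename(lg_name)}_{uuid.uuid4().hex}"
        with self._lock:
            with open(self._path(pgt_id), "w") as f:
                json.dump(pgt, f)
        return pgt_id

    def get_pgt(self, pgt_id):
        with self._lock:
            with open(self._path(pgt_id)) as f:
                return json.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        store = FilePGTStore(root)
        handle = store.add_pgt({"nodes": [1, 2]}, "my_lg.graph")
        print(handle, store.get_pgt(handle))
```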

get_gantt_chart(pgt_id, json_str=True)

the Gantt chart matrix (numarray) given a PGT id


The PGT object given its PGT id

get_schedule_matrices(pgt_id, json_str=True)

a list of schedule matrices (numarrays) given a PGT id

class dlg.dropmake.pg_manager.PGUtil

Helper functions dealing with Physical Graphs

static vstack_mat(A, B, separator=False)

Vertically stack two matrices that may have different numbers of columns


A matrix A (2d numpy array)


B matrix B (2d numpy array)


separator whether to add an empty row separator between the two matrices (boolean)


the vertically stacked matrix (2d numpy array)
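A NumPy sketch of this behaviour. The docstring does not say how the narrower matrix is padded or what an "empty" separator row contains, so zeros are assumed for both.

```python
import numpy as np


def vstack_mat(A, B, separator=False):
    """Stack A on top of B, right-padding the narrower matrix with zeros;
    optionally insert a zero-filled separator row between them."""
    ncols = max(A.shape[1], B.shape[1])

    def pad(M):
        # Constant (zero) padding on the right-hand side only.
        return np.pad(M, ((0, 0), (0, ncols - M.shape[1])))

    blocks = [pad(A)]
    if separator:
        blocks.append(np.zeros((1, ncols), dtype=np.result_type(A, B)))
    blocks.append(pad(B))
    return np.vstack(blocks)


A = np.array([[1, 2, 3]])
B = np.array([[4], [5]])
print(vstack_mat(A, B, separator=True))
```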