dlg.dropmake
Implementation of a DataFlow Manager based on the original SKA SDP architecture.
dlg.dropmake.pg_generator
The DALiuGE resource manager takes the requested logical graphs, the available resources and the profiling information, and turns them into a partitioned physical graph, which will then be deployed and monitored by the Physical Graph Manager.
- dlg.dropmake.pg_generator.fill(lg, params)
Logical Graph + params -> Filled Logical Graph
- dlg.dropmake.pg_generator.partition(pgt, algo, num_partitions=1, num_islands=1, partition_label='partition', show_gojs=False, **algo_params)
Partitions a Physical Graph Template
- dlg.dropmake.pg_generator.resource_map(pgt, nodes, num_islands=1, co_host_dim=True)
Maps a Physical Graph Template pgt to nodes
- dlg.dropmake.pg_generator.unroll(lg, oid_prefix=None, zerorun=False, app=None)
Unrolls a logical graph
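Taken together, these four functions form the translation pipeline from a logical graph to a deployable physical graph. Below is a minimal sketch of how they might be chained; the logical graph file, the parameter dictionary, the "metis" algorithm name and the node list are illustrative assumptions, not prescribed values:

    import json
    from dlg.dropmake import pg_generator

    # Hypothetical inputs for illustration only.
    with open("my_pipeline.graph") as f:    # hypothetical LG file (JSON)
        lg = json.load(f)
    params = {"num_of_copies": 4}           # hypothetical LG parameters
    nodes = ["host0", "host1", "host2"]     # available resource nodes

    lg = pg_generator.fill(lg, params)                          # LG + params -> filled LG
    pgt = pg_generator.unroll(lg)                               # filled LG -> PGT
    pgt = pg_generator.partition(pgt, algo="metis",             # partition the PGT
                                 num_partitions=2, num_islands=1)
    pg = pg_generator.resource_map(pgt, nodes, num_islands=1)   # map PGT onto nodes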
dlg.dropmake.scheduler
- class dlg.dropmake.scheduler.DAGUtil
Helper functions for dealing with DAGs
- static build_dag_from_drops(drop_list, embed_drop=True, fake_super_root=False)
Return a networkx DiGraph (DAG).
- Parameters:
fake_super_root - whether to create a fake super root node in the DAG. If set to True, it enables edge-zeroing-based scheduling algorithms to perform more aggressive merging.
- static ganttchart_matrix(G, topo_sort=None)
Return an M (# of DROPs) by N (longest path length) matrix
- static get_longest_path(G, weight='weight', default_weight=1, show_path=True, topo_sort=None)
Ported from https://github.com/networkx/networkx/blob/master/networkx/algorithms/dag.py, with node weights added.
Returns the longest path in a DAG. If G has edges with a 'weight' attribute, the edge data are used as weight values.
- Parameters:
G - Graph (NetworkX DiGraph), weight - edge data key to use for weight (string), default_weight - the weight of edges that do not have a weight attribute (integer)
- Returns:
a tuple of two elements: path (list), the longest path, and path_length (float), the length of the longest path
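A small usage sketch on a toy weighted DAG (the graph and its weights are made up for illustration; it assumes node weights are accumulated along the path together with edge weights, as the docstring suggests):

    import networkx as nx
    from dlg.dropmake.scheduler import DAGUtil

    G = nx.DiGraph()
    G.add_node(1, weight=2)
    G.add_node(2, weight=5)
    G.add_node(3, weight=3)
    G.add_edge(1, 2, weight=1)   # path 1 -> 2: 2 + 1 + 5 = 8
    G.add_edge(1, 3, weight=4)   # path 1 -> 3: 2 + 4 + 3 = 9

    path, path_length = DAGUtil.get_longest_path(G, show_path=True)
    # expected under that assumption: path == [1, 3], path_length == 9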
- static get_max_antichains(G)
Return a list of antichains with the top-2 lengths
- static get_max_dop(G)
Get the maximum degree of parallelism of this DAG.
- Returns:
int
- static get_max_width(G, weight='weight', default_weight=1)
Get the antichain with the maximum "weighted" width of this DAG.
- Parameters:
weight - float (for example, it could be RAM consumption in GB)
- Returns:
float
- static label_schedule(G, weight='weight', topo_sort=None)
for each node, label its start and end time
- static metis_part(G, num_partitions)
Use the METIS binary executable (instead of the library). This is used only for testing when libmetis halts unexpectedly.
- static prune_antichains(antichains)
Prune a list of antichains to keep those with the top-2 lengths.
- Parameters:
antichains - a generator (not a list!)
- class dlg.dropmake.scheduler.KFamilyPartition(gid, max_dop, global_dag=None)
A special case (K = 1) of the Maximum Weighted K-families, based on Theorem 3.1 in http://fmdb.cs.ucla.edu/Treports/930014.pdf
- add_node(u)
Add a single node u to the partition
- class dlg.dropmake.scheduler.MinNumPartsScheduler(drop_list, deadline, max_dop=8, dag=None, optimistic_factor=0.5)
A special type of partition that aims to schedule the DAG on time but at minimum cost. In this particular case, the cost is the number of partitions that will be generated. The assumption is that the # of partitions (with a certain DoP) more or less represents the resource footprint.
- is_time_critical(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)
This is called ONLY IF either Partition.can_add() has returned False, or the new critical path is longer than the old one at each iteration
- Parameters:
u - node u, v - node v, uw - weight of node u, vw - weight of node v, curr_lpl - current longest path length, ow - original edge weight, rem_el - remaining edges to be zeroed
- Returns:
Boolean
It looks ahead to compute the probability of being time critical and compares that with the _optimistic_factor: probability = (# of edges that need to be zeroed to meet the deadline) / (# of remaining unzeroed edges)
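A sketch of that look-ahead test in isolation; the function name and the direction of the comparison are assumptions for illustration, not the actual implementation:

    def looks_time_critical(num_edges_to_zero, num_unzeroed_edges,
                            optimistic_factor=0.5):
        # probability = (# of edges that need to be zeroed to meet the
        # deadline) / (# of remaining unzeroed edges)
        if num_unzeroed_edges == 0:
            return False
        probability = num_edges_to_zero / float(num_unzeroed_edges)
        # Assumed: the schedule is deemed time critical once this
        # probability reaches the optimism threshold.
        return probability >= optimistic_factor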
- override_cannot_add()
Whether this scheduler will override the False result from Partition.can_add()
- class dlg.dropmake.scheduler.MySarkarScheduler(drop_list, max_dop=8, dag=None, dump_progress=False)
Based on “V. Sarkar, Partitioning and Scheduling Parallel Programs for Execution on Multiprocessors. Cambridge, MA: MIT Press, 1989.”
Main change: we do not order independent tasks within the same cluster. This could overload the cluster, therefore we allow a cost constraint on the number of concurrent tasks (e.g. # of cores) within each cluster.
Why:
1. We only need to topologically sort the DAG once, since we do not add new edges within the cluster.
2. It is closer to the requirements.
3. It is adjustable for local schedulers.
Similar ideas: http://stackoverflow.com/questions/3974731
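A hedged usage sketch, assuming the drop_list has already been produced elsewhere (e.g. by unrolling a logical graph):

    from dlg.dropmake.scheduler import MySarkarScheduler

    def schedule(drop_list, max_dop=8):
        # drop_list: DROP specifications of an unrolled physical graph
        # template (e.g. the output of pg_generator.unroll).
        scheduler = MySarkarScheduler(drop_list, max_dop=max_dop)
        num_parts, parallel_time, part_time = scheduler.partition_dag()
        return num_parts, parallel_time, part_time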
- is_time_critical(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)
- Returns:
Boolean (MySarkarScheduler always returns False)
- override_cannot_add()
Whether this scheduler will override the False result from Partition.can_add()
- partition_dag()
- Returns a tuple of:
the # of partitions formed (int)
the parallel time (longest path, int)
partition time (seconds, float)
- reduce_partitions(parts, g_dict, G)
Further reduce the number of partitions by merging partitions whose max_dop is less than the capacity.
- Step 1: sort the partition list based on their _max_dop (num_cpus by default).
- Step 2: enumerate each partition p to see whether merging p and its neighbour is feasible.
- class dlg.dropmake.scheduler.PSOScheduler(drop_list, max_dop=8, dag=None, deadline=None, topk=30, swarm_size=40)
Use Particle Swarm Optimisation to guide the Sarkar algorithm: https://en.wikipedia.org/wiki/Particle_swarm_optimization
The idea is to let "edge zeroing" become the search variable X. The number of dimensions of X is the number of edges in the DAG. The possible values for each dimension form the discrete set {1, 2, 3}, where:
* 10 - no zeroing (2 in base 10) + 1
* 00 - zeroing without linearisation (0 in base 10) + 1
* 01 - zeroing with linearisation (1 in base 10) + 1
- if deadline is present:
- the objective function sets up a partition scheme such that
DoP constraints for each partition are satisfied, based on the X[i] value (reject or linearise);
returns num_of_partitions
- constraint function:
makespan < deadline
- else:
- the objective function sets up a partition scheme such that
DoP constraints for each partition are satisfied, based on the X[i] value (reject or linearise);
returns makespan
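A sketch of how a PSO position vector X could be decoded under the encoding above (illustrative only; the names are not from the actual implementation):

    # {1, 2, 3} encoding of per-edge decisions, as described above.
    DECISIONS = {
        1: "zero the edge without linearisation",  # 00 (0 in base 10) + 1
        2: "zero the edge with linearisation",     # 01 (1 in base 10) + 1
        3: "do not zero the edge",                 # 10 (2 in base 10) + 1
    }

    def decode_position(X):
        # PSO positions are continuous, so round each dimension back
        # onto the discrete set before looking up the decision.
        return [DECISIONS[min(3, max(1, int(round(xi))))] for xi in X]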
- constrain_func(x)
Deadline - critical_path >= 0
- objective_func(x)
x is a list of values, each taking one of the three integers 0, 1 or 2 for an edge. The indices of x are identical to the indices in G.edges().sort(key='weight').
- partition_dag()
- Returns a tuple of:
the # of partitions formed (int)
the parallel time (longest path, int)
partition time (seconds, float)
a list of partitions (Partition)
- class dlg.dropmake.scheduler.Partition(gid, max_dop)
Logical partition; multiple (1 ~ N) of these can be placed onto a single physical resource unit.
Logical partitions can be nested, and they somewhat resemble the dlg.manager.drop_manager.
- add(u, v, gu, gv, sequential=False, global_dag=None)
Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains.
- add_node(u, weight)
Add a single node u to the partition
- can_add(u, v, gu, gv)
Check if nodes u and/or v can join this partition. A node may be rejected for reasons such as DoP overflow, missing the completion time deadline, etc.
- probe_max_dop(u, v, unew, vnew, update=False)
An incremental antichain check (which appears significantly more efficient than the networkx antichains), but it only works for DoP, not for weighted width.
- remove(n)
Remove node n from the partition
- property schedule
Get the schedule associated with this partition
- class dlg.dropmake.scheduler.Schedule(dag, max_dop)
The scheduling solution with schedule-related properties
- property efficiency
resource usage percentage (integer)
- property schedule_matrix
- Return: a self._lpl x self._max_dop matrix
(X - time, Y - resource unit / parallel lane)
- property workload
- Return: (integer)
the mean # of resource units per time unit consumed by the graph/partition
- class dlg.dropmake.scheduler.Scheduler(drop_list, max_dop=8, dag=None)
Static scheduling consists of three steps:
1. Partition the DAG into an optimal number (M) of partitions; goal - minimise execution time while maintaining intra-partition DoP.
2. Merge partitions into a given number (N) of partitions (if M > N); goal - minimise logical communication cost while maintaining load balancing.
3. Map each merged partition to a resource unit; goal - minimise physical communication cost amongst resource units.
- map_partitions()
map logical partitions to physical resources
- merge_partitions(num_partitions, bal_cond=1)
- Merge M partitions into N partitions where N < M
implemented using METIS for now
- bal_cond: load balance condition (integer):
0 - workload, 1 - CPU count (faster to evaluate than workload)
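A hedged end-to-end sketch of the three steps above, using a concrete Scheduler subclass; the exact call sequence is an assumption drawn from the step list, and drop_list and num_resource_units are placeholders:

    from dlg.dropmake.scheduler import MySarkarScheduler

    def schedule_onto(drop_list, num_resource_units, max_dop=8):
        scheduler = MySarkarScheduler(drop_list, max_dop=max_dop)

        # Step 1: partition the DAG into M partitions.
        num_parts, parallel_time, part_time = scheduler.partition_dag()

        # Step 2: merge down to N partitions if we over-partitioned (M > N).
        if num_parts > num_resource_units:
            scheduler.merge_partitions(num_resource_units, bal_cond=1)

        # Step 3: map each merged partition onto a physical resource unit.
        scheduler.map_partitions()
        return scheduler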
- exception dlg.dropmake.scheduler.SchedulerException
dlg.dropmake.pg_manager
Refer to https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4.4+DFM+Physical+Graph+Manager
- class dlg.dropmake.pg_manager.PGManager(root_dir)
Physical Graph Manager
- add_pgt(pgt, lg_name)
Dummy implementation using the file system for now (thread safe). TODO - use proper graph databases to manage all PGTs.
- Return:
A unique PGT id (handle)
- get_gantt_chart(pgt_id, json_str=True)
- Return:
the Gantt chart matrix (numpy array) given a PGT id
- get_pgt(pgt_id)
- Return:
The PGT object given its PGT id
- get_schedule_matrices(pgt_id, json_str=True)
- Return:
a list of schedule matrices (numpy arrays) given a PGT id
- class dlg.dropmake.pg_manager.PGUtil
Helper functions dealing with Physical Graphs
- static vstack_mat(A, B, separator=False)
Vertically stack two matrices that may have a different # of columns.
- Parameters:
A - matrix A (2d numpy array), B - matrix B (2d numpy array), separator - whether to add an empty row separator between the two matrices (boolean)
- Returns:
the vertically stacked matrix (2d numpy array)
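A minimal sketch of the behaviour described above, assuming the narrower matrix is zero-padded on the right before stacking (an illustration, not the actual implementation):

    import numpy as np

    def vstack_uneven(A, B, separator=False):
        # Pad the narrower matrix with zero columns so both matrices
        # share the same width, then stack them vertically.
        ncols = max(A.shape[1], B.shape[1])
        pad = lambda M: np.pad(M, ((0, 0), (0, ncols - M.shape[1])))
        parts = [pad(A)]
        if separator:
            parts.append(np.zeros((1, ncols)))  # empty separator row
        parts.append(pad(B))
        return np.vstack(parts)

    A = np.ones((2, 3))
    B = np.ones((3, 5))
    print(vstack_uneven(A, B, separator=True).shape)  # (6, 5)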