dlg.dropmake¶
Prototypical implementation of the DataFlow Manager https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4+Data+Flow+Manager The sub-modules are based on the proposed (latest) product tree as of 8 Dec 2015.
Contents
dlg.dropmake.pg_generator¶
https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4.2+DFM+Resource+Manager
The DFM resource manager takes the requested logical graphs, the available resources and the profiling information, and turns them into a partitioned physical graph, which is then deployed and monitored by the Physical Graph Manager
Examples of the logical graph node JSON representation:
{ u'category': u'memory',
  u'data_volume': 25, u'group': -58, u'key': -59,
  u'loc': u'40.96484375000006 -250.53115793863992',
  u'text': u'Channel @ All Day'}
{ u'Arg01': u'', u'Arg02': u'', u'Arg03': u'', u'Arg04': u'',
  u'category': u'Component', u'execution_time': 20, u'group': -60,
  u'key': -56, u'loc': u'571.6718750000005 268.0000000000004',
  u'text': u'DD Calibration'}
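The fields above can be read off directly. A minimal sketch (assuming plain dicts with the keys shown above, not the actual LG class, which parses a full JSON document) that separates data nodes from component nodes:

```python
# Sketch only: assumes logical-graph nodes are plain dicts carrying the
# keys shown above; the real LG class parses a full JSON document.
def summarise_lg_node(node):
    """Pull out the scheduling-relevant fields of one LG node."""
    if node['category'] == 'Component':
        # application node: carries an estimated execution time
        return ('app', node['key'], node.get('execution_time', 0))
    # data node (e.g. 'memory'): carries a data volume
    return ('data', node['key'], node.get('data_volume', 0))

print(summarise_lg_node({'category': 'memory', 'data_volume': 25,
                         'key': -59, 'text': 'Channel @ All Day'}))
# -> ('data', -59, 25)
```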
-
exception
dlg.dropmake.pg_generator.
GInvalidLink
¶
-
exception
dlg.dropmake.pg_generator.
GInvalidNode
¶
-
exception
dlg.dropmake.pg_generator.
GPGTException
¶
-
exception
dlg.dropmake.pg_generator.
GPGTNoNeedMergeException
¶
-
exception
dlg.dropmake.pg_generator.
GraphException
¶
-
class
dlg.dropmake.pg_generator.
LG
(f, ssid=None)¶ An object representation of a Logical Graph
-
lgn_to_pgn
(lgn, iid='0', lpcxt=None)¶ Convert a logical graph node to a physical graph node without considering PG links.
iid: instance id (string)
lpcxt: loop context
-
unroll_to_tpl
()¶ Not thread-safe!
- just create the PGNs anyway
- sort out the links
-
-
class
dlg.dropmake.pg_generator.
MetisPGTP
(drop_list, num_partitions=1, min_goal=0, par_label='Partition', ptype=0, ufactor=10, merge_parts=False)¶ DROP and GOJS representations of Physical Graph Template with Partitions Based on METIS http://glaros.dtc.umn.edu/gkhome/metis/metis/overview
-
get_partition_info
()¶ Return a string of partition parameters and log entries
-
merge_partitions
(new_num_parts, form_island=False, island_type=0, visual=False)¶ This is called during resource mapping - deploying the partitioned PGT to a list of nodes.
form_island: If True, the merging will form new_num_parts logical islands on top of existing partitions (i.e. nodes); this is also known as “reference-based merging”. If False, the merging will physically merge current partitions into new_num_parts new partitions (i.e. nodes), so no node-island ‘hierarchies’ are created.
island_type: integer, 0 - data island, 1 - compute island
-
to_gojs_json
(string_rep=True, outdict=None, visual=False)¶ Partition the PGT into a real “PGT with Partitions”, thus PGTP, using METIS built-in functions
- See METIS usage:
- http://metis.readthedocs.io/en/latest/index.html
-
to_partition_input
(outf=None)¶ Convert to the METIS format for mapping and decomposition. NOTE - since METIS only supports undirected graphs, we have to produce both upstream and downstream nodes to fit its input format
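The note about undirected graphs can be illustrated with a small sketch that serialises a DAG into the METIS graph-file layout. This is not the real `to_partition_input` (which also emits vertex and edge weights); it only shows the structural conversion:

```python
import networkx as nx

def to_metis_lines(dag):
    """Sketch: serialise a DAG into METIS graph-file lines (structure
    only; the real to_partition_input also emits vertex/edge weights).

    METIS only understands undirected graphs, so every directed edge
    (u, v) appears in the adjacency lists of BOTH u and v, and METIS
    vertex numbering is 1-based.
    """
    nodes = sorted(dag.nodes())
    index = {n: i + 1 for i, n in enumerate(nodes)}   # 1-based ids
    und = dag.to_undirected()
    # header line: number of vertices, number of (undirected) edges
    lines = ['%d %d' % (und.number_of_nodes(), und.number_of_edges())]
    for n in nodes:
        neighbours = sorted(index[m] for m in und[n])
        lines.append(' '.join(str(i) for i in neighbours))
    return lines
```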
-
-
class
dlg.dropmake.pg_generator.
MinNumPartsPGTP
(drop_list, deadline, num_partitions=0, par_label='Partition', max_dop=8, merge_parts=False, optimistic_factor=0.5)¶ -
get_partition_info
()¶ Return a string of partition parameters and log entries
-
-
class
dlg.dropmake.pg_generator.
MySarkarPGTP
(drop_list, num_partitions=0, par_label='Partition', max_dop=8, merge_parts=False)¶ Use the MySarkarScheduler to produce the PGTP
-
get_partition_info
()¶ Return a string of partition parameters and log entries
-
merge_partitions
(new_num_parts, form_island=False, island_type=0, visual=False)¶ This is called during resource mapping - deploying the partitioned PGT to a list of nodes.
form_island: If True, the merging will form new_num_parts logical islands on top of existing partitions (i.e. nodes). If False, the merging will physically merge current partitions into new_num_parts new partitions (i.e. nodes), so no node-island ‘hierarchies’ are created.
island_type: integer, 0 - data island, 1 - compute island
-
to_gojs_json
(string_rep=True, outdict=None, visual=False)¶ Partition the PGT into a real “PGT with Partitions”, thus PGTP
-
to_partition_input
(outf)¶ Convert to format for mapping and decomposition
-
-
class
dlg.dropmake.pg_generator.
PGT
(drop_list, build_dag=True)¶ A DROP representation of Physical Graph Template
-
dag
¶ Return the networkx nx.DiGraph object.
The weight of an edge (u, v) depends on when this property is accessed: if read after partitioning, it may have been zeroed if both u and v are allocated to the same DropIsland
-
data_movement
¶ Return the TOTAL data movement
-
get_opt_num_parts
()¶ Dummy for now
-
json
¶ Return the JSON string representation of the PGT for visualisation
-
pred_exec_time
(app_drop_only=False, wk='weight', force_answer=False)¶ Predict execution time using the longest path length
-
to_gojs_json
(string_rep=True, outdict=None, visual=False)¶ Convert the PGT (without any partitions) to JSON for visualisation in GOJS
Sub-class PGTPs will override this function, replacing it with actual partitioning, and the visualisation becomes an option
-
to_partition_input
(outf)¶ Convert to format for mapping and decomposition
-
to_pg_spec
(node_list, ret_str=True, num_islands=1, tpl_nodes_len=0)¶ Convert the PGT to a PG specification, and map that to the hardware resources.
- node_list:
- a list of nodes, whose length == (num_islands + num_node_mgrs). We assume that the MasterDropManager’s node is NOT in the node_list
- num_islands:
- >1 - partitions are “conceptually” clustered into islands; 1 - partitions MAY be physically merged without generating islands, depending on the length of node_list
-
-
class
dlg.dropmake.pg_generator.
PSOPGTP
(drop_list, par_label='Partition', max_dop=8, deadline=None, topk=30, swarm_size=40, merge_parts=False)¶ -
get_partition_info
()¶ partition parameter and log entry return a string
-
-
dlg.dropmake.pg_generator.
partition
(pgt, algo, num_partitions=1, num_islands=1, partition_label='partition', show_gojs=False, **algo_params)¶ Partitions a Physical Graph Template
-
dlg.dropmake.pg_generator.
unroll
(lg, oid_prefix=None)¶ Unrolls a logical graph
dlg.dropmake.scheduler¶
-
class
dlg.dropmake.scheduler.
DAGUtil
¶ Helper functions dealing with DAGs
-
static
build_dag_from_drops
(drop_list, embed_drop=True, fake_super_root=False)¶ Return a networkx DiGraph (DAG).
fake_super_root: whether to create a fake super root node in the DAG. If set to True, it enables edge-zeroing-based scheduling algorithms to make more aggressive merging.
Edge attributes: tw - task weight, dw - data weight / volume
-
static
ganttchart_matrix
(G, topo_sort=None)¶ Return a M (# of DROPs) by N (longest path length) matrix
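The idea of the M-by-N Gantt matrix can be sketched as follows, assuming each DROP already carries integer (start, end) time labels such as those produced by a `label_schedule`-style pass (names here are illustrative, not the actual API):

```python
import numpy as np

def gantt_matrix(schedule):
    """Sketch of the Gantt-chart matrix idea: one row per DROP, one
    column per time step, 1 while the DROP is busy. Assumes each
    DROP already has integer (start, end) labels; names here are
    illustrative, not the library's actual data structures."""
    n_cols = max(end for _, end in schedule.values())
    mat = np.zeros((len(schedule), n_cols), dtype=int)
    for row, (start, end) in enumerate(schedule.values()):
        mat[row, start:end] = 1   # the DROP is busy over [start, end)
    return mat
```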
-
static
get_longest_path
(G, weight='weight', default_weight=1, show_path=True, topo_sort=None)¶ Ported from https://github.com/networkx/networkx/blob/master/networkx/algorithms/dag.py with node weights added.
Returns the longest path in a DAG. If G has edges with a ‘weight’ attribute, the edge data are used as weight values.
- Parameters:
- G : NetworkX DiGraph
- weight : string (default ‘weight’) - edge data key to use for weight
- default_weight : integer (default 1) - the weight of edges that do not have a weight attribute
- Returns:
- path : list - the longest path
- path_length : float - the length of the longest path
-
static
get_max_antichains
(G)¶ Return a list of antichains with the top-2 lengths
-
static
get_max_dop
(G)¶ Get the maximum degree of parallelism of this DAG.
Return: int
-
static
get_max_width
(G, weight='weight', default_weight=1)¶ Get the antichain with the maximum “weighted” width of this DAG.
weight: float (for example, it could be RAM consumption in GB)
Return: float
-
static
label_schedule
(G, weight='weight', topo_sort=None)¶ For each node, label its start and end time
-
static
metis_part
(G, num_partitions)¶ Use the METIS binary executable (instead of the library). This is used only for testing when libmetis halts unexpectedly
-
static
prune_antichains
(antichains)¶ Prune a list of antichains to keep those with the top-2 lengths. antichains is a generator (not a list!)
-
-
class
dlg.dropmake.scheduler.
DSCScheduler
(drop_list)¶ Based on T. Yang and A. Gerasoulis, “DSC: scheduling parallel tasks on an unbounded number of processors,” in IEEE Transactions on Parallel and Distributed Systems, vol.5, no.9, pp.951-967, Sep 1994
-
class
dlg.dropmake.scheduler.
DilworthPartition
(gid, max_dop)¶ Use Dilworth’s theorem to determine the DoP; see https://en.wikipedia.org/wiki/Dilworth%27s_theorem
The idea goes as follows: let bpg = bipartite_graph(dag), then
DoP == poset width == len(max_antichain) == len(min_num_chain) == (cardinality(dag) - len(max_matching(bpg)))
Note that cardinality(dag) == cardinality(bpg) / 2
See Section 3 of the paper http://opensource.uom.gr/teaching/distrubutedSite/eceutexas/dist2/termPapers/Selma.pdf
Also http://codeforces.com/blog/entry/3781
The key is to incrementally construct the bipartite graph (bpg) from the growing dag
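The Dilworth construction described above can be sketched directly with networkx. This is a non-incremental sketch of the identity (the real partition builds the bipartite graph incrementally): split every node into a left and right copy, connect them along the reachability relation, and subtract the maximum matching size:

```python
import networkx as nx
from networkx.algorithms.bipartite import maximum_matching

def poset_width(dag):
    """Sketch of the construction described above: the DoP (poset
    width) equals |V| minus the size of a maximum matching in the
    split bipartite graph built over the reachability relation."""
    tc = nx.transitive_closure(dag)          # edge u->v iff v reachable
    left = [('L', n) for n in dag]
    bpg = nx.Graph()
    bpg.add_nodes_from(left)
    bpg.add_nodes_from(('R', n) for n in dag)
    bpg.add_edges_from((('L', u), ('R', v)) for u, v in tc.edges())
    matching = maximum_matching(bpg, top_nodes=left)
    # the matching dict records every matched edge from both ends
    return len(dag) - len(matching) // 2
```

For the diamond DAG a→{b, c}→d this returns 2 (the antichain {b, c}).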
-
add
(u, v, gu, gv, sequential=False, global_dag=None)¶ Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains
-
can_add
(u, v, gu, gv)¶ Check if nodes u and/or v can join this partition. A node may be rejected due to reasons such as DoP overflow or an overdue completion time deadline, etc.
-
can_merge
(that)¶ - if (self._max_dop + that._max_dop <= self._ask_max_dop):
- return True
-
-
class
dlg.dropmake.scheduler.
GraphAnnealer
(state, scheduler, deadline=None, topk=None)¶ Use simulated annealing for a DAG/Graph scheduling problem. There are two ways to inject constraints:
- explicitly implement the meet_constraint function
- add an extra penalty term in the energy function
-
energy
()¶ Calculates the number of partitions
-
meet_constraint
()¶ Check if the constraint is met. By default, it is always met
-
move
()¶ Select the neighbour: swap two edges in the DAG if they are not the same; otherwise simply reduce one of them by one
-
class
dlg.dropmake.scheduler.
KFamilyPartition
(gid, max_dop, w_attr='num_cpus', global_dag=None)¶ A special case (K = 1) of the Maximum Weighted K-families based on the Theorem 3.1 in http://fmdb.cs.ucla.edu/Treports/930014.pdf
-
add_node
(u, weight)¶ Add a single node u to the partition
-
can_merge
(that, u, v)¶
-
-
class
dlg.dropmake.scheduler.
MCTSScheduler
(drop_list, max_dop=8, dag=None, deadline=None, max_moves=1000, max_calc_time=10)¶ Use Monte Carlo Tree Search to guide the Sarkar algorithm https://en.wikipedia.org/wiki/Monte_Carlo_tree_search Uses basic functions in PSOScheduler by inheriting from it for convenience
-
partition_dag
()¶ Trigger the MCTS algorithm Returns a tuple of:
- the # of partitions formed (int)
- the parallel time (longest path, int)
- partition time (seconds, float)
- a list of partitions (Partition)
-
-
class
dlg.dropmake.scheduler.
MinNumPartsScheduler
(drop_list, deadline, max_dop=8, dag=None, optimistic_factor=0.5)¶ A special type of partitioner that aims to schedule the DAG on time but at minimum cost. In this particular case, the cost is the number of partitions that will be generated. The assumption is that the # of partitions (with a certain DoP) more or less represents the resource footprint.
-
is_time_critical
(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)¶ This is called ONLY IF either can_add on the partition has returned “False” or the new critical path is longer than the old one at each iteration.
- Parameters:
- u - node u, v - node v, uw - weight of node u, vw - weight of node v
- curr_lpl - current longest path length
- ow - original edge weight
- rem_el - remaining edges to be zeroed
- Returns:
- Boolean
It looks ahead to compute the probability of time being critical and compares that with the _optimistic_factor: probability = (num of edges that need to be zeroed to meet the deadline) / (num of remaining unzeroed edges)
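The look-ahead ratio described above can be sketched in a few lines. Both the names and the comparison direction are assumptions for illustration, not the scheduler's actual code:

```python
def looks_time_critical(num_to_zero, num_remaining, optimistic_factor=0.5):
    """Illustrative sketch (names and comparison direction are
    assumptions): treat the situation as time-critical when the
    fraction of remaining unzeroed edges that must still be zeroed
    to meet the deadline exceeds the optimistic factor."""
    probability = float(num_to_zero) / num_remaining
    return probability > optimistic_factor
```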
-
override_cannot_add
()¶ Whether this scheduler will override the False result from Partition.can_add()
-
-
class
dlg.dropmake.scheduler.
MultiWeightPartition
(gid, max_dops, w_attrs=['num_cpus'], global_dag=None)¶ -
add
(u, v, gu, gv, sequential=False, global_dag=None)¶ Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains
-
can_add
(u, v, gu, gv)¶ Check if nodes u and/or v can join this partition. A node may be rejected due to reasons such as DoP overflow or an overdue completion time deadline, etc.
-
can_merge
(that)¶
-
-
class
dlg.dropmake.scheduler.
MySarkarScheduler
(drop_list, max_dop=8, dag=None, dump_progress=False)¶ Based on “V. Sarkar, Partitioning and Scheduling Parallel Programs for Execution on Multiprocessors. Cambridge, MA: MIT Press, 1989.”
Main change: we do not order independent tasks within the same cluster. This could blow up the cluster, therefore we allow for a cost constraint on the number of concurrent tasks (e.g. # of cores) within each cluster.
Why:
1. we only need to topologically sort the DAG once, since we do not add new edges in the cluster
2. closer to requirements
3. adjustable for local schedulers
Similar ideas: http://stackoverflow.com/questions/3974731
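The greedy edge-zeroing loop behind Sarkar-style partitioning can be sketched with a union-find over clusters. This is a sketch of the technique, not the scheduler's code: the real scheduler enforces a degree-of-parallelism constraint inside each cluster, whereas this sketch uses plain cluster size as a crude stand-in:

```python
import networkx as nx

def greedy_edge_zeroing(dag, max_cluster=4, weight='weight'):
    """Sketch: visit edges in decreasing weight order and merge the
    endpoint clusters unless a constraint is violated. The real
    scheduler checks intra-cluster DoP; this sketch uses plain
    cluster size as a crude stand-in for that constraint."""
    parent = {n: n for n in dag}
    size = {n: 1 for n in dag}

    def find(n):                       # union-find with path halving
        while parent[n] != n:
            parent[n] = parent[parent[n]]
            n = parent[n]
        return n

    zeroed = []
    edges = sorted(dag.edges(data=weight, default=1),
                   key=lambda e: e[2], reverse=True)
    for u, v, w in edges:
        ru, rv = find(u), find(v)
        if ru != rv and size[ru] + size[rv] <= max_cluster:
            parent[ru] = rv            # merge the two clusters
            size[rv] += size[ru]
            zeroed.append((u, v))      # this edge's weight becomes 0
    return zeroed
```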
-
is_time_critical
(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)¶ This is called ONLY IF override_cannot_add has returned “True”.
- Parameters:
- u - node u, v - node v, uw - weight of node u, vw - weight of node v
- curr_lpl - current longest path length
- ow - original edge weight
- rem_el - remaining edges to be zeroed
- Returns:
- Boolean
MySarkarScheduler always returns False
-
override_cannot_add
()¶ Whether this scheduler will override the False result from Partition.can_add()
-
partition_dag
()¶ - Return a tuple of
- the # of partitions formed (int)
- the parallel time (longest path, int)
- partition time (seconds, float)
-
-
class
dlg.dropmake.scheduler.
PSOScheduler
(drop_list, max_dop=8, dag=None, deadline=None, topk=30, swarm_size=40)¶ Use the Particle Swarm Optimisation to guide the Sarkar algorithm https://en.wikipedia.org/wiki/Particle_swarm_optimization
The idea is to let “edge zeroing” become the search variable X. The number of dimensions of X is the number of edges in the DAG. Possible values for each dimension form a discrete set {1, 2, 3}, where:
- 10 - no zero (2 in base10) + 1
- 00 - zero w/o linearisation (0 in base10) + 1
- 01 - zero with linearisation (1 in base10) + 1
- if a deadline is present:
- the objective function sets up a partition scheme such that DoP constraints for each partition are satisfied based on the X[i] value (reject or linearisation), and returns num_of_partitions
- constraint function:
- makespan < deadline
- else:
- the objective function sets up a partition scheme such that DoP constraints for each partition are satisfied based on the X[i] value (reject or linearisation), and returns makespan
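The encoding above maps each (possibly fractional) particle coordinate onto one of three edge treatments. A sketch of the decoding step (the action labels are illustrative, not the scheduler's internal names):

```python
def decode_pso_position(x):
    """Sketch decoding a PSO position vector under the encoding
    above: each dimension holds one of {1, 2, 3} (the base-10 value
    of the two-bit code, plus 1) and selects how that edge is
    treated. Labels are illustrative."""
    actions = {1: 'zero w/o linearisation',   # 00 -> 0 + 1
               2: 'zero with linearisation',  # 01 -> 1 + 1
               3: 'no zeroing'}               # 10 -> 2 + 1
    # particle coordinates are continuous, so round to the nearest code
    return [actions[int(round(v))] for v in x]
```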
-
constrain_func
(x)¶ Deadline - critical_path >= 0
-
objective_func
(x)¶ x is a list of values, each taking one of the 3 integers 0, 1, 2 for an edge. The indices of x are identical to the indices in G.edges().sort(key=’weight’)
-
partition_dag
()¶ - Returns a tuple of:
- the # of partitions formed (int)
- the parallel time (longest path, int)
- partition time (seconds, float)
- a list of partitions (Partition)
-
class
dlg.dropmake.scheduler.
Partition
(gid, max_dop)¶ A logical partition; multiple (1 ~ N) of these can be placed onto a single physical resource unit
Logical partitions can be nested, and somewhat resemble the dlg.manager.drop_manager
-
add
(u, v, gu, gv, sequential=False, global_dag=None)¶ Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains
-
add_node
(u, weight)¶ Add a single node u to the partition
-
can_add
(u, v, gu, gv)¶ Check if nodes u and/or v can join this partition. A node may be rejected due to reasons such as DoP overflow or an overdue completion time deadline, etc.
-
probe_max_dop
(u, v, unew, vnew, update=False)¶ An incremental antichain computation (which appears significantly more efficient than the networkx antichains), but it only works for DoP, not for weighted width
-
remove
(n)¶ Remove node n from the partition
-
schedule
¶ Get the schedule associated with this partition
-
-
class
dlg.dropmake.scheduler.
SAScheduler
(drop_list, max_dop=8, dag=None, deadline=None, topk=None, max_iter=6000)¶ Use Simulated Annealing to guide the Sarkar algorithm https://en.wikipedia.org/wiki/Simulated_annealing http://apmonitor.com/me575/index.php/Main/SimulatedAnnealing Uses basic functions in PSOScheduler by inheriting from it for convenience
-
partition_dag
()¶ Trigger the SA algorithm Returns a tuple of:
- the # of partitions formed (int)
- the parallel time (longest path, int)
- partition time (seconds, float)
- a list of partitions (Partition)
-
-
class
dlg.dropmake.scheduler.
Schedule
(dag, max_dop)¶ The scheduling solution with schedule-related properties
-
efficiency
¶ resource usage percentage (integer)
-
schedule_matrix
¶ - Return: a self._lpl x self._max_dop matrix
- (X - time, Y - resource unit / parallel lane)
-
workload
¶ - Return: (integer)
- the mean # of resource units per time unit consumed by the graph/partition
-
-
class
dlg.dropmake.scheduler.
Scheduler
(drop_list, max_dop=8, dag=None)¶ Static scheduling consists of three steps:
1. partition the DAG into an optimal number (M) of partitions; goal - minimising execution time while maintaining intra-partition DoP
2. merge partitions into a given number (N) of partitions (if M > N); goal - minimise logical communication cost while maintaining load balancing
3. map each merged partition to a resource unit; goal - minimise physical communication cost amongst resource units
-
map_partitions
()¶ map logical partitions to physical resources
-
merge_partitions
(num_partitions, bal_cond=0)¶ Merge M partitions into N partitions where N < M (implemented using METIS for now).
- bal_cond: load balance condition (integer):
- 0 - workload, 1 - count
-
exception
dlg.dropmake.scheduler.
SchedulerException
¶
-
class
dlg.dropmake.scheduler.
WeightedDilworthPartition
(gid, max_dop, global_dag=None)¶ The extensions to DilworthPartition:
- Support “weights” for each Drop’s DoP (e.g. the CLEAN AppDrop uses 8 cores)
This requires a “weighted” maximal antichain. The solution is to create a weighted version of the bipartite graph without changing the original partition DAG. This allows us to use the same max matching algorithm to find the max antichain.
It has an option (global_dag) to deal with global path reachability, which could be missing from the DAG inside the local partition. Such misses inflate DoP values, leading to more rejected partition merge requests, which in turn creates more partitions. Based on our experiment, without switching on this option, scheduling the “chiles_two_dev2” pipeline will create 45 partitions (i.e. 45 compute nodes, each with 8 cores). Turning on this option brings that number down to 24 within the same execution time.
Off - exec_time:92 - min_exec_time:67 - total_data_movement:510 - algo:Edge Zero - num_parts:45
On - exec_time:92 - min_exec_time:67 - total_data_movement:482 - algo:Edge Zero - num_parts:24
However “probing reachability” slows down partitioning by a factor of 3 in the case of the CHILES2 pipeline. Some techniques may be applicable e.g. http://www.sciencedirect.com/science/article/pii/S0196677483710175
-
can_add
(u, v, gu, gv)¶ Check if nodes u and/or v can join this partition. A node may be rejected due to reasons such as DoP overflow or an overdue completion time deadline, etc.
-
can_merge
(that)¶ Need to merge split graph as well to speed up!
dlg.dropmake.pg_manager¶
Refer to https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4.4+DFM+Physical+Graph+Manager
-
class
dlg.dropmake.pg_manager.
PGManager
(root_dir)¶ Physical Graph Manager
-
add_pgt
(pgt, lg_name)¶ Dummy implementation using the file system for now (thread safe). TODO - use proper graph databases to manage all PGTs
- Return:
- A unique PGT id (handle)
-
get_gantt_chart
(pgt_id, json_str=True)¶ - Return:
the Gantt chart matrix (numarray) given a PGT id
-
get_pgt
(pgt_id)¶ - Return:
- The PGT object given its PGT id
-
get_schedule_matrices
(pgt_id, json_str=True)¶ - Return:
- a list of schedule matrices (numarrays) given a PGT id
-
-
class
dlg.dropmake.pg_manager.
PGUtil
¶ Helper functions dealing with Physical Graphs
-
static
vstack_mat
(A, B, separator=False)¶ Vertically stack two matrices that may have different # of columns.
- A:
- matrix A (2d numpy array)
- B:
- matrix B (2d numpy array)
- separator:
- whether to add an empty row separator between the two matrices (boolean)
- Return:
- the vertically stacked matrix (2d numpy array)
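The behaviour can be sketched with numpy padding. This is a sketch under the assumption that the narrower matrix is zero-padded on the right; the real implementation may pad differently:

```python
import numpy as np

def vstack_uneven(A, B, separator=False):
    """Sketch of vstack_mat's behaviour (assumption: the narrower
    matrix is zero-padded on the right so the column counts agree),
    optionally inserting one empty row between the two blocks."""
    cols = max(A.shape[1], B.shape[1])

    def pad(M):
        # pad only on the right of the second axis, with zeros
        return np.pad(M, ((0, 0), (0, cols - M.shape[1])),
                      mode='constant')

    parts = [pad(A)]
    if separator:
        parts.append(np.zeros((1, cols), dtype=A.dtype))
    parts.append(pad(B))
    return np.vstack(parts)
```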
-