dlg.dropmake

Prototypical implementation of the Data Flow Manager (https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4+Data+Flow+Manager). The sub-modules are based on the proposed (latest) product tree as of 8 Dec 2015.

dlg.dropmake.pg_generator

https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4.2+DFM+Resource+Manager

The DFM Resource Manager uses the requested logical graphs, the available resources and the profiling information, and turns them into the partitioned physical graph, which will then be deployed and monitored by the Physical Graph Manager.

Examples of logical graph node JSON representation

{ u'category': u'memory',
  u'data_volume': 25,
  u'group': -58,
  u'key': -59,
  u'loc': u'40.96484375000006 -250.53115793863992',
  u'text': u'Channel @ All Day'}

{ u'Arg01': u'',
  u'Arg02': u'',
  u'Arg03': u'',
  u'Arg04': u'',
  u'category': u'Component',
  u'execution_time': 20,
  u'group': -60,
  u'key': -56,
  u'loc': u'571.6718750000005 268.0000000000004',
  u'text': u'DD Calibration'}
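
For orientation, a minimal sketch of reading such nodes from a logical graph file; the file name and the GOJS-style top-level key are assumptions for illustration, not part of this module's API.

    import json

    # Hypothetical input file and key names; real logical graphs may differ.
    with open("logical_graph.graph") as f:
        lg = json.load(f)

    for node in lg.get("nodeDataArray", []):   # assumed GOJS-style node list
        print(node["key"], node.get("category"), node.get("text"))
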
exception dlg.dropmake.pg_generator.GInvalidNode
exception dlg.dropmake.pg_generator.GPGTException
exception dlg.dropmake.pg_generator.GPGTNoNeedMergeException
exception dlg.dropmake.pg_generator.GraphException
class dlg.dropmake.pg_generator.LG(f, ssid=None)

An object representation of a Logical Graph

lgn_to_pgn(lgn, iid='0', lpcxt=None)

Convert a logical graph node to a physical graph node without considering PG links.

iid: instance id (string)
lpcxt: loop context

unroll_to_tpl()

Not thread-safe!

  1. Just create the PGNs (physical graph nodes) anyway
  2. Sort out the links
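
A minimal usage sketch of the LG class; the constructor argument and the shape of the return value are assumed from the signature above rather than verified against the code base.

    from dlg.dropmake.pg_generator import LG

    # Assumes LG accepts a path to a logical graph JSON file, per LG(f, ssid=None).
    lg = LG("logical_graph.graph", ssid="session-001")
    drops = lg.unroll_to_tpl()            # physical graph node (drop) dicts, links sorted out
    print(len(drops), "drops generated")
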
class dlg.dropmake.pg_generator.MetisPGTP(drop_list, num_partitions=1, min_goal=0, par_label='Partition', ptype=0, ufactor=10, merge_parts=False)

DROP and GOJS representations of a Physical Graph Template with Partitions, based on METIS (http://glaros.dtc.umn.edu/gkhome/metis/metis/overview).

get_partition_info()

Return a string of partition parameters for log entries.

merge_partitions(new_num_parts, form_island=False, island_type=0, visual=False)

This is called during resource mapping - deploying a partitioned PGT to a list of nodes.

form_island: If True, the merging will form new_num_parts logical islands on top of existing partitions (i.e. nodes); this is also known as "reference-based merging". If False, the merging will physically merge the current partitions into new_num_parts new partitions (i.e. nodes), so no node-island 'hierarchies' are created.

island_type: integer, 0 - data island, 1 - compute island

to_gojs_json(string_rep=True, outdict=None, visual=False)

Partition the PGT into a real "PGT with Partitions" (i.e. a PGTP) using METIS built-in functions.

See METIS usage:
http://metis.readthedocs.io/en/latest/index.html
to_partition_input(outf=None)

Convert to the METIS format for mapping and decomposition. NOTE: since METIS only supports undirected graphs, we have to produce both upstream and downstream nodes to fit its input format.
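
To illustrate the note above, a rough sketch (not the actual implementation) of flattening a directed DAG into METIS' undirected adjacency-list format, where every vertex lists both its upstream and downstream neighbours:

    import networkx as nx

    def to_metis_adjacency(dag: nx.DiGraph) -> str:
        # METIS vertices are numbered from 1; the header line is "<#vertices> <#edges>".
        index = {n: i + 1 for i, n in enumerate(dag.nodes())}
        lines = ["%d %d" % (dag.number_of_nodes(), dag.number_of_edges())]
        for n in dag.nodes():
            # each undirected edge appears in the lines of both of its endpoints
            neighbours = list(dag.predecessors(n)) + list(dag.successors(n))
            lines.append(" ".join(str(index[m]) for m in neighbours))
        return "\n".join(lines)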

class dlg.dropmake.pg_generator.MinNumPartsPGTP(drop_list, deadline, num_partitions=0, par_label='Partition', max_dop=8, merge_parts=False, optimistic_factor=0.5)
get_partition_info()

Return a string of partition parameters for log entries.

class dlg.dropmake.pg_generator.MySarkarPGTP(drop_list, num_partitions=0, par_label='Partition', max_dop=8, merge_parts=False)

Use the MySarkarScheduler to produce the PGTP

get_partition_info()

Return a string of partition parameters for log entries.

merge_partitions(new_num_parts, form_island=False, island_type=0, visual=False)

This is called during resource mapping - deploying a partitioned PGT to a list of nodes.

form_island: If True, the merging will form new_num_parts logical islands on top of existing partitions (i.e. nodes). If False, the merging will physically merge the current partitions into new_num_parts new partitions (i.e. nodes), so no node-island 'hierarchies' are created.

island_type: integer, 0 - data island, 1 - compute island

to_gojs_json(string_rep=True, outdict=None, visual=False)

Partition the PGT into a real "PGT with Partitions" (i.e. a PGTP)

to_partition_input(outf)

Convert to a format for mapping and decomposition

class dlg.dropmake.pg_generator.PGT(drop_list, build_dag=True)

A DROP representation of a Physical Graph Template

dag
Return the networkx nx.DiGraph object.

The weight of a given edge (u, v) depends on when this property is accessed: if called after partitioning, the weight may have been zeroed if both u and v are allocated to the same DropIsland.

data_movement

Return the TOTAL data movement

get_opt_num_parts()

Dummy for now

json

Return the JSON string representation of the PGT for visualisation

pred_exec_time(app_drop_only=False, wk='weight', force_answer=False)

Predict execution time using the longest path length

to_gojs_json(string_rep=True, outdict=None, visual=False)

Convert the PGT (without any partitions) to JSON for visualisation in GOJS.

Sub-class PGTPs will override this function and replace it with actual partitioning, making the visualisation an option.

to_partition_input(outf)

Convert to a format for mapping and decomposition

to_pg_spec(node_list, ret_str=True, num_islands=1, tpl_nodes_len=0)

Convert the PGT to a PG specification and map it to the hardware resources.

node_list:
    A list of nodes (list), whose length == (num_islands + num_node_mgrs). We assume that the MasterDropManager's node is NOT in the node_list.
num_islands:
    >1 - Partitions are "conceptually" clustered into islands
    1 - Partitions MAY be physically merged without generating islands, depending on the length of node_list
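
A hypothetical call sequence showing how these parameters fit together; the host names are made up, and pgt is assumed to be an already partitioned PGT instance (e.g. the result of pg_generator.partition):

    # 1 data island manager node plus 3 node managers; the MasterDropManager's
    # node is deliberately excluded, as noted above.
    node_list = ["island-01", "node-01", "node-02", "node-03"]
    pg_spec = pgt.to_pg_spec(node_list, ret_str=True, num_islands=1)
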
class dlg.dropmake.pg_generator.PSOPGTP(drop_list, par_label='Partition', max_dop=8, deadline=None, topk=30, swarm_size=40, merge_parts=False)
get_partition_info()

Return a string of partition parameters for log entries.

dlg.dropmake.pg_generator.partition(pgt, algo, num_partitions=1, num_islands=1, partition_label='partition', show_gojs=False, **algo_params)

Partitions a Physical Graph Template

dlg.dropmake.pg_generator.unroll(lg, oid_prefix=None)

Unrolls a logical graph
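
Putting the two module-level functions together, a sketch of the typical unroll-then-partition flow; the file name, algorithm name and parameter values are illustrative assumptions:

    import json
    from dlg.dropmake import pg_generator

    with open("logical_graph.graph") as f:        # hypothetical input file
        lg = json.load(f)

    pgt = pg_generator.unroll(lg, oid_prefix="2024-01-01")           # unpartitioned physical graph template
    pgt = pg_generator.partition(pgt, algo="metis",
                                 num_partitions=4, num_islands=1)    # partitioned PGT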

dlg.dropmake.scheduler

class dlg.dropmake.scheduler.DAGUtil

Helper functions for dealing with DAGs

static build_dag_from_drops(drop_list, embed_drop=True, fake_super_root=False)

Return a networkx DiGraph (DAG).

fake_super_root: whether to create a fake super root node in the DAG. If set to True, it enables edge zero-based scheduling algorithms to perform more aggressive merging.

tw - task weight
dw - data weight / volume

static ganttchart_matrix(G, topo_sort=None)

Return an M (# of DROPs) by N (longest path length) matrix

static get_longest_path(G, weight='weight', default_weight=1, show_path=True, topo_sort=None)

Ported from https://github.com/networkx/networkx/blob/master/networkx/algorithms/dag.py with node weight added.

Returns the longest path in a DAG. If G has edges with a 'weight' attribute, the edge data are used as weight values.

Parameters:
G : NetworkX DiGraph
    Graph
weight : string (default 'weight')
    Edge data key to use for weight
default_weight : integer (default 1)
    The weight of edges that do not have a weight attribute

Returns:
path : list
    Longest path
path_length : float
    The length of the longest path
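
As a sanity check of the semantics described above, a standalone sketch (an assumed re-implementation, not DAGUtil's code) that computes the longest path while adding node weights on top of edge weights:

    import networkx as nx

    def longest_path_with_node_weights(G, weight="weight", default_weight=1):
        dist = {}                                     # node -> (accumulated length, predecessor)
        for v in nx.topological_sort(G):
            node_w = G.nodes[v].get(weight, 0)        # node weight contributes to the path length
            pairs = [(dist[u][0] + G[u][v].get(weight, default_weight), u)
                     for u in G.predecessors(v)]
            if pairs:
                best_len, best_pred = max(pairs, key=lambda p: p[0])
                dist[v] = (best_len + node_w, best_pred)
            else:
                dist[v] = (node_w, v)                 # source node points to itself
        # trace back from the node with the largest accumulated length
        v = max(dist, key=lambda n: dist[n][0])
        length, path = dist[v][0], [v]
        while dist[v][1] != v:
            v = dist[v][1]
            path.append(v)
        return list(reversed(path)), length
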
static get_max_antichains(G)

Return a list of antichains with the top-2 lengths

static get_max_dop(G)

Get the maximum degree of parallelism of this DAG.

Return: int

static get_max_width(G, weight='weight', default_weight=1)

Get the antichain with the maximum "weighted" width of this DAG.

weight: float (for example, it could be RAM consumption in GB)
Return: float

static label_schedule(G, weight='weight', topo_sort=None)

For each node, label its start and end time

static metis_part(G, num_partitions)

Use the METIS binary executable (instead of the library). This is used only for testing, when libmetis halts unexpectedly.

static prune_antichains(antichains)

Prune a list of antichains to keep those with the top-2 lengths. Note that antichains is a generator (not a list!).
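
A small illustrative version of that pruning (not necessarily identical to the real implementation):

    def prune_antichains(antichains):
        chains = list(antichains)                                    # the input is a generator
        top2 = sorted({len(c) for c in chains}, reverse=True)[:2]    # the two largest lengths
        return [c for c in chains if len(c) in top2]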

class dlg.dropmake.scheduler.DSCScheduler(drop_list)

Based on T. Yang and A. Gerasoulis, “DSC: scheduling parallel tasks on an unbounded number of processors,” in IEEE Transactions on Parallel and Distributed Systems, vol.5, no.9, pp.951-967, Sep 1994

class dlg.dropmake.scheduler.DilworthPartition(gid, max_dop)

Use Dilworth's theorem to determine the DoP; see https://en.wikipedia.org/wiki/Dilworth's_theorem

The idea goes as follows: let bpg = bipartite_graph(dag), then

DoP == Poset Width == len(max_antichain) == len(min_num_chain) == (cardinality(dag) - len(max_matching(bpg)))

Note that cardinality(dag) == cardinality(bpg) / 2

See Section 3 of the paper http://opensource.uom.gr/teaching/distrubutedSite/eceutexas/dist2/termPapers/Selma.pdf

Also http://codeforces.com/blog/entry/3781

The key is to incrementally construct the bipartite graph (bpg) from the growing dag.
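
A non-incremental sketch of that computation, for illustration only (the class above builds the bipartite graph incrementally instead):

    import networkx as nx
    from networkx.algorithms.bipartite import hopcroft_karp_matching

    def poset_width(dag: nx.DiGraph) -> int:
        closure = nx.transitive_closure(dag)          # comparability relation of the poset
        bpg = nx.Graph()
        left = {n: ("L", n) for n in dag.nodes()}
        right = {n: ("R", n) for n in dag.nodes()}
        bpg.add_nodes_from(left.values(), bipartite=0)
        bpg.add_nodes_from(right.values(), bipartite=1)
        for u, v in closure.edges():
            bpg.add_edge(left[u], right[v])
        matching = hopcroft_karp_matching(bpg, top_nodes=list(left.values()))
        matched = len(matching) // 2                  # the matching dict records both directions
        # Dilworth: width == cardinality(dag) - size of the maximum matching in bpg
        return dag.number_of_nodes() - matched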

add(u, v, gu, gv, sequential=False, global_dag=None)

Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains.

can_add(u, v, gu, gv)

Check if nodes u and/or v can join this partition. A node may be rejected for reasons such as DoP overflow or an overdue completion-time deadline.

can_merge(that)
    if (self._max_dop + that._max_dop <= self._ask_max_dop):
        return True
class dlg.dropmake.scheduler.GraphAnnealer(state, scheduler, deadline=None, topk=None)

Use simulated annealing for a DAG/Graph scheduling problem. There are two ways to inject constraints:

  1. explicitly implement the meet_constraint function
  2. add an extra penalty term in the energy function
energy()

Calculates the number of partitions

meet_constraint()

Check if the constraint is met. By default, it is always met.

move()

Select a neighbour; in this case, swap two edges in the DAG if they are not the same, otherwise simply reduce one of them by one.

class dlg.dropmake.scheduler.KFamilyPartition(gid, max_dop, w_attr='num_cpus', global_dag=None)

A special case (K = 1) of the Maximum Weighted K-families, based on Theorem 3.1 in http://fmdb.cs.ucla.edu/Treports/930014.pdf

add_node(u, weight)

Add a single node u to the partition

can_merge(that, u, v)
class dlg.dropmake.scheduler.MCTSScheduler(drop_list, max_dop=8, dag=None, deadline=None, max_moves=1000, max_calc_time=10)

Use Monte Carlo Tree Search (https://en.wikipedia.org/wiki/Monte_Carlo_tree_search) to guide the Sarkar algorithm. Basic functions from PSOScheduler are reused through inheritance for convenience.

partition_dag()

Trigger the MCTS algorithm. Returns a tuple of:

  1. the # of partitions formed (int)
  2. the parallel time (longest path, int)
  3. partition time (seconds, float)
  4. a list of partitions (Partition)
class dlg.dropmake.scheduler.MinNumPartsScheduler(drop_list, deadline, max_dop=8, dag=None, optimistic_factor=0.5)

A special type of partitioning that aims to schedule the DAG on time but at minimum cost. In this particular case, the cost is the number of partitions that will be generated. The assumption is that the # of partitions (with a certain DoP) more or less represents the resource footprint.

is_time_critical(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)

This is called ONLY IF either can_add on the partition has returned False or the new critical path is longer than the old one at each iteration.

Parameters:
    u - node u, v - node v, uw - weight of node u, vw - weight of node v,
    curr_lpl - current longest path length, ow - original edge weight,
    rem_el - remaining edges to be zeroed
Returns:
    Boolean

It looks ahead to compute the probability of being time critical and compares that with the _optimistic_factor:
probability = (num of edges that need to be zeroed to meet the deadline) / (num of remaining unzeroed edges)
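
Expressed as pseudo-Python (all names below are illustrative, and the direction of the comparison is an assumption; the real method derives the edge counts internally):

    # probability of being time critical, as described above
    probability = num_edges_to_zero_for_deadline / float(num_remaining_unzeroed_edges)
    time_critical = probability > optimistic_factor    # assumed comparison direction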

override_cannot_add()

Whether this scheduler will override the False result from Partition.can_add()

class dlg.dropmake.scheduler.MultiWeightPartition(gid, max_dops, w_attrs=['num_cpus'], global_dag=None)
add(u, v, gu, gv, sequential=False, global_dag=None)

Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains.

can_add(u, v, gu, gv)

Check if nodes u and/or v can join this partition. A node may be rejected for reasons such as DoP overflow or an overdue completion-time deadline.

can_merge(that)
class dlg.dropmake.scheduler.MySarkarScheduler(drop_list, max_dop=8, dag=None, dump_progress=False)

Based on “V. Sarkar, Partitioning and Scheduling Parallel Programs for Execution on Multiprocessors. Cambridge, MA: MIT Press, 1989.”

Main change: we do not order independent tasks within the same cluster. This could blow up the cluster, so we allow for a cost constraint on the number of concurrent tasks (e.g. # of cores) within each cluster.

Why:
  1. We only need to topologically sort the DAG once, since we do not add new edges in the cluster
  2. Closer to requirements
  3. Adjustable for local schedulers

Similar ideas: http://stackoverflow.com/questions/3974731

is_time_critical(u, uw, unew, v, vw, vnew, curr_lpl, ow, rem_el)

This is called ONLY IF override_cannot_add has returned True.

Parameters:
    u - node u, v - node v, uw - weight of node u, vw - weight of node v,
    curr_lpl - current longest path length, ow - original edge weight,
    rem_el - remaining edges to be zeroed
Returns:
    Boolean

MySarkarScheduler always returns False.

override_cannot_add()

Whether this scheduler will override the False result from Partition.can_add()

partition_dag()
Return a tuple of
  1. the # of partitions formed (int)
  2. the parallel time (longest path, int)
  3. partition time (seconds, float)
class dlg.dropmake.scheduler.PSOScheduler(drop_list, max_dop=8, dag=None, deadline=None, topk=30, swarm_size=40)

Use Particle Swarm Optimisation (https://en.wikipedia.org/wiki/Particle_swarm_optimization) to guide the Sarkar algorithm.

The idea is to let "edge zeroing" become the search variable X. The number of dimensions of X is the number of edges in the DAG. Possible values for each dimension form a discrete set {1, 2, 3}, where:

00 - zero without linearisation (0 in base 10) + 1
01 - zero with linearisation (1 in base 10) + 1
10 - no zeroing (2 in base 10) + 1

if (deadline is present):
    the objective function sets up a partition scheme such that
      1. DoP constraints for each partition are satisfied based on the X[i] value (reject or linearise)
      2. it returns num_of_partitions
    constraint function:
      1. makespan < deadline
else:
    the objective function sets up a partition scheme such that
      1. DoP constraints for each partition are satisfied based on the X[i] value (reject or linearise)
      2. it returns makespan
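
To make the encoding concrete, a tiny hedged example of decoding one candidate X (the value-to-action mapping follows the listing above; the +1 offset is stripped first):

    # Each dimension of X corresponds to one edge of the DAG (same ordering as
    # G.edges() sorted by weight); values are drawn from {1, 2, 3}.
    ACTIONS = {0: "zero edge without linearisation",
               1: "zero edge with linearisation",
               2: "keep edge (no zeroing)"}

    X = [3, 1, 2, 3]                            # one illustrative particle position
    for edge_index, value in enumerate(X):
        print(edge_index, ACTIONS[value - 1])   # remove the +1 offset
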
constrain_func(x)

Deadline - critical_path >= 0

objective_func(x)

x is a list of values, each taking one of the 3 integers (0, 1, 2) for an edge. The indices of x are identical to the indices in G.edges().sort(key='weight').

partition_dag()
Returns a tuple of:
  1. the # of partitions formed (int)
  2. the parallel time (longest path, int)
  3. partition time (seconds, float)
  4. a list of partitions (Partition)
class dlg.dropmake.scheduler.Partition(gid, max_dop)

Logical partition; multiple (1 ~ N) of these can be placed onto a single physical resource unit.

Logical partitions can be nested, and they somewhat resemble the dlg.manager.drop_manager.

add(u, v, gu, gv, sequential=False, global_dag=None)

Add nodes u and/or v to the partition. If sequential is True, break antichains into sequential chains.

add_node(u, weight)

Add a single node u to the partition

can_add(u, v, gu, gv)

Check if nodes u and/or v can join this partition. A node may be rejected for reasons such as DoP overflow or an overdue completion-time deadline.

probe_max_dop(u, v, unew, vnew, update=False)

An incremental antichain (which appears significantly more efficient than the networkx antichains), but it only works for DoP, not for weighted width.

remove(n)

Remove node n from the partition

schedule

Get the schedule associated with this partition

class dlg.dropmake.scheduler.SAScheduler(drop_list, max_dop=8, dag=None, deadline=None, topk=None, max_iter=6000)

Use Simulated Annealing (https://en.wikipedia.org/wiki/Simulated_annealing, http://apmonitor.com/me575/index.php/Main/SimulatedAnnealing) to guide the Sarkar algorithm. Basic functions from PSOScheduler are reused through inheritance for convenience.

partition_dag()

Trigger the SA algorithm. Returns a tuple of:

  1. the # of partitions formed (int)
  2. the parallel time (longest path, int)
  3. partition time (seconds, float)
  4. a list of partitions (Partition)
class dlg.dropmake.scheduler.Schedule(dag, max_dop)

The scheduling solution with schedule-related properties

efficiency

Resource usage percentage (integer)

schedule_matrix

Return: a self._lpl x self._max_dop matrix (X - time, Y - resource unit / parallel lane)

workload

Return: the mean # of resource units per time unit consumed by the graph/partition (integer)
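
A rough numerical illustration of these two properties (semantics assumed from the descriptions above, not taken from the code):

    import numpy as np

    # rows = time slots (self._lpl), columns = parallel lanes (self._max_dop);
    # a non-zero entry means the lane is occupied at that time.
    M = np.array([[1, 1, 0],
                  [1, 0, 0],
                  [1, 1, 1]])

    workload = int(np.mean((M > 0).sum(axis=1)))       # mean resource units per time unit -> 2
    efficiency = int(100 * (M > 0).sum() / M.size)     # resource usage percentage -> 66
    print(workload, efficiency)
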
class dlg.dropmake.scheduler.Scheduler(drop_list, max_dop=8, dag=None)

Static Scheduling consists of three steps:

  1. Partition the DAG into an optimal number (M) of partitions
     goal - minimising execution time while maintaining intra-partition DoP
  2. Merge partitions into a given number (N) of partitions (if M > N)
     goal - minimise logical communication cost while maintaining load balancing
  3. Map each merged partition to a resource unit
     goal - minimise physical communication cost amongst resource units
map_partitions()

Map logical partitions to physical resources

merge_partitions(num_partitions, bal_cond=0)
Merge M partitions into N partitions, where N < M (implemented using METIS for now).

bal_cond: load balance condition (integer): 0 - workload, 1 - count
exception dlg.dropmake.scheduler.SchedulerException
class dlg.dropmake.scheduler.WeightedDilworthPartition(gid, max_dop, global_dag=None)

Extensions to DilworthPartition:

Support “weights” for each Drop’s DoP
(e.g. CLEAN AppDrop uses 8 cores)

This requires a “weighted” maximal antichain. The solution is to create a weighted version of the bipartite graph without changing the original partition DAG. This allows us to use the same max matching algorithm to find the max antichain

It has an option (global_dag) to deal with global path reachability, which could be missing from the DAG inside the local partition. Such misses inflate DoP values, leading to more rejected partition merge requests, which in turn creates more partitions. Based on our experiment, without switching on this option, scheduling the "chiles_two_dev2" pipeline creates 45 partitions (i.e. 45 compute nodes, each with 8 cores). Turning on this option brings that number down to 24 within the same execution time.

Off - exec_time:92 - min_exec_time:67 - total_data_movement:510 - algo:Edge Zero - num_parts:45

On - exec_time:92 - min_exec_time:67 - total_data_movement:482 - algo:Edge Zero - num_parts:24

However, "probing reachability" slows down partitioning by a factor of 3 in the case of the CHILES2 pipeline. Some techniques may be applicable, e.g. http://www.sciencedirect.com/science/article/pii/S0196677483710175

can_add(u, v, gu, gv)

Check if nodes u and/or v can join this partition. A node may be rejected for reasons such as DoP overflow or an overdue completion-time deadline.

can_merge(that)

Need to merge the split graph as well to speed up!

dlg.dropmake.pg_manager

Refer to https://confluence.ska-sdp.org/display/PRODUCTTREE/C.1.2.4.4.4+DFM+Physical+Graph+Manager

class dlg.dropmake.pg_manager.PGManager(root_dir)

Physical Graph Manager

add_pgt(pgt, lg_name)

Dummy implementation using the file system for now (thread safe). TODO - use proper graph databases to manage all PGTs.

Return:
A unique PGT id (handle)
get_gantt_chart(pgt_id, json_str=True)
Return:
the Gantt chart matrix (numarray) given a PGT id
get_pgt(pgt_id)
Return:
The PGT object given its PGT id
get_schedule_matrices(pgt_id, json_str=True)
Return:
a list of schedule matrices (numarrays) given a PGT id
class dlg.dropmake.pg_manager.PGUtil

Helper functions dealing with Physical Graphs

static vstack_mat(A, B, separator=False)

Vertically stack two matrices that may have a different # of columns.

A:
    matrix A (2d numpy array)
B:
    matrix B (2d numpy array)
separator:
    whether to add an empty row separator between the two matrices (boolean)
Return:
    the vertically stacked matrix (2d numpy array)
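
A hedged sketch of such a stacking helper, zero-padding the narrower matrix; the real implementation may differ:

    import numpy as np

    def vstack_mat(A, B, separator=False):
        cols = max(A.shape[1], B.shape[1])
        pad = lambda M: np.pad(M, ((0, 0), (0, cols - M.shape[1])), mode="constant")
        parts = [pad(A)]
        if separator:
            parts.append(np.zeros((1, cols)))     # empty row between the two matrices
        parts.append(pad(B))
        return np.vstack(parts)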