dlg

dlg.ddap_protocol

class dlg.ddap_protocol.AppDROPStates

An enumeration of the different execution states an AppDROP can be found in. AppDROPs start in the NOT_RUN state, and move to the RUNNING state when they are started. Depending on the execution result they eventually move to the FINISHED or ERROR state.

class dlg.ddap_protocol.ChecksumTypes

An enumeration of different methods to calculate the checksum of a piece of data. DROPs (in certain conditions) calculate and keep the checksum of the data they represent, and therefore also know the method used to calculate it.

class dlg.ddap_protocol.DROPLinkType

An enumeration of the different relationships that can exist between DROPs.

Although not explicitly stated in this enumeration, each link type has a corresponding inverse. This way, if X is a consumer of Y, Y is an input of X. The full list is:

  • CONSUMER / INPUT

  • STREAMING_CONSUMER / STREAMING_INPUT

  • PRODUCER / OUTPUT

  • PARENT / CHILD
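Because every link type has an inverse, a symmetric lookup table captures the pairing. This is a hypothetical sketch: it uses plain strings rather than the actual DROPLinkType members, and the LINK_INVERSES table and inverse_of helper are not part of dlg.

```python
# Hypothetical sketch: DROPLinkType names as plain strings, paired with their inverses.
_PAIRS = [
    ("CONSUMER", "INPUT"),
    ("STREAMING_CONSUMER", "STREAMING_INPUT"),
    ("PRODUCER", "OUTPUT"),
    ("PARENT", "CHILD"),
]

# Build a symmetric lookup so either side of a pair resolves to the other.
LINK_INVERSES = {}
for a, b in _PAIRS:
    LINK_INVERSES[a] = b
    LINK_INVERSES[b] = a


def inverse_of(link_type: str) -> str:
    """Return the inverse link type name, e.g. CONSUMER -> INPUT."""
    return LINK_INVERSES[link_type]


# "X is a consumer of Y" implies "Y is an input of X".
print(inverse_of("CONSUMER"))  # INPUT
```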

class dlg.ddap_protocol.DROPPhases

An enumeration of the different phases a DROP can be found in. Phases represent the persistence of the data associated with a DROP and the presence of replicas. Phases range from PLASMA (no replicas, volatile storage) to SOLID (fully backed up replica available).

class dlg.ddap_protocol.DROPRel(lhs, rel, rhs)
property lhs

Alias for field number 0

property rel

Alias for field number 1

property rhs

Alias for field number 2

class dlg.ddap_protocol.DROPStates

An enumeration of the different states a DROP can be found in. DROPs start in the INITIALIZED state, optionally go through WRITING, and arrive at COMPLETED. Later, they transition through EXPIRED, eventually arriving at DELETED.
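The lifecycle above can be expressed as a small transition table. This sketch covers only the transitions named in this paragraph (it omits ERROR, CANCELLED and SKIPPED), and the ALLOWED table and is_valid_path helper are hypothetical, not part of dlg.

```python
# Hypothetical sketch of the DROPStates lifecycle described above,
# using plain strings instead of the real enumeration members.
ALLOWED = {
    "INITIALIZED": {"WRITING", "COMPLETED"},  # WRITING is optional
    "WRITING": {"COMPLETED"},
    "COMPLETED": {"EXPIRED"},
    "EXPIRED": {"DELETED"},
    "DELETED": set(),
}


def is_valid_path(states):
    """Check that each consecutive pair of states is an allowed transition."""
    return all(b in ALLOWED[a] for a, b in zip(states, states[1:]))


print(is_valid_path(["INITIALIZED", "WRITING", "COMPLETED", "EXPIRED", "DELETED"]))  # True
print(is_valid_path(["INITIALIZED", "COMPLETED", "EXPIRED", "DELETED"]))  # True
print(is_valid_path(["INITIALIZED", "EXPIRED"]))  # False
```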

class dlg.ddap_protocol.ExecutionMode

Execution modes for a DROP. DROP means that a DROP will trigger its consumers automatically when it becomes COMPLETED. EXTERNAL means that a DROP will not trigger its consumers automatically, and instead this should be done by an external entity, probably by subscribing to changes on the DROP’s status.

This value exists per DROP, and therefore we can achieve a mixed execution mode for the entire graph, where some DROPs automatically trigger their consumers, while others must be manually executed from the outside.

Note that if all DROPs in a graph have ExecutionMode == DROP it means that the graph effectively drives its own execution without external intervention.
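The difference between the two modes can be illustrated with a toy DROP that only models consumer triggering. Everything here is hypothetical: Mode stands in for ExecutionMode, and SketchDrop, set_completed and trigger_consumers are illustrative names, not the AbstractDROP API.

```python
from enum import Enum
from typing import Callable, List


class Mode(Enum):
    # Hypothetical stand-in for dlg.ddap_protocol.ExecutionMode.
    DROP = "DROP"
    EXTERNAL = "EXTERNAL"


class SketchDrop:
    """Toy DROP that only models consumer triggering on completion."""

    def __init__(self, name: str, mode: Mode = Mode.DROP):
        self.name = name
        self.mode = mode
        self.consumers: List[Callable[[str], None]] = []

    def set_completed(self) -> None:
        # Only DROP-mode instances trigger their consumers themselves.
        if self.mode is Mode.DROP:
            self.trigger_consumers()

    def trigger_consumers(self) -> None:
        # In EXTERNAL mode, an outside entity is expected to call this.
        for consumer in self.consumers:
            consumer(self.name)


ran = []
auto = SketchDrop("a", Mode.DROP)
auto.consumers.append(ran.append)
auto.set_completed()        # consumer runs immediately

manual = SketchDrop("b", Mode.EXTERNAL)
manual.consumers.append(ran.append)
manual.set_completed()      # nothing happens yet
manual.trigger_consumers()  # the external entity triggers the consumer
print(ran)  # ['a', 'b']
```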

dlg.drop

Module containing the core DROP classes.

class dlg.drop.AbstractDROP(oid, uid, **kwargs)

Base class for all DROP implementations.

A DROP is a representation of a piece of data. DROPs are created, written once, potentially read many times, and they finally potentially expire and get deleted. Subclasses implement different storage mechanisms to hold the data represented by the DROP.

If the data represented by this DROP is written through this object (i.e., calling the write method), this DROP will keep track of the data’s size and checksum. If the data is written externally, the size and checksum can be fed into this object for future reference.

DROPs can have consumers attached to them. ‘Normal’ consumers wait until the DROP they ‘consume’ (their ‘input’) moves to the COMPLETED state and then consume it, most typically by opening it and reading its contents, although any other operation could also be performed. How the consumption is triggered depends on the producer’s executionMode flag, which dictates whether it should trigger the consumption itself or whether the consumption should be triggered manually by an external entity. Streaming consumers, on the other hand, receive the data written into their input as it gets written. This mechanism is always driven by the DROP that acts as the streaming input. Apart from receiving the data as it gets written into the DROP, streaming consumers are also notified when the DROP moves to the COMPLETED state, at which point no more data should be expected to arrive at the consumer side.

If a DROP is created by a DROP Manager, its data can be expired automatically by the system after the DROP has transitioned to the COMPLETED state. Expiration can be triggered either by an interval relative to the creation time of the DROP (via the lifespan keyword) or by specifying that the DROP should be expired after all its consumers have finished (via the expireAfterUse keyword). These two methods are mutually exclusive. If neither is specified, no expiration occurs.
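The mutual exclusivity of the two expiration keywords can be captured in a small validation helper. The keyword names lifespan and expireAfterUse come from the text above. The expiration_policy helper and its return values are hypothetical, and raising ValueError on conflicting keywords is an assumption: the real DROP constructor may handle the conflict differently.

```python
from typing import Optional


def expiration_policy(lifespan: Optional[float] = None, expireAfterUse: bool = False) -> str:
    """Hypothetical helper: decide which expiration rule applies to a DROP."""
    if lifespan is not None and expireAfterUse:
        # Assumption: treat conflicting keywords as an error.
        raise ValueError("lifespan and expireAfterUse are mutually exclusive")
    if lifespan is not None:
        return "lifespan"        # expire after an interval relative to creation
    if expireAfterUse:
        return "expireAfterUse"  # expire once all consumers have finished
    return "none"                # no expiration


print(expiration_policy(lifespan=60))          # lifespan
print(expiration_policy(expireAfterUse=True))  # expireAfterUse
print(expiration_policy())                     # none
```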

addConsumer(consumer, back=True)

Adds a consumer to this DROP.

Consumers are normally (but not necessarily) AppDROPs that get notified when this DROP moves into the COMPLETED or ERROR states. This is done by firing an event of type dropCompleted, to which the consumer subscribes.

This is one of the key mechanisms by which the DROP graph is executed automatically. If AppDROP B consumes DROP A, then as soon as A transitions to COMPLETED, B will be notified and will probably start its execution.

addProducer(producer, back=True)

Adds a producer to this DROP.

Producers are AppDROPs that write into this DROP; from the producers’ point of view, this DROP is one of its many outputs.

When a producer has finished its execution, this DROP will be notified via the self.producerFinished() method.

addStreamingConsumer(streamingConsumer, back=True)

Adds a streaming consumer to this DROP.

Streaming consumers are AppDROPs that receive the data written into this DROP as it gets written, and therefore do not need to wait until this DROP has been moved to the COMPLETED state.

autofill_environment_variables()

Runs through all parameters of this DROP, resolving those that match the environment-variable syntax.

cancel()

Moves this DROP to the CANCELLED state, closing any writers it opened.

commit()

Generates the MerkleRoot of this DROP. Should only be called once this DROP is completed.

completedrop()

Builds the final reproducibility data for this DROP and fires a ‘dropComplete’ event. This should be called once the DROP has finished, whether successfully or in error.

property consumers

The list of ‘normal’ consumers held by this DROP.

See

self.addConsumer()

dropReproComplete(uid, reprodata)

Callback invoked when the DROP with UID uid has finished processing its reproducibility information. Importantly, this is independent of that DROP being completed.

property executionMode

The execution mode of this DROP. If ExecutionMode.DROP it means that this DROP will automatically trigger the execution of all its consumers. If ExecutionMode.EXTERNAL it means that this DROP will not trigger its consumers, and therefore an external entity will have to do it.

generate_merkle_data()

Provides a serialized summary of this DROP’s data, where each field constitutes a single entry. Wraps several methods that depend on this DROP’s reproducibility level; some of these are abstract. Returns a dictionary of elements constituting a summary of this DROP.

generate_recompute_data()

Provides recompute data. At runtime, recomputing, like repeating and rerunning, by default only reports success or failure. Any further behaviour is expected to be implemented in a subclass. Returns a dictionary containing runtime-exclusive recompute values.

generate_repeat_data()

Provides repeat data. At runtime, repeating, like rerunning, only requires execution success or failure. Returns a dictionary containing runtime-exclusive repetition values.

generate_replicate_comp_data()

Provides computational replication data. By definition, this is a merge of reproduction and recompute data. Returns a dictionary containing runtime-exclusive computational replication data.

generate_replicate_sci_data()

Provides scientific replication data. By definition, this is a merge of reproduction and rerun data. Returns a dictionary containing runtime-exclusive scientific replication data.

generate_replicate_total_data()

Provides total replication data. By definition, this is a merge of reproduction and repetition data. Returns a dictionary containing runtime-exclusive total replication data.

generate_reproduce_data()

Provides reproducibility data (specifically). The default behaviour is to return nothing; per-class behaviour is achieved by overriding this method. Returns a dictionary containing runtime-exclusive reproducibility data.

generate_rerun_data()

Provides serialized rerun data. At runtime, rerunning only requires execution success or failure. Returns a dictionary containing rerun values.

get_consumers_nodes()

Gets the physical node address(es) of the consumers of this DROP.

get_environment_variable(key: str)

Expects keys of the form $store_name.var_name. A key of the form $store_name.var_name.sub_var_name will query store_name for var_name.sub_var_name.

get_environment_variables(keys: list)

Expects a list of keys, each in the single-key form accepted by get_environment_variable().
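The key format can be illustrated by a small resolver that splits a key at its first dot into a store name and a variable name. This is a hypothetical sketch: the in-memory STORES dictionary and the resolve_key and resolve_keys helpers are illustrative only, and returning None for unresolvable keys is an assumption rather than documented behaviour.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory stores; the real DALiuGE environment stores differ.
STORES: Dict[str, Dict[str, str]] = {
    "env": {"work_dir": "/tmp/work", "cfg.level": "debug"},
}


def resolve_key(key: str) -> Optional[str]:
    """Resolve '$store_name.var_name[.sub_var_name]' against STORES."""
    if not key.startswith("$"):
        return None
    store_name, _, var_name = key[1:].partition(".")
    store = STORES.get(store_name)
    if store is None or not var_name:
        return None
    # Everything after the first dot (e.g. 'cfg.level') is the variable name.
    return store.get(var_name)


def resolve_keys(keys: List[str]) -> Dict[str, Optional[str]]:
    """Apply the single-key form to each key in turn."""
    return {key: resolve_key(key) for key in keys}


print(resolve_key("$env.cfg.level"))  # debug
```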

handleEvent(e)

Handles the arrival of a new event. Events are delivered from those objects this DROP is subscribed to.

handleInterest(drop)

Main mechanism through which a DROP handles its interest in a second DROP it isn’t directly related to.

A call to this method should be expected for each DROP this DROP is interested in. The default implementation does nothing, but implementations are free to perform any action, such as subscribing to events or storing information.

At this layer, only the handling of such an interest exists. The expression of such interest, and the invocation of this method wherever necessary, is currently left as a responsibility of the entity creating the DROPs. In the case of a Session in a DROPManager, for example, this step would be performed using deployment-time information contained in the dropspec dictionaries held in the session.

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted by other implementations of this method elsewhere in the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).
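The pop-then-delegate pattern described above can be sketched with two toy classes. BaseDrop, FileLikeDrop, the leftover attribute and the filepath keyword are all hypothetical; the sketch only shows how popping an interpreted argument before calling the parent implementation keeps it away from the parent.

```python
class BaseDrop:
    def __init__(self, **kwargs):
        # Mirrors the documented flow: the constructor hands remaining kwargs to initialize().
        self.initialize(**kwargs)

    def initialize(self, **kwargs):
        # Base implementation: record whatever was not consumed by subclasses.
        self.leftover = dict(kwargs)


class FileLikeDrop(BaseDrop):
    def initialize(self, **kwargs):
        # Pop the argument this class understands so the parent never sees it.
        self.filepath = kwargs.pop("filepath", None)
        super().initialize(**kwargs)


d = FileLikeDrop(filepath="/tmp/x.dat", nm="example")
print(d.filepath)  # /tmp/x.dat
print(d.leftover)  # {'nm': 'example'}
```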

isCompleted()

Checks whether this DROP is currently in the COMPLETED state or not

property oid

The DROP’s Object ID (OID). OIDs are unique identifiers given to semantically different DROPs (and by consequence the data they represent). This means that different DROPs that point to the same data semantically speaking, either in the same or in a different storage, will share the same OID.

property parent

The DROP that acts as the parent of the current one. This parent/child relationship is created by ContainerDROPs, which are a specific kind of DROP.

property persist

Whether this DROP should be considered persisted after completion

property phase

This DROP’s phase. The phase indicates the resilience of a DROP.

producerFinished(uid, drop_state)

Method called each time one of the producers of this DROP finishes its execution. Once all producers have finished, this DROP moves to the COMPLETED state (or to ERROR if one of the producers is in the ERROR state).

This is one of the key mechanisms through which the execution of a DROP graph is accomplished. If AppDROP A produces DROP B, as soon as A finishes its execution B will be notified and will move itself to COMPLETED.
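The bookkeeping behind producerFinished() can be sketched as a set of pending producer UIDs. ProducedDrop and producer_finished are hypothetical names, the state strings stand in for the real enumerations, and the rule applied (ERROR if any producer errored, otherwise COMPLETED) is the one stated above; the real implementation may apply additional policies.

```python
from typing import Iterable


class ProducedDrop:
    """Hypothetical sketch of producerFinished() bookkeeping."""

    def __init__(self, producer_uids: Iterable[str]):
        self.pending = set(producer_uids)
        self.errors = 0
        self.state = "INITIALIZED"

    def producer_finished(self, uid: str, drop_state: str) -> None:
        self.pending.discard(uid)
        if drop_state == "ERROR":
            self.errors += 1
        # Only move on once every producer has reported back.
        if not self.pending:
            self.state = "ERROR" if self.errors else "COMPLETED"


b = ProducedDrop(["A1", "A2"])
b.producer_finished("A1", "FINISHED")
print(b.state)  # INITIALIZED
b.producer_finished("A2", "FINISHED")
print(b.state)  # COMPLETED
```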

property producers

The list of producers that write to this DROP

See

self.addProducer()

setCompleted()

Moves this DROP to the COMPLETED state. This can be used when not all the expected data has arrived for a given DROP, but it should still be moved to COMPLETED, or when the expected amount of data held by a DROP is not known in advance.

setError()

Moves this DROP to the ERROR state.

property size

The size of the data pointed to by this DROP. Its value is automatically calculated if the data was actually written through this DROP (using the self.write() method directly or indirectly). In the case that the data has been externally written, the size can be set externally after the DROP has been moved to COMPLETED or beyond.

skip()

Moves this DROP to the SKIPPED state, closing any writers it opened.

property status

The current status of this DROP.

property streamingConsumers

The list of ‘streaming’ consumers held by this DROP.

See

self.addStreamingConsumer()

property uid

The DROP’s Unique ID (UID). Unlike the OID, the UID is globally different for all DROP instances, regardless of the data they point to.

class dlg.drop.ListAsDict(my_set)

A list that adds drop UIDs to a set as they get appended to the list

append(drop)

Append object to the end of the list.

dlg.droputils

Utility methods and classes to be used when interacting with DROPs

class dlg.droputils.DROPFile(drop)

A file-like object (currently only supporting the read() operation, more to be added in the future) that wraps the DROP given at construction time.

Depending on the underlying storage of the data, this file-like object will access the data pointed to by the DROP directly if possible, or through the DROP methods otherwise.

Objects of this class automatically close themselves when they are no longer referenced (i.e., when __del__ is called), but users should still invoke close() eagerly to free underlying resources.

Objects of this class can also be used in a with context.

class dlg.droputils.DROPWaiterCtx(test, drops, timeout=1, expected_states=[])

Class used by unit tests to trigger the execution of a physical graph and wait until the given set of DROPs have reached their COMPLETED status.

It does so by appending an EvtConsumer consumer to each DROP before they are used in the execution, and finally checking that the events have been set. It should be used like this inside a test class:

# There is a physical graph that looks like: a -> b -> c
with DROPWaiterCtx(self, c):
    a.write('a')
    a.setCompleted()

class dlg.droputils.EvtConsumer(evt, expected_states=[])

Small utility class that sets the internal flag of the given threading.Event object when consuming a DROP. Used throughout the tests as a barrier to wait until all DROPs of a given graph have executed.
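The barrier idea behind EvtConsumer and DROPWaiterCtx can be sketched with plain threading.Event objects. This is an illustration only: EvtConsumerSketch and its drop_completed method are hypothetical stand-ins and do not reproduce the real class's interface.

```python
import threading


class EvtConsumerSketch:
    """Sets a threading.Event when the DROP it consumes completes."""

    def __init__(self, evt: threading.Event):
        self.evt = evt

    def drop_completed(self, uid: str) -> None:
        # Release anyone waiting on this consumer's event.
        self.evt.set()


events = [threading.Event() for _ in range(3)]
consumers = [EvtConsumerSketch(e) for e in events]

# Simulate three DROPs completing on worker threads.
threads = [
    threading.Thread(target=c.drop_completed, args=(f"drop-{i}",))
    for i, c in enumerate(consumers)
]
for t in threads:
    t.start()

# Barrier: wait until every DROP has signalled completion (or time out).
all_done = all(e.wait(timeout=1) for e in events)
for t in threads:
    t.join()
print(all_done)  # True
```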

dlg.droputils.allDropContents(drop, bufsize=65536) → bytes

Returns all the data contained in a given DROP

dlg.droputils.breadFirstTraverse(toVisit)

Breadth-first iterator for a DROP graph.

This iterator yields a tuple where the first item is the node being visited, and the second is a list of nodes that will be visited subsequently. Callers can alter this list in order to remove certain nodes from the graph traversal process.

This implementation is non-recursive.
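The traversal contract (yield each node together with the mutable list of nodes queued after it) can be sketched over a plain adjacency dictionary. The breadth_first function and the dictionary graph are hypothetical stand-ins for DROPs; the sketch shows why removing an entry from the yielded list prunes that branch.

```python
from collections import deque
from typing import Dict, Iterable, Iterator, List, Tuple

Graph = Dict[str, List[str]]


def breadth_first(graph: Graph, roots: Iterable[str]) -> Iterator[Tuple[str, List[str]]]:
    """Non-recursive BFS yielding (node, downstream) pairs."""
    to_visit = deque(roots)
    visited = set()
    while to_visit:
        node = to_visit.popleft()
        if node in visited:
            continue
        visited.add(node)
        downstream = [n for n in graph.get(node, []) if n not in visited]
        yield node, downstream
        # The caller may have edited `downstream` before we resume.
        to_visit.extend(downstream)


graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
print([n for n, _ in breadth_first(graph, ["a"])])  # ['a', 'b', 'c', 'd']

order = []
for node, downstream in breadth_first(graph, ["a"]):
    order.append(node)
    if node == "a":
        downstream.remove("c")  # prune the 'c' branch
print(order)  # ['a', 'b', 'd']
```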

dlg.droputils.copyDropContents(source: DataDROP, target: DataDROP, bufsize: int = 65536)

Manually copies data from one DROP into another, in bufsize steps
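The bufsize-stepped copy amounts to a read/write loop. In this sketch, in-memory binary streams stand in for the source and target DROPs, and copy_contents is a hypothetical name; the real function works through the DROPs' own read and write methods.

```python
import io
from typing import BinaryIO


def copy_contents(source: BinaryIO, target: BinaryIO, bufsize: int = 65536) -> int:
    """Copy source into target in bufsize chunks; return the number of bytes copied."""
    total = 0
    while True:
        chunk = source.read(bufsize)
        if not chunk:  # an empty read means the source is exhausted
            break
        target.write(chunk)
        total += len(chunk)
    return total


src = io.BytesIO(b"x" * 150_000)
dst = io.BytesIO()
print(copy_contents(src, dst))  # 150000
```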

dlg.droputils.depthFirstTraverse(node: AbstractDROP, visited=[])

Depth-first iterator for a DROP graph.

This iterator yields a tuple where the first item is the node being visited, and the second is a list of nodes that will be visited subsequently. Callers can alter this list in order to remove certain nodes from the graph traversal process.

This implementation is recursive.

dlg.droputils.getDownstreamObjects(drop)

Returns a list of all direct “downstream” DROPs for the given DROP. A DROP A is “downstream” with respect to DROP B if any of the following conditions are true:

  • A is an output of B (therefore B is an AppDROP)

  • A is a normal or streaming consumer of B (and A is therefore an AppDROP)

In practice, if A is downstream of B, A cannot advance to the COMPLETED state until B does so.

dlg.droputils.getLeafNodes(drops)

Returns a list of all the “leaf nodes” of the graph pointed to by drops. drops is either a single DROP or a list of DROPs.

dlg.droputils.getUpstreamObjects(drop: AbstractDROP)

Returns a list of all direct “upstream” DROPs for the given DROP. A DROP A is “upstream” with respect to DROP B if any of the following conditions are true:

  • A is a producer of B (therefore A is an AppDROP)

  • A is a normal or streaming input of B (and B is therefore an AppDROP)

In practice, if A is upstream of B, A must be moved to the COMPLETED state before B can do so.

dlg.droputils.has_path(x)

Returns True if x has a path attribute

dlg.droputils.listify(o)

If o is already a list return it as is; if o is a tuple returns a list containing the elements contained in the tuple; otherwise returns a list with o being its only element
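The three cases can be written out directly. listify_sketch is a hypothetical re-implementation of the documented behaviour, not the library's source.

```python
def listify_sketch(o):
    """Return a list: lists as-is, tuples converted, anything else wrapped."""
    if isinstance(o, list):
        return o          # same object, not a copy
    if isinstance(o, tuple):
        return list(o)
    return [o]


print(listify_sketch((1, 2)))  # [1, 2]
print(listify_sketch("ab"))    # ['ab']
```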

dlg.droputils.load_npy(drop: DataDROP, allow_pickle=False) → numpy.ndarray

Loads a numpy ndarray from a drop in npy format

dlg.droputils.load_pickle(drop: DataDROP) → Any

Loads a pkl formatted data object stored in a DataDROP. Note: does not support streaming mode.

dlg.droputils.replace_dataurl_placeholders(cmd, inputs, outputs)

Replaces any placeholder found in cmd with the dataURL property of the respective input or output Drop from inputs or outputs. Placeholders can take the following formats:

  • %iDataURLN, with N starting from 0, indicates the dataURL of the N-th element from the inputs argument; likewise for %oDataURLN.

  • %iDataURL[X] indicates the dataURL of the input with UID X; likewise for %oDataURL[X].

dlg.droputils.replace_path_placeholders(cmd, inputs, outputs)

Replaces any placeholder found in cmd with the path of the respective input or output Drop from inputs or outputs. Placeholders can take the following formats:

  • %iN, with N starting from 0, indicates the path of the N-th element from the inputs argument; likewise for %oN.

  • %i[X] indicates the path of the input with UID X; likewise for %o[X].
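Both placeholder forms can be handled with two regular-expression passes per direction: first by UID, then by position. In this hypothetical sketch, inputs and outputs are plain dictionaries mapping UID to path (dictionaries keep insertion order, which supplies N). The real function receives DROP collections, and its exact matching rules may differ.

```python
import re
from typing import Dict, List


def replace_path_placeholders_sketch(cmd: str, inputs: Dict[str, str], outputs: Dict[str, str]) -> str:
    """Expand %iN / %i[X] (and %oN / %o[X]) using UID -> path mappings."""
    for prefix, drops in (("i", inputs), ("o", outputs)):
        paths: List[str] = list(drops.values())
        # %i[X] / %o[X]: look up the path by UID.
        cmd = re.sub(rf"%{prefix}\[([^\]]+)\]", lambda m: drops[m.group(1)], cmd)
        # %iN / %oN: look up the path by position, starting from 0.
        cmd = re.sub(rf"%{prefix}(\d+)", lambda m: paths[int(m.group(1))], cmd)
    return cmd


inputs = {"a": "/data/a.txt", "b": "/data/b.txt"}
outputs = {"c": "/data/c.txt"}
print(replace_path_placeholders_sketch("cat %i0 %i[b] > %o0", inputs, outputs))
# cat /data/a.txt /data/b.txt > /data/c.txt
```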

dlg.droputils.save_npy(drop: DataDROP, ndarray: numpy.ndarray, allow_pickle=False)

Saves a numpy ndarray to a drop in npy format

dlg.droputils.save_pickle(drop: DataDROP, data: Any)

Saves a python object in pkl format
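A pickle round trip can be sketched with an in-memory binary stream standing in for the DataDROP. save_pickle_sketch and load_pickle_sketch are hypothetical names. Reading the whole payload before unpickling mirrors the non-streaming note on load_pickle. As with any pickle use, only load data from trusted sources.

```python
import io
import pickle
from typing import Any, BinaryIO


def save_pickle_sketch(stream: BinaryIO, data: Any) -> None:
    # Serialize the object and write it in one go.
    stream.write(pickle.dumps(data))


def load_pickle_sketch(stream: BinaryIO) -> Any:
    # Read the whole payload first (no streaming), then unpickle it.
    return pickle.loads(stream.read())


buf = io.BytesIO()
save_pickle_sketch(buf, {"a": [1, 2, 3]})
buf.seek(0)
loaded = load_pickle_sketch(buf)
print(loaded)  # {'a': [1, 2, 3]}
```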

dlg.event

class dlg.event.Event(type: str)

An event sent through the DALiuGE framework.

Events have at least a field describing the type of event they are (instead of having subclasses of the Event class), and therefore this class makes sure that at least that field exists. Any other piece of information can be attached to individual instances of this class, depending on the event type.

class dlg.event.EventFirer

An object that fires events.

Objects interested in receiving events from this object subscribe to it via the subscribe method; likewise, they can unsubscribe from it via the unsubscribe method. Events are handed to the listeners by calling their handleEvent method with the event as its sole argument.

Listeners can specify the type of event they listen to at subscription time, or they can choose to receive all events fired by this object.

subscribe(listener: EventHandler, eventType: Optional[str] = None)

Subscribes listener to events fired by this object. If eventType is not None then listener will only receive events of eventType that originate from this object, otherwise it will receive all events.

unsubscribe(listener: EventHandler, eventType: Optional[str] = None)

Unsubscribes listener from events fired by this object.
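The subscription model can be sketched with a dictionary of listener lists keyed by event type, using None for "all events". Only subscribe, unsubscribe and handleEvent are taken from the text. SketchEvent, EventFirerSketch, its fire method and Recorder are hypothetical and do not reproduce the dlg.event internals.

```python
from collections import defaultdict
from typing import Dict, List, Optional


class SketchEvent:
    """Event with a mandatory type plus arbitrary extra attributes."""

    def __init__(self, type: str, **attrs):
        self.type = type
        self.__dict__.update(attrs)


class EventFirerSketch:
    def __init__(self):
        # The None key holds listeners that want every event type.
        self._listeners: Dict[Optional[str], List[object]] = defaultdict(list)

    def subscribe(self, listener, eventType: Optional[str] = None) -> None:
        self._listeners[eventType].append(listener)

    def unsubscribe(self, listener, eventType: Optional[str] = None) -> None:
        if listener in self._listeners[eventType]:
            self._listeners[eventType].remove(listener)

    def fire(self, eventType: str, **attrs) -> None:
        evt = SketchEvent(eventType, **attrs)
        # Deliver to type-specific listeners, then to catch-all listeners.
        for listener in self._listeners[eventType] + self._listeners[None]:
            listener.handleEvent(evt)


class Recorder:
    def __init__(self):
        self.seen = []

    def handleEvent(self, e):
        self.seen.append(e.type)


firer = EventFirerSketch()
only_done, everything = Recorder(), Recorder()
firer.subscribe(only_done, "dropCompleted")
firer.subscribe(everything)
firer.fire("status", status="WRITING")
firer.fire("dropCompleted")
print(only_done.seen, everything.seen)  # ['dropCompleted'] ['status', 'dropCompleted']
```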

class dlg.event.EventHandler

dlg.graph_loader

Module containing functions to load a fully-functional DROP graph from its full JSON representation.

Adds a link from lhDropSpec to point to rhOID. The link type (e.g., a consumer) is signaled by linkType.

dlg.graph_loader.loadDropSpecs(dropSpecList)

Loads the DROP definitions from dropSpecList, checks that the DROPs are correctly specified, and returns a dictionary containing all DROP specifications (i.e., a dictionary of dictionaries) keyed on the OID of each DROP. Unlike readObjectGraph and readObjectGraphS, this method doesn’t actually create the DROPs themselves.

Slices off graph-wise reproducibility data for later use

dlg.rpc

RPC support for DALiuGE

This module contains all client and server RPC classes for all the different technologies we support.

class dlg.rpc.DropProxy(rpc_client, hostname, port, sessionId, uid)

A proxy to a remote drop.

It forwards attribute requests and procedure calls through the given RPC client.

dlg.rpc.RPCClient

alias of ZeroRPCClient

class dlg.rpc.RPCClientBase

Base class for all RPC clients

class dlg.rpc.RPCObject

Base class for all RPC clients and servers

dlg.rpc.RPCServer

alias of ZeroRPCServer

class dlg.rpc.RPCServerBase(host, port)

Base class for all RPC servers

class dlg.rpc.ZeroRPCClient(*args, **kwargs)

ZeroRPC client support

class request(method, args, queue)
property args

Alias for field number 1

property method

Alias for field number 0

property queue

Alias for field number 2

class response(value, is_exception)
property is_exception

Alias for field number 1

property value

Alias for field number 0

class dlg.rpc.ZeroRPCServer(host, port)

ZeroRPC server support

dlg.runtime.delayed

dlg.runtime.delayed(x, *args, **kwargs)

Like dask.delayed, but quietly swallowing anything other than nout

dlg.utils

Module containing miscellaneous utility classes and functions.

class dlg.utils.ExistingProcess(pid)

A Popen-like class around an existing process

kill()

Send a KILL signal

poll()

Returns an exit status if the process has finished, or None if it is still running

terminate()

Send a TERM signal

wait()

Wait until the process finishes

class dlg.utils.ZlibUncompressedStream(content)

A class that reads gzip-compressed content and returns uncompressed content each time its read() method is called.
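Incremental gzip decompression behind a read() method can be sketched with zlib.decompressobj. Passing zlib.MAX_WBITS | 16 as wbits tells zlib to expect a gzip header and trailer. GzipStreamSketch is a hypothetical illustration and may differ from the real class in how it buffers and sizes reads.

```python
import gzip
import io
import zlib


class GzipStreamSketch:
    """Wraps a readable of gzip bytes; read(n) returns up to n uncompressed bytes."""

    def __init__(self, content):
        self.content = content  # any object with read(size) -> bytes
        self.decomp = zlib.decompressobj(zlib.MAX_WBITS | 16)  # gzip framing
        self.buf = b""
        self.eof = False

    def read(self, n: int = 4096) -> bytes:
        # Pull compressed chunks until enough uncompressed data is buffered.
        while len(self.buf) < n and not self.eof:
            chunk = self.content.read(n)
            if not chunk:
                self.buf += self.decomp.flush()
                self.eof = True
            else:
                self.buf += self.decomp.decompress(chunk)
        out, self.buf = self.buf[:n], self.buf[n:]
        return out


data = b"hello world " * 1000
stream = GzipStreamSketch(io.BytesIO(gzip.compress(data)))
pieces = []
while True:
    piece = stream.read(1000)
    if not piece:
        break
    pieces.append(piece)
print(b"".join(pieces) == data)  # True
```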

dlg.utils.browse_service(zc, service_type_name, protocol, callback)

ZeroConf: Browse for services based on service type and protocol

callback signature: callback(zeroconf, service_type, name, state_change), where:

  • zeroconf: ZeroConf object

  • service_type: zeroconf service

  • name: service name

  • state_change: ServiceStateChange type (Added, Removed)

Returns ZeroConf object

dlg.utils.createDirIfMissing(path)

Creates the given directory if it doesn’t exist

dlg.utils.deregister_service(zc, info)

ZeroConf: Deregister service

dlg.utils.escapeQuotes(s, singleQuotes=True, doubleQuotes=True)

Escapes single and double quotes in a string. Useful to include commands in a shell invocation or similar.

dlg.utils.fname_to_pipname(fname)

Converts a graph filename (extension .json or .graph) to its “pipeline” name (the basename without the extension).
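The conversion boils down to taking the basename and stripping a .json or .graph extension. fname_to_pipname_sketch is hypothetical, and leaving other extensions untouched is an assumption, since the text only covers those two.

```python
import os


def fname_to_pipname_sketch(fname: str) -> str:
    """Basename without a .json/.graph extension (other names unchanged: an assumption)."""
    base = os.path.basename(fname)
    stem, ext = os.path.splitext(base)
    return stem if ext in (".json", ".graph") else base


print(fname_to_pipname_sketch("/graphs/lofar_std.json"))  # lofar_std
```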

dlg.utils.getDlgDir()

Returns the root of the directory structure used by the DALiuGE framework at runtime.

dlg.utils.getDlgLogsDir()

Returns the location of the directory used by the DALiuGE framework to store its logs. If createIfMissing is True, the directory will be created if it currently doesn’t exist

dlg.utils.getDlgPath()

Returns the location of the directory used by the DALiuGE framework to look for additional code. If createIfMissing is True, the directory will be created if it currently doesn’t exist

dlg.utils.getDlgPidDir()

Returns the location of the directory used by the DALiuGE framework to store its PIDs. If createIfMissing is True, the directory will be created if it currently doesn’t exist

dlg.utils.getDlgVariable(key: str)

Queries environment for variables assumed to start with ‘DLG’. Special case for DLG_ROOT, since this is easily identifiable.

dlg.utils.getDlgWorkDir()

Returns the location of the directory used by the DALiuGE framework to store results. If createIfMissing is True, the directory will be created if it currently doesn’t exist

dlg.utils.get_all_ipv4_addresses()

Returns a list of all IPv4 interface addresses found on this computer

dlg.utils.get_local_ip_addr()

Enumerate all interfaces and return bound IP addresses (exclude localhost)

dlg.utils.get_symbol(name)

Gets the global symbol name, which is an “absolute path” to a python name in the form of pkg.subpkg.subpkg.module.name
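Resolving such an absolute name means importing everything before the last dot as a module and then reading the final component as an attribute. get_symbol_sketch is a hypothetical illustration built on importlib.import_module. It requires at least one dot in the name and does not handle nested attributes such as Class.method.

```python
import importlib


def get_symbol_sketch(name: str):
    """Resolve 'pkg.subpkg.module.name' to the object it names."""
    module_name, _, attr = name.rpartition(".")
    module = importlib.import_module(module_name)  # import the containing module
    return getattr(module, attr)                   # then fetch the symbol from it


import os
print(get_symbol_sketch("os.path.join") is os.path.join)  # True
```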

dlg.utils.isabs(path)

Like os.path.isabs, but handles None

dlg.utils.object_tracking(name)

Returns a decorator that helps classes track which object is currently under execution. This is done via a thread local object, which can be accessed via the ‘tlocal’ attribute of the returned decorator.

dlg.utils.prepare_sql(sql, paramstyle, data=()) → Tuple[str, dict]

Prepares the given SQL statement for proper execution depending on the parameter style supported by the database driver. For this, the SQL statement must be written using “{X}” or “{}” placeholders in place of each parameter; this is a style-agnostic parameter notation.

This method returns a tuple containing the prepared SQL statement and the values to be bound into the query as required by the driver.
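The translation to driver-specific markers can be sketched for positional "{}" placeholders only, mapped onto the DB-API 2.0 (PEP 249) paramstyle names. prepare_sql_sketch, the generated parameter names p0, p1, … and the choice to return a tuple for positional styles are all assumptions. The "{X}" form is not handled here.

```python
import itertools
import re
import sqlite3
from typing import Sequence, Tuple, Union

# Marker generators for each PEP 249 paramstyle, given a 0-based index.
_STYLES = {
    "qmark": lambda i: "?",
    "format": lambda i: "%s",
    "numeric": lambda i: f":{i + 1}",
    "named": lambda i: f":p{i}",
    "pyformat": lambda i: f"%(p{i})s",
}


def prepare_sql_sketch(sql: str, paramstyle: str, data: Sequence = ()) -> Tuple[str, Union[tuple, dict]]:
    """Rewrite '{}' placeholders into the driver's paramstyle, in order."""
    if paramstyle not in _STYLES:
        raise ValueError(f"unsupported paramstyle: {paramstyle}")
    counter = itertools.count()
    marker = _STYLES[paramstyle]
    prepared = re.sub(r"\{\}", lambda _m: marker(next(counter)), sql)
    if paramstyle in ("named", "pyformat"):
        return prepared, {f"p{i}": value for i, value in enumerate(data)}
    return prepared, tuple(data)


conn = sqlite3.connect(":memory:")
for style in ("qmark", "named"):  # two styles sqlite3 accepts
    query, params = prepare_sql_sketch("SELECT {} + {}", style, (1, 2))
    print(style, query, conn.execute(query, params).fetchone()[0])
```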

dlg.utils.register_service(zc, service_type_name, service_name, ipaddr, port, protocol='tcp')

ZeroConf: Register service type, protocol, ipaddr and port

Returns ZeroConf object and ServiceInfo object

dlg.utils.timed_import(module_name)

Imports module_name and logs how long it took to import it

dlg.utils.to_externally_contactable_host(host, prefer_local=False)

Turns host, which is an address used to bind a local service, into a host that can be used to externally contact that service.

This should be used when there is no other way to find out how a client to that service is going to connect to it.

dlg.utils.zmq_safe(host_or_addr)

Converts host_or_addr to a format that is safe for ZMQ to use