dlg.apps

This package contains several general-purpose applications in the form of DROPs that we have developed, both as examples and for real-life use. Most of them are based on the BarrierAppDROP.

dlg.apps.bash_shell_app

Module containing bash-related AppDROPs.

The module contains four classes that run bash commands in different execution modes: either in fully batch mode, or with their input and/or output streamed from the previous application and/or to the next one.

class dlg.apps.bash_shell_app.BashShellApp(**kwargs)

An app that runs a bash command in batch mode; that is, it waits until all its inputs are COMPLETED. It also doesn’t output a stream of data; see StreamingOutputBashApp for those cases.

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

class dlg.apps.bash_shell_app.BashShellBase

Common base class for the BashShell apps. It simply requires a command to be specified.

class dlg.apps.bash_shell_app.StreamingInputBashApp(**kwargs)

An app that runs a bash command that consumes data from stdin.

The streaming of data that appears on stdin takes place outside the framework; what is streamed through the framework is the information needed to establish the streaming channel. This information is also used to kick this application off.
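To illustrate the idea that only the channel information passes through the framework while the bytes themselves flow outside it, the sketch below uses a POSIX named pipe as the stdin of a bash command. It is a standalone illustration, not the StreamingInputBashApp implementation; the helper name stream_via_named_pipe is hypothetical, and the example assumes a POSIX system with bash installed.

```python
import os
import subprocess
import tempfile
import threading


def stream_via_named_pipe(chunks, cmd="wc -c"):
    """Hypothetical helper: feed `chunks` to `cmd` through a named pipe.

    Only `fifo_path` (the channel information) would have to be shared
    between producer and consumer; the data flows through the pipe itself.
    """
    tmpdir = tempfile.mkdtemp()
    fifo_path = os.path.join(tmpdir, "stdin.fifo")
    os.mkfifo(fifo_path)

    def producer():
        # Opening a FIFO for writing blocks until a reader opens it.
        with open(fifo_path, "wb") as writer:
            for chunk in chunks:
                writer.write(chunk)

    thread = threading.Thread(target=producer)
    thread.start()
    try:
        # The consumer side opens the FIFO and hands it to bash as stdin.
        with open(fifo_path, "rb") as reader:
            result = subprocess.run(["bash", "-c", cmd], stdin=reader,
                                    stdout=subprocess.PIPE, check=True)
    finally:
        thread.join()
        os.remove(fifo_path)
        os.rmdir(tmpdir)
    return result.stdout
```

For example, stream_via_named_pipe([b"hello ", b"world"]) returns the byte count printed by wc, which sees EOF once the producer closes its end of the pipe.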

class dlg.apps.bash_shell_app.StreamingInputBashAppBase(**kwargs)

Base class for bash command applications that consume a stream of incoming data.

dataWritten(uid, data)

Callback invoked when data has been written into the DROP with UID uid (which is one of the streaming inputs of this AppDROP). By default no action is performed.

dropCompleted(uid, drop_state)

Callback invoked when the DROP with UID uid (which is either a normal or a streaming input of this AppDROP) has moved to the COMPLETED or ERROR state. By default no action is performed.
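The batch (barrier) behaviour described for BashShellApp can be pictured as bookkeeping done in dropCompleted: the application runs once every input has reported COMPLETED. The class below is a hypothetical, framework-free sketch; the "COMPLETED"/"ERROR" state strings and the run callback are assumptions, not the DALiuGE API.

```python
class BarrierSketch:
    """Hypothetical stand-in for an app that waits for all of its inputs."""

    def __init__(self, input_uids, run):
        self._pending = set(input_uids)  # inputs that have not finished yet
        self._run = run                  # called once, when all inputs are COMPLETED
        self.failed = False

    def dataWritten(self, uid, data):
        # Batch apps ignore streamed data, just like the default callback.
        pass

    def dropCompleted(self, uid, drop_state):
        if uid not in self._pending:
            return  # unknown input, or one that already reported
        self._pending.discard(uid)
        if drop_state == "ERROR":
            self.failed = True
        # Only the last outstanding input can trigger the run.
        if not self._pending and not self.failed:
            self._run()
```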

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).
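The kwargs convention above amounts to each class popping the arguments it understands before delegating to its parent. A minimal sketch, with hypothetical class names and hypothetical arguments (command, port):

```python
class BaseApp:
    def initialize(self, **kwargs):
        # Whatever is left here was not consumed by any class in the hierarchy.
        self.unused = dict(kwargs)


class CommandApp(BaseApp):
    def initialize(self, **kwargs):
        # Remove the argument once interpreted so the parent never sees it.
        self.command = kwargs.pop("command", None)
        super().initialize(**kwargs)


class ServerCommandApp(CommandApp):
    def initialize(self, **kwargs):
        self.port = int(kwargs.pop("port", 0))
        super().initialize(**kwargs)
```

Calling ServerCommandApp().initialize(command="ls", port="8080", extra=1) leaves only extra for the base class, which makes unexpected arguments easy to spot.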

class dlg.apps.bash_shell_app.StreamingInputOutputBashApp(**kwargs)

Like StreamingInputBashApp, but its stdout is also a stream of data that is fed into the next application.

class dlg.apps.bash_shell_app.StreamingOutputBashApp(**kwargs)

Like BashShellApp, but its stdout is a stream of data that is fed into the next application.
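Feeding one command's stdout into the next application's stdin follows the same idea as a shell pipeline. As a rough, framework-free illustration (assuming bash is installed; the helper name chain_bash is hypothetical), two processes can be chained with the standard subprocess module:

```python
import subprocess


def chain_bash(producer_cmd, consumer_cmd):
    """Hypothetical helper: stream the producer's stdout into the consumer's stdin."""
    producer = subprocess.Popen(["bash", "-c", producer_cmd],
                                stdout=subprocess.PIPE)
    consumer = subprocess.Popen(["bash", "-c", consumer_cmd],
                                stdin=producer.stdout,
                                stdout=subprocess.PIPE)
    # Close our copy so the producer gets SIGPIPE if the consumer exits early.
    producer.stdout.close()
    output, _ = consumer.communicate()
    producer.wait()
    return output
```

For instance, chain_bash("seq 1 5", "wc -l") counts the five lines while they are produced, with neither process waiting for the other to finish first.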

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

dlg.apps.bash_shell_app.prepare_input_channel(data)

Prepares an input channel that will serve as the stdin of a bash command. Depending on the contents of data the channel will be a named pipe or a socket.

dlg.apps.bash_shell_app.prepare_output_channel(this_node, out_drop)

Prepares an output channel that will serve as the stdout of a bash command. Depending on the values of this_node and out_drop the channel will be a named pipe or a socket.
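The rule used to choose between the two channel types is not spelled out here. One plausible rule, stated purely as an assumption, is to use a named pipe when producer and consumer share a node and a TCP socket otherwise. The hypothetical sketch below encodes that rule together with an equally hypothetical URL-like channel description:

```python
from urllib.parse import urlparse


def describe_channel(this_node, consumer_node, fifo_path, port):
    """Hypothetical: pick a channel type and describe it as a URL string."""
    if this_node == consumer_node:
        return f"pipe://{fifo_path}"      # same node: a named pipe suffices
    return f"tcp://{this_node}:{port}"    # different nodes: go through a socket


def parse_channel(description):
    """Hypothetical inverse of describe_channel."""
    url = urlparse(description)
    if url.scheme == "pipe":
        return ("pipe", url.netloc + url.path)
    if url.scheme == "tcp":
        return ("socket", (url.hostname, url.port))
    raise ValueError(f"unknown channel type: {description}")
```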

dlg.apps.bash_shell_app.run_bash(cmd, app_uid, session_id, inputs, outputs, stdin=None, stdout=-1)

Runs the given cmd. If any inputs and/or outputs are given (dictionaries of uid:drop elements) they are used to replace any placeholder value in cmd with either drop paths or dataURLs.

stdin indicates a file descriptor or file object to use as the standard input of the bash process. If not given, no stdin is passed to the process.

Similarly, stdout is a file descriptor or file object to which the standard output of the process is written. If not given, the output is consumed by this method and potentially logged.
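The default stdout=-1 is the value of subprocess.PIPE. The hypothetical helper below sketches the stdin/stdout behaviour described above with the standard subprocess module; it omits placeholder substitution and is not the actual run_bash code. Mapping a missing stdin to subprocess.DEVNULL is an assumption about what "no stdin" means.

```python
import logging
import subprocess

logger = logging.getLogger(__name__)


def run_bash_sketch(cmd, stdin=None, stdout=subprocess.PIPE):
    """Run `cmd` with bash; capture and log stdout unless a target is given."""
    # No stdin given: hand the process an empty one instead of inheriting ours.
    stdin = stdin if stdin is not None else subprocess.DEVNULL
    proc = subprocess.Popen(["bash", "-c", cmd], stdin=stdin, stdout=stdout)
    out, _ = proc.communicate()  # out is None unless stdout is PIPE
    if out is not None:
        logger.debug("Output of %r: %r", cmd, out)
    if proc.returncode != 0:
        raise RuntimeError(f"{cmd!r} exited with code {proc.returncode}")
    return out
```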

dlg.apps.dynlib

class dlg.apps.dynlib.CDlgApp
class dlg.apps.dynlib.CDlgInput
class dlg.apps.dynlib.CDlgOutput
class dlg.apps.dynlib.CDlgStreamingInput
class dlg.apps.dynlib.DynlibApp(**kwargs)

Loads a dynamic library into the current process and runs it.
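Loading a shared library into the current process is what Python's ctypes module does. The sketch below is not the DynlibApp code: it loads a library (or, with None, the symbols already loaded in the running process, which on typical POSIX systems include the C library), looks up a function by name and calls it. The helper name and the use of abs are illustrative only.

```python
import ctypes


def call_int_function(libname, funcname, value):
    """Hypothetical helper: load `libname` in-process and call int funcname(int)."""
    lib = ctypes.CDLL(libname)        # None -> symbols of the running process (POSIX)
    func = getattr(lib, funcname)     # AttributeError if the symbol is missing
    func.argtypes = [ctypes.c_int]
    func.restype = ctypes.c_int
    return func(value)
```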

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

class dlg.apps.dynlib.DynlibProcApp(**kwargs)

Loads a dynamic library in a different process and runs it.

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

class dlg.apps.dynlib.DynlibStreamApp(**kwargs)

dataWritten(uid, data)

Callback invoked when data has been written into the DROP with UID uid (which is one of the streaming inputs of this AppDROP). By default no action is performed.

dropCompleted(uid, drop_state)

Callback invoked when the DROP with UID uid (which is either a normal or a streaming input of this AppDROP) has moved to the COMPLETED or ERROR state. By default no action is performed.

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).

exception dlg.apps.dynlib.InvalidLibrary
exception dlg.apps.dynlib.finish_subprocess
dlg.apps.dynlib.get_from_subprocess(proc, q)

Gets elements from the queue, checking that the process is still alive.

dlg.apps.dynlib.load_and_init(libname, oid, uid, params)

Loads and initializes libname with the given parameters, prepares the corresponding C application structure, and returns both objects.

dlg.apps.dynlib.prepare_c_inputs(c_app, inputs)

Converts all inputs to their C equivalents and sets them into c_app.

dlg.apps.dynlib.prepare_c_outputs(c_app, outputs)

Converts all outputs to their C equivalents and sets them into c_app.
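Converting Python-side inputs into C equivalents typically means filling ctypes structures and arrays that the loaded library can read. The field layout below (uid, oid and a status code) is a hypothetical example chosen for illustration; the real CDlgInput and CDlgOutput definitions are not documented here.

```python
import ctypes


class CInputSketch(ctypes.Structure):
    """Hypothetical C view of an input DROP."""
    _fields_ = [
        ("uid", ctypes.c_char_p),
        ("oid", ctypes.c_char_p),
        ("status", ctypes.c_int),
    ]


def to_c_inputs(inputs):
    """Convert (uid, oid, status) tuples into a ctypes array of CInputSketch."""
    array_type = CInputSketch * len(inputs)
    # ctypes keeps the encoded bytes alive for as long as the array exists.
    return array_type(*(
        CInputSketch(uid.encode(), oid.encode(), status)
        for uid, oid, status in inputs
    ))
```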

dlg.apps.dynlib.run(lib, c_app, input_closers)

Invokes the run method on lib with the given c_app. After completion, all opened file descriptors are closed.

dlg.apps.dockerapp

Module containing docker-related applications and functions

class dlg.apps.dockerapp.ContainerIpWaiter(drop)

A class that remembers the target DROP’s uid and containerIp properties once its internal event has been set, and returns them when waitForIp is called; waitForIp first waits for the event to be set.
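This waiter pattern maps naturally onto threading.Event. The sketch below is a hypothetical stand-in: it assumes the target DROP exposes uid and containerIp attributes and that some callback (here called notify, a made-up name) fires once the container has an IP.

```python
import threading


class ContainerIpWaiterSketch:
    def __init__(self, drop):
        self._drop = drop
        self._event = threading.Event()
        self._uid = None
        self._ip = None

    def notify(self):
        """Hypothetical callback: record uid/containerIp, then release waiters."""
        self._uid = self._drop.uid
        self._ip = self._drop.containerIp
        self._event.set()

    def waitForIp(self, timeout=None):
        """Block until the IP is known; return (uid, ip), or None on timeout."""
        if not self._event.wait(timeout):
            return None
        return self._uid, self._ip
```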

class dlg.apps.dockerapp.DockerApp(**kwargs)

A BarrierAppDROP that represents a process running in a container hosted by a local docker daemon. Depending on the host system, the docker daemon might be activated automatically when a client tries to connect to it via its Unix socket (as with systemd), or it might need to be brought up prior to any client operation (as with upstart). In any case, if the daemon is not present, this class will raise exceptions whenever it tries to connect to the server to perform an operation.

Docker containers are built from docker images, which are pulled to the host where the docker daemon runs either explicitly (via docker pull) or less visibly (e.g., when running docker run using an image that has not been fetched yet). This DockerApp application will explicitly pull the image at initialize time, meaning that the docker images will become available at the time the physical graph (which this application is part of) is deployed. Docker containers also need a command to be run in them, which should be an available program inside the image.

Input and output

The inputs and outputs used by the dockerized application are made available by mapping host directories and files as “data volumes”. Inputs are bound using their full path, but outputs are bound only up to their dirnames, because otherwise they would be created at container creation time by Docker. For example, the output /a/b/c will produce a binding to /dlg/a/b inside the docker container, where c will have to be written by the process running in the container.
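The binding rule in the example above can be written down directly. This sketch computes (host path, container path) pairs; the /dlg prefix comes from the text, while the function name, the return format and the assumption of absolute paths are illustrative.

```python
import os


def volume_bindings(input_paths, output_paths, prefix="/dlg"):
    """Return (host_path, container_path) pairs for file-based inputs/outputs."""
    bindings = []
    for path in input_paths:
        # Inputs already exist, so they are bound using their full path.
        bindings.append((path, prefix + path))
    for path in output_paths:
        # Outputs are bound only up to their directory; the file itself
        # is written later by the process running inside the container.
        parent = os.path.dirname(path)
        bindings.append((parent, prefix + parent))
    return bindings
```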

Since the command run in the container most likely receives the paths of its inputs and outputs as arguments, and since these might not be known precisely until runtime, users should use placeholders for them in the command-line specification. Placeholders for input locations take the form of “%iX”, where X starts from 0 and refers to the X-th filesystem-related input. Likewise, output locations are specified as “%oX”. Alternatively, inputs and outputs can be referred to by their UIDs, in which case the placeholders look like “%i[X]” and “%o[X]” respectively, where X is the UID of the input/output being referenced.

Data volumes are a file-specific feature. For this reason, volumes are set up for filesystem-based input/output DROPs only, namely the FileDROP and the DirectoryContainer types. Other DROP types can instead pass down their dataURL property via the command line by using placeholders. Placeholders for input DROP dataURLs take the form of “%iDataURLX”, where X starts from 0 and refers to the X-th non-filesystem-related input. Likewise, output dataURLs are specified as “%oDataURLX”. Alternatively, users can refer to the dataURL of a specific input or output as “%iDataURL[X]” and “%oDataURL[X]” respectively, where X is the UID of the input/output being referenced.
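The placeholder syntax described in the last two paragraphs can be expanded with a single regular expression. The sketch below is illustrative, not the DockerApp code: it assumes filesystem inputs/outputs are given as ordered lists of (uid, path) pairs and other DROPs as ordered lists of (uid, dataURL) pairs.

```python
import re

# Matches %i0, %o1, %i[uid], %o[uid], %iDataURL0, %oDataURL[uid], ...
PLACEHOLDER = re.compile(r"%([io])(DataURL)?(?:\[([^\]]+)\]|(\d+))")


def expand_placeholders(cmd, fs_inputs, fs_outputs, url_inputs, url_outputs):
    """Replace DockerApp-style placeholders; each table is a list of (uid, value)."""
    tables = {
        ("i", False): fs_inputs, ("o", False): fs_outputs,
        ("i", True): url_inputs, ("o", True): url_outputs,
    }

    def replace(match):
        direction, is_url, uid, index = match.groups()
        entries = tables[(direction, bool(is_url))]
        if uid is not None:
            return dict(entries)[uid]   # %i[X] / %iDataURL[X]: look up by UID
        return entries[int(index)][1]   # %iX / %iDataURLX: look up by position

    return PLACEHOLDER.sub(replace, cmd)
```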

Additional volume bindings can be specified via the keyword arguments when creating the DockerApp. The host file/directories must exist at the moment of creating the DockerApp; otherwise it will fail to initialize.

Users

A docker container usually runs as root by default. One of the major drawbacks of this is that the output generated by the containerized application will also belong to the root user of the host system, and not to the user running the DALiuGE framework. For this reason, DockerApp avoids running containers as the root user. Two parameters, given at construction time, control this behavior:

  • user
    If given, indicates the user that runs the container. It is assumed that if a user is indicated, the user already exists in the docker image; otherwise the container will fail to start. Its default value is None, meaning that the container will run as the root user.
  • ensureUserAndSwitch
    If the container is run as the root user, this option indicates whether a non-root user with the same UID as the user running this process should be: a) searched for, b) created if it doesn’t exist, and c) used to run the command inside the container. This is achieved by prepending some shell commands to the initial user-specified command; these run as root first, but finally perform the user switch within the container process. Its default value is True if user is None; False otherwise.

Using these two options one can thus control the user that will run the command inside the container.
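The defaults of the two options can be summarised in a few lines. The command wrapper is a hypothetical illustration of “prepending shell commands” (the exact commands DockerApp prepends are not documented here); it assumes getent, useradd and su exist in the image, and the fallback user name dlguser is made up. On the host, uid would typically come from os.getuid().

```python
import shlex


def resolve_user_options(user=None, ensureUserAndSwitch=None):
    """Apply the documented defaults: switch users only when no user is given."""
    if ensureUserAndSwitch is None:
        ensureUserAndSwitch = user is None
    return user, ensureUserAndSwitch


def wrap_command(cmd, uid, fallback_name="dlguser"):
    """Hypothetical prefix run as root: find or create a user with `uid`, then switch."""
    return (
        f"NAME=$(getent passwd {uid} | cut -d: -f1); "
        f'if [ -z "$NAME" ]; then NAME={fallback_name}; useradd -u {uid} "$NAME"; fi; '
        f'exec su "$NAME" -c {shlex.quote(cmd)}'
    )
```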

Communication between containers

Although some containerized applications might run on their own, there are cases where applications need to talk to each other in order to advance (like in the case of client-server applications, or in the case of MPI applications). All containers started in the same host (and therefore all applications running in them) belong by default to the same network, and are therefore already visible to each other.

Applications needing to communicate with other applications should be able to specify the target’s IP on their command line. Since the IP is not known until the containers are created, this is done using the %containerIp[oid]% placeholder, where oid is the OID of the target DockerApp.

This need to know another DockerApp’s IP imposes a sequential order on the startup of the containers, since one container must be started to learn its IP, which is then used to start the second. This is handled gracefully by the DockerApp code, provided that self.handleInterest is invoked where necessary. See self.handleInterest for more information about this mechanism.
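Finding out which containers a command depends on, and later replacing the %containerIp[oid]% placeholders once their IPs are known, are both simple regular-expression operations. A hypothetical sketch (the function names are made up; how the IPs are gathered is left to the mechanism described above):

```python
import re

CONTAINER_IP = re.compile(r"%containerIp\[([^\]]+)\]%")


def referenced_oids(cmd):
    """OIDs of the DockerApps whose IP this command needs before it can start."""
    return set(CONTAINER_IP.findall(cmd))


def replace_container_ips(cmd, ips_by_oid):
    """Replace every %containerIp[oid]% with the IP recorded for that OID."""
    def lookup(match):
        oid = match.group(1)
        if oid not in ips_by_oid:
            raise KeyError(f"IP of container {oid!r} not known yet")
        return ips_by_oid[oid]
    return CONTAINER_IP.sub(lookup, cmd)
```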

TODO

Processes in containers might not always exit by themselves, and the containers might need to be stopped manually. This is the case, for example, with a set of MPI processes, where the master container runs the MPI program and the slave containers run an SSH daemon that will not quit automatically once the master process has ended.

Still, we will probably need to differentiate between a forced quit due to a timeout and a clean quit; we might therefore require processes running in a container to quit by themselves after successfully performing their task.

handleInterest(drop)

Main mechanism through which a DROP handles its interest in a second DROP it isn’t directly related to.

A call to this method should be expected for each DROP this DROP is interested in. The default implementation does nothing, but implementations are free to perform any action, such as subscribing to events or storing information.

At this layer only the handling of such an interest exists. The expression of such interest, and the invocation of this method wherever necessary, is currently left as a responsibility of the entity creating the DROPs. In the case of a Session in a DROPManager, for example, this step would be performed using deployment-time information contained in the dropspec dictionaries held in the session.

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

class dlg.apps.dockerapp.DockerPath(path)
path

Alias for field number 0

dlg.apps.socket_listener

Module containing the SocketListenerApp, a simple application that listens for incoming data in a TCP socket.

class dlg.apps.socket_listener.SocketListenerApp(**kwargs)

A BarrierAppDROP that listens on a socket for data. The server-side socket expects only one client, and assumes that the client will close the connection after all its data has been sent.

This application expects no input DROPs, and therefore raises an exception whenever one is added. On the output side, one or more outputs can be specified with the restriction that they are not ContainerDROPs so data can be written into them through the framework.
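The single-client, read-until-close protocol can be sketched with the standard socket module. This is not the SocketListenerApp code; outputs are modelled as plain objects with a write method, an assumption made to keep the example self-contained.

```python
import io
import socket
import threading


def receive_once(server_sock, outputs, bufsize=4096):
    """Accept one client and copy everything it sends into each output."""
    conn, _addr = server_sock.accept()
    with conn:
        while True:
            chunk = conn.recv(bufsize)
            if not chunk:  # empty read: the client closed the connection
                break
            for out in outputs:
                out.write(chunk)


if __name__ == "__main__":
    server = socket.create_server(("127.0.0.1", 0))  # port 0: pick a free port
    port = server.getsockname()[1]

    def client():
        with socket.create_connection(("127.0.0.1", port)) as c:
            c.sendall(b"hello over tcp")

    t = threading.Thread(target=client)
    t.start()
    sink = io.BytesIO()
    receive_once(server, [sink])
    t.join()
    server.close()
    print(sink.getvalue())
```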

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

dlg.apps.spead_receiver

Module containing a (Python) application that receives spead2 data.

class dlg.apps.spead_receiver.SpeadReceiverApp(**kwargs)

A BarrierAppDROP that listens for data using the SPEAD protocol.

This application opens a stream and adds a UDP reader on a specific host and port to listen for the data of item itemId. The stream is listened to until it is closed. Each heap sent through the stream is checked for the item, and once the item is found its data is written into each output of this application.
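The per-heap logic can be shown without the spead2 library. In the sketch below each heap is modelled as a plain dict mapping item IDs to bytes, an assumption that stands in for the real spead2 heap and item objects; the loop ends when the iterable (the stream) is exhausted. The function name is hypothetical.

```python
import io


def forward_item(heaps, item_id, outputs):
    """Write the payload of `item_id` from every heap that carries it into each output."""
    found = 0
    for heap in heaps:              # iteration stops when the stream is closed
        data = heap.get(item_id)
        if data is None:
            continue                # this heap does not carry the item
        for out in outputs:
            out.write(data)
        found += 1
    return found


if __name__ == "__main__":
    stream = [{0x1000: b"ab"}, {0x2000: b"ignored"}, {0x1000: b"cd"}]
    sink = io.BytesIO()
    print(forward_item(stream, 0x1000, [sink]), sink.getvalue())
```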

Just like the SocketListenerApp, this application expects no input DROPs, and therefore raises an exception whenever one is added. On the output side, one or more outputs can be specified with the restriction that they are not ContainerDROPs so data can be written into them through the framework.

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

dlg.apps.archiving

class dlg.apps.archiving.ExternalStoreApp(**kwargs)

An application that takes its input DROP (which must be one, and only one) and creates a copy of it in a completely external store, from the point of view of the DALiuGE framework.

Because this application copies the data to an external location, it should not have any outputs, making it a leaf node of the physical graph where it resides.

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

store(inputDrop)

Method implemented by subclasses. It should store the contents of inputDrop into an external store.
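The contract above (exactly one input, no outputs, storage delegated to store) can be sketched as an abstract base class. The class names, the inputs/outputs attributes and the directory-based “external store” are hypothetical; NgasArchivingApp, for instance, archives to an NGAS server instead. Input DROPs are modelled as file paths.

```python
import abc
import os
import shutil
import tempfile


class ExternalStoreSketch(abc.ABC):
    def __init__(self, inputs, outputs=()):
        self.inputs = list(inputs)
        self.outputs = list(outputs)

    def run(self):
        if len(self.inputs) != 1:
            raise ValueError("exactly one input is required")
        if self.outputs:
            raise ValueError("an external store app must be a leaf node")
        self.store(self.inputs[0])

    @abc.abstractmethod
    def store(self, inputDrop):
        """Copy the contents of inputDrop into the external store."""


class DirectoryStoreSketch(ExternalStoreSketch):
    """Hypothetical subclass: the 'external store' is just another directory."""

    def __init__(self, inputs, target_dir):
        super().__init__(inputs)
        self.target_dir = target_dir

    def store(self, inputDrop):
        target = os.path.join(self.target_dir, os.path.basename(inputDrop))
        shutil.copy(inputDrop, target)


if __name__ == "__main__":
    src = os.path.join(tempfile.mkdtemp(), "data.bin")
    with open(src, "wb") as f:
        f.write(b"payload")
    DirectoryStoreSketch([src], tempfile.mkdtemp()).run()
```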

class dlg.apps.archiving.NgasArchivingApp(**kwargs)

An ExternalStoreApp class that takes its input DROP and archives it in an NGAS server. It currently deals with non-container DROPs only.

The archiving to NGAS occurs through the framework and not by spawning a new NGAS client process. This way we can read the different storage types supported by the framework, and not only filesystem objects.

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).

store(inDrop)

Method implemented by subclasses. It should store the contents of inDrop into an external store.

dlg.apps.crc

Module containing an example application that calculates a CRC value.

class dlg.apps.crc.CRCApp(**kwargs)

A BarrierAppDROP that calculates the CRC of the single DROP it consumes. It assumes the DROP being consumed is not a container. This is a simple example of a BarrierAppDROP implementation, and not something really intended to be used in a production system.
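Computing the CRC of a non-container DROP boils down to reading its contents in chunks and updating a running CRC. The sketch below uses zlib.crc32 from the Python standard library and a generic binary file object in place of a DROP; whether CRCApp uses the same CRC variant is not stated here, and the helper name is hypothetical.

```python
import io
import zlib


def crc_of_stream(readable, chunk_size=65536):
    """Return the CRC-32 of everything readable from a binary file-like object."""
    crc = 0
    while True:
        chunk = readable.read(chunk_size)
        if not chunk:
            break
        crc = zlib.crc32(chunk, crc)  # feed the running value back in
    return crc


if __name__ == "__main__":
    print(hex(crc_of_stream(io.BytesIO(b"hello world"))))
```

Reading in chunks keeps memory use constant regardless of the DROP size, and the result equals the CRC of the whole content computed in one call.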

run()

Run this application. It can be safely assumed that at this point all the required inputs are COMPLETED.

class dlg.apps.crc.CRCStreamApp(**kwargs)

Calculates the CRC in streaming mode, i.e. as a “streamingConsumer” of its predecessor in the graph.

dataWritten(uid, data)

Callback invoked when data has been written into the DROP with UID uid (which is one of the streaming inputs of this AppDROP). By default no action is performed.

dropCompleted(uid, status)

Callback invoked when the DROP with UID uid (which is either a normal or a streaming input of this AppDROP) has moved to the COMPLETED or ERROR state. By default no action is performed.
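In streaming mode the same running CRC can be updated from the dataWritten callback and finalised in dropCompleted. The class below is a framework-free, hypothetical sketch that reuses the callback names and signatures documented above; the "COMPLETED" state string and the result attribute are assumptions.

```python
import zlib


class StreamingCrcSketch:
    def __init__(self):
        self._crc = 0
        self.result = None  # set once the input has completed successfully

    def dataWritten(self, uid, data):
        # Each chunk written into the streaming input updates the running CRC.
        self._crc = zlib.crc32(data, self._crc)

    def dropCompleted(self, uid, status):
        # Only a successfully completed input yields a meaningful CRC.
        self.result = self._crc if status == "COMPLETED" else None
```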

initialize(**kwargs)

Performs any specific subclass initialization.

kwargs contains all the keyword arguments given at construction time, except those used by the constructor itself. Implementations of this method should make sure that arguments in the kwargs dictionary are removed once they are interpreted, so that they are not accidentally interpreted again by other implementations of this method further up the call hierarchy (which happens when a subclass implementation calls the parent implementation, as is usually the case).