Workflow Documentation

This documentation provides information on the Workflow class and its API reference.

If you are new to developing Xi-CAM plugins, it is recommended that you follow the quick-start documentation first.

For more general development resources, see the Resources page.

Note that the examples in this documentation can be run in a Python interpreter outside of Xi-CAM (for demonstration purposes). The auxiliary support code needed to do this is marked with the comment # Only need if not running xicam. When developing within Xi-CAM, you will not need the lines marked with that comment.

What Is a Workflow?

In Xi-CAM, a Workflow represents a sequence of one or more OperationPlugins to execute. In other words, it lets you process data through a pipeline of operations. Multiple operations can be linked together in a Workflow, provided that the connection between any two operations is compatible (based on their inputs and outputs). Execution can be performed either asynchronously or synchronously.

Where Is Workflow?

xicam.core.execution.Workflow

What Does a Workflow Look Like?

A Workflow can be thought of as a graph-like structure: we add operations (nodes) and connect them with links (edges).
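The nodes-and-edges picture can be sketched in plain Python. This is a minimal standalone sketch, not Xi-CAM's implementation: strings stand in for operation instances, links are stored as (source, dest, source_param, dest_param) tuples (the argument order add_link uses in the example that follows, and the link layout used in the API Reference), and the standard library's graphlib (Python 3.9+) derives an order in which every operation runs after the operations linked into it. The log_op operation and its value input are hypothetical.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical toy graph: strings stand in for operation instances.
operations = ["add_op", "sqrt_op", "log_op"]

# Each link is (source, dest, source_param, dest_param).
links = [
    ("add_op", "sqrt_op", "sum", "n"),
    ("sqrt_op", "log_op", "square_root", "value"),
]


def execution_order(operations, links):
    """Order operations so each one comes after every operation linked into it."""
    sorter = TopologicalSorter({op: set() for op in operations})
    for source, dest, _source_param, _dest_param in links:
        sorter.add(dest, source)  # dest depends on source
    return list(sorter.static_order())


print(execution_order(operations, links))  # ['add_op', 'sqrt_op', 'log_op']
```

Because the links form a single chain, there is only one valid order here; with branching links, any order that respects the links is acceptable.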

Example

from xicam.core import execution  # Only need if not running xicam
from xicam.core.execution import localexecutor  # Only need if not running xicam
from xicam.core.execution import Workflow
from xicam.plugins.operationplugin import operation, output_names

execution.executor = localexecutor.LocalExecutor()  # Only need if not running xicam

# Define our operations
@operation
@output_names("sum")
def my_add(x, y):
    return x + y

@operation
@output_names("square_root")
def my_sqrt(n):
    from math import sqrt
    return sqrt(n)

# Instantiate operations
add_op = my_add()
sqrt_op = my_sqrt()

# Create a Workflow and add our operation instances to it
workflow = Workflow()
workflow.add_operations(add_op, sqrt_op)

# Link the "sum" output of add_op to the "n" input of sqrt_op
workflow.add_link(add_op, sqrt_op, "sum", "n")

# Execute the workflow, sending 1 and 3 as initial inputs to add_op (the first operation)
# This should give us sqrt(1 + 3) -> 2.0.
result = workflow.execute_synchronous(x=1, y=3)
print(result)  # Should be ({"square_root": 2.0},)

In this example, we use an addition operation and a square root operation in our Workflow. We want to add two numbers, then take the square root of the sum.

First, we instantiate our two operation types. This gives us an add_op operation object and a sqrt_op operation object.

Next, we add our operations to the workflow.

We then link the operations together so that the two numbers are added first and the square root is then taken of the sum. We do this by connecting add_op’s “sum” output to sqrt_op’s “n” input.

Now that we have added our operations and connected them as we like, we can run our workflow. In this case, we will use execute_synchronous (there are other methods for execution which will be explained later).

However, if we were to call workflow.execute_synchronous() with no arguments, the workflow would not know what values to use for the “x” and “y” inputs of the first operation, add_op.

We can either:

  1. pass data into the first operation(s)’ inputs when we call an execute method on the workflow

  2. have a GUI widget that exposes the operations through the GUI (such as WorkflowEditor), which can provide values directly to the operations’ inputs

In this example, we used option 1 (for an example of option 2, see the ExamplePlugin’s use of WorkflowEditor in the quick-start documentation). To do this, we passed x=1 and y=3 to our execute_synchronous call, which provided values for add_op’s x and y input arguments.
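Option 1 can be illustrated with a minimal standalone sketch (not Xi-CAM's executor) of how keyword arguments and links together could supply every input: keyword arguments whose names match an operation's parameters are passed in, and linked inputs receive the upstream operation's output. The run function, the operations dict, and the fixed execution order are hypothetical simplifications.

```python
import inspect
import math


# Hypothetical stand-ins for the my_add and my_sqrt operations.
def add(x, y):
    return x + y


def sqrt(n):
    return math.sqrt(n)


# Operation name -> (function, name of its single output).
operations = {"add_op": (add, "sum"), "sqrt_op": (sqrt, "square_root")}
order = ["add_op", "sqrt_op"]  # sources come before destinations
links = [("add_op", "sqrt_op", "sum", "n")]  # (source, dest, source_param, dest_param)


def run(**kwargs):
    outputs = {}  # operation name -> {output name: value}
    for name in order:
        func, output_name = operations[name]
        # Option 1: keyword arguments fill inputs with matching names.
        inputs = {p: kwargs[p] for p in inspect.signature(func).parameters if p in kwargs}
        # Linked inputs take the upstream operation's output.
        for source, dest, source_param, dest_param in links:
            if dest == name:
                inputs[dest_param] = outputs[source][source_param]
        outputs[name] = {output_name: func(**inputs)}
    return outputs[order[-1]]  # result of the last operation


print(run(x=1, y=3))  # {'square_root': 2.0}
```

In this toy model, calling run() with no arguments raises a TypeError because add receives neither x nor y, which is the analogue of the missing-input problem described for workflow.execute_synchronous().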

Useful Methods for Modifying the Workflow

Here is a condensed version of the various ways to modify a Workflow’s operations and links. For more information, see the API Reference.

Adding, Inspecting, and Removing Operations

Adding operations:

  • add_operation – add an operation to the Workflow

  • add_operations – add multiple operations to the Workflow

  • insert_operation – insert an operation at a specific index in the Workflow

Inspecting operations:

  • operations – get the operations currently in the Workflow

Removing operations:

  • remove_operation – remove an operation from the Workflow

  • clear_operations – remove all operations from the Workflow

Enabling and Disabling an Operation

It is possible to enable or disable operations. By default, all operations added to a Workflow are enabled. For more information, see the API Reference.

Executing a Workflow

When you execute a Workflow, the operations are executed based on how they are linked together.

There are a few ways to run a Workflow: execute, execute_synchronous, and execute_all.

Synchronous Execution

As we saw in our example earlier, we can use execute_synchronous to run a Workflow like a normal snippet of Python code. When this method is called, it blocks until the result is returned, so the interpreter cannot continue running code until the workflow has finished.
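The difference between blocking and future-based execution can be sketched with Python's standard concurrent.futures module. The API Reference describes execute as returning a concurrent.futures-like QThreadFuture whose callback_slot receives the result; the sketch below uses a plain ThreadPoolExecutor and add_done_callback instead, so it illustrates the pattern only, not Xi-CAM's Qt-based implementation. The pipeline function is a hypothetical stand-in for the add and square-root workflow.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def pipeline(x, y):
    """Hypothetical stand-in for running the add -> square root workflow."""
    return {"square_root": math.sqrt(x + y)}


# Synchronous: the call blocks until the result is ready.
result = pipeline(x=1, y=3)
print(result)  # {'square_root': 2.0}

# Future-based: submit returns immediately; a callback receives the result later.
received = []
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(pipeline, x=1, y=3)
    future.add_done_callback(lambda f: received.append(f.result()))
# Leaving the with-block waits for the worker (and its callback) to finish.
print(received)  # [{'square_root': 2.0}]
```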

API Reference

class xicam.core.execution.Workflow(name='', operations=None)

Bases: Graph

Add a link between two operations in the workflow.

Links are defined from an operation’s parameter to another operation’s parameter. This creates a connection between two operations during execution of a workflow.

Parameters
  • source (OperationPlugin) – The operation to link from.

  • dest (OperationPlugin) – The operation to link to.

  • source_param (str) – Name of the parameter in the source operation to link (source of the data; output).

  • dest_param (str) – Name of the parameter in the destination operation to link (where the data goes; input).

add_operation(operation: OperationPlugin)

Add a single operation into the workflow.

add_operations(*operations: OperationPlugin)

Add operations into the workflow.

This will add the list of operations to the end of the workflow.

as_dask_graph()

Processes the graph starting from the end tasks and moving into all of the tasks they depend on.

Returns a tuple that represents the graph as a dask-compatible graph for processing. The second element of the tuple identifies the end node ids (i.e. nodes that do not have connected outputs).

Returns

A tuple with two elements: the first is the dask graph, and the second is the end task ids.

Return type

tuple
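A dask-compatible graph is a dict mapping keys to tasks, where a task is a tuple whose first element is a callable and whose remaining elements are arguments; arguments that are themselves keys are computed first. The sketch below builds such a dict by hand for the add and square-root example and evaluates it with a tiny recursive function standing in for a dask scheduler. The key names and the get helper are hypothetical; Xi-CAM's as_dask_graph may use different keys and task layouts.

```python
import math
import operator

# Hand-built dask-style graph for the add -> square root example.
graph = {
    "x": 1,
    "y": 3,
    "add_op": (operator.add, "x", "y"),
    "sqrt_op": (math.sqrt, "add_op"),
}
end_ids = ["sqrt_op"]  # nodes whose outputs are not connected to anything


def get(graph, key):
    """Tiny recursive evaluator standing in for a dask scheduler."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that name other keys are computed first.
        resolved = [get(graph, a) if isinstance(a, str) and a in graph else a for a in args]
        return func(*resolved)
    return task  # a literal value


print([get(graph, key) for key in end_ids])  # [2.0]
```

Evaluation starts from the end ids and recurses into the tasks they depend on, which matches the "process from end tasks" description above.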

attach(observer: Callable)

Add an observer to the Workflow.

An observer is a callable that is called when the Workflow.notify method is called. In other words, the observer will be called whenever the Workflow state changes, for example when links are modified or operations are removed.

Parameters

observer (Callable) – A callable to add to the Workflow.

auto_connect_all()

Attempts to automatically connect operations together by matching output names and input names.

Makes a best-effort to link operations based on the names of their outputs and inputs. If operation A has an output named “image”, and operation B has an input named “image”, then A “image” will link to B “image”. Outputs and inputs that have matching types in addition to matching names will be favored more for the auto-connection.

If there are no outputs with matching inputs (by name), no links will be added.
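The name-matching rule can be sketched as follows. This is a hypothetical, simplified model rather than Xi-CAM's algorithm: each operation is a (name, inputs, outputs) triple with parameter names mapped to types, each input is linked from an earlier operation that has an output of the same name, a source whose output type matches the input type is preferred, and otherwise the nearest earlier match wins. The operation names are made up for illustration.

```python
def auto_connect(operations):
    """Link inputs to same-named outputs of earlier operations.

    operations: ordered list of (name, inputs, outputs), where inputs and
    outputs map parameter names to types. Returns link tuples
    (source, dest, source_param, dest_param).
    """
    links = []
    for i, (dest, inputs, _outputs) in enumerate(operations):
        for param, param_type in inputs.items():
            # Earlier operations with an output of the same name, nearest first.
            candidates = [
                (source, outputs[param])
                for source, _inputs, outputs in reversed(operations[:i])
                if param in outputs
            ]
            if not candidates:
                continue  # no name match: leave the input unlinked
            typed = [source for source, out_type in candidates if out_type is param_type]
            source = typed[0] if typed else candidates[0][0]
            links.append((source, dest, param, param))
    return links


ops = [
    ("load", {}, {"image": list}),
    ("thumbnail", {}, {"image": bytes}),
    ("blur", {"image": list, "sigma": float}, {"image": list}),
]
# "thumbnail" is nearer, but "load" wins because its output type matches.
print(auto_connect(ops))  # [('load', 'blur', 'image', 'image')]
```

The sigma input has no same-named output anywhere, so it stays unlinked, mirroring "no links will be added" for unmatched names.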

Remove all links from the workflow, but preserve the operations.

Remove all links for an operation.

clear_operations()

Remove all operations and links from the workflow.

detach(observer: Callable)

Remove an observer from the Workflow.

An observer is a callable that is called when the Workflow.notify method is called. In other words, the observer will be called whenever the Workflow state changes, for example when links are modified or operations are removed.

Parameters

observer (Callable) – The callable to remove from the Workflow.
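The attach/detach/notify cycle is the classic observer pattern. The class below is a minimal hypothetical sketch, not Xi-CAM's Workflow: observers are stored in a list, notify calls each one, and a state change (here, adding an operation) triggers notify.

```python
from typing import Callable, List


class ObservableWorkflow:
    """Hypothetical minimal model of attach/detach/notify."""

    def __init__(self):
        self._observers: List[Callable] = []
        self.operations = []

    def attach(self, observer: Callable):
        self._observers.append(observer)

    def detach(self, observer: Callable):
        self._observers.remove(observer)

    def notify(self):
        for observer in self._observers:
            observer()

    def add_operation(self, operation):
        self.operations.append(operation)
        self.notify()  # the state changed, so call every observer


calls = []
wf = ObservableWorkflow()


def on_change():
    calls.append(len(wf.operations))


wf.attach(on_change)
wf.add_operation("add_op")   # on_change runs and records 1
wf.detach(on_change)
wf.add_operation("sqrt_op")  # no observers left, nothing recorded
print(calls)  # [1]
```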

disabled(operation)

Indicate if the operation is disabled in the workflow.

Parameters

operation (OperationPlugin) – Operation to check if it is disabled or not.

Returns

Returns True if the operation is disabled in the Workflow; otherwise False.

Return type

bool

disabled_operations()

Returns the disabled operations (if any) in the workflow.

enabled(operation)

Indicate if the operation is enabled in the workflow.

Parameters

operation (OperationPlugin) – Operation to check if it is enabled or not.

Returns

Returns True if the operation is enabled in the Workflow; otherwise False.

Return type

bool

execute(executor=None, connection=None, callback_slot=None, finished_slot=None, except_slot=None, default_exhandle=True, lock=None, fill_kwargs=True, threadkey=None, **kwargs)

Execute this workflow on the specified host. Connection will be a Connection object (WIP) keeping a connection to a compute resource, including connection.hostname, connection.username…

Returns

A concurrent.futures-like qthread to monitor status. The future’s callback_slot receives the result.

Return type

QThreadFuture

execute_all(connection=None, executor=None, callback_slot=None, finished_slot=None, yield_slot=None, except_slot=None, default_exhandle=True, lock=None, fill_kwargs=True, threadkey=None, **kwargs)

Execute this workflow on the specified host. Connection will be a Connection object (WIP) keeping a connection to a compute resource, including connection.hostname, connection.username…

Each keyword argument is expected to be an iterable of the same length; these values will be iterated over, zipped, and executed through the workflow.

Returns

A concurrent.futures-like qthread to monitor status. The future’s callback_slot receives the result.

Return type

QThreadFuture
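The zipping of execute_all's keyword arguments can be sketched synchronously. In this hypothetical sketch, not Xi-CAM's implementation, equal-length lists are zipped, each tuple of values becomes one set of keyword arguments, and the pipeline stand-in runs once per set; the real method instead runs asynchronously and reports results through its slots.

```python
import math


def pipeline(x, y):
    """Hypothetical stand-in for the add -> square root workflow."""
    return {"square_root": math.sqrt(x + y)}


def execute_all(func, **kwargs):
    """Zip equal-length lists of keyword values and call func once per tuple."""
    if len({len(values) for values in kwargs.values()}) > 1:
        raise ValueError("all keyword arguments must have the same length")
    names = list(kwargs)
    return [func(**dict(zip(names, row))) for row in zip(*kwargs.values())]


print(execute_all(pipeline, x=[1, 5], y=[3, 4]))
# [{'square_root': 2.0}, {'square_root': 3.0}]
```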

fill_kwargs(**kwargs)

Fills in all empty inputs with names matching keys in kwargs.
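A minimal sketch of the matching rule, assuming an input counts as "empty" when no link feeds it (the real method may also consider values already set on the operation). Plain functions stand in for operations and inspect.signature lists their inputs; fill_kwargs here is a hypothetical free function, not the Workflow method.

```python
import inspect


def add(x, y):
    return x + y


def sqrt(n):
    return n ** 0.5


def fill_kwargs(operations, links, **kwargs):
    """Map each unlinked input whose name matches a key in kwargs to that value.

    Returns {operation name: {input name: value}}.
    """
    linked = {(dest, dest_param) for _source, dest, _source_param, dest_param in links}
    filled = {}
    for name, func in operations.items():
        for param in inspect.signature(func).parameters:
            if (name, param) not in linked and param in kwargs:
                filled.setdefault(name, {})[param] = kwargs[param]
    return filled


ops = {"add_op": add, "sqrt_op": sqrt}
links = [("add_op", "sqrt_op", "sum", "n")]
# "n" is ignored because sqrt_op's n input is already fed by a link.
print(fill_kwargs(ops, links, x=1, y=3, n=99))  # {'add_op': {'x': 1, 'y': 3}}
```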

Returns the links connected to the operation given (linked inputs of the operation).

The returned dict represents all operations that are connected to operation. Links are represented as a list of 2-element tuples, where the first element of the tuple is another operation’s output parameter, and the second element of the tuple is operation’s input parameter.

Using .keys() on the returned dict will give all of the operations that connect to operation. Using .values() on the returned dict will give all of the links from each operation to operation.

Parameters

operation (OperationPlugin) – Operation to get incoming links for (some operation -> operation).

Returns

Returns a dictionary defining all of the links from any connected operations to operation.

Return type

defaultdict

Returns the links connected from the operation given (linked outputs of the operation).

The returned dict represents all the operations that operation connects to. Links are represented as a list of 2-element tuples, where the first element of the tuple is operation’s output parameter, and the second element of the tuple is another operation’s input parameter.

Using .keys() on the returned dict will give all of the operations that operation connects to. Using .values() on the returned dict will give all of the links from operation to each operation.

Parameters

operation (OperationPlugin) – Operation to get outgoing links for (operation -> some operation).

Returns

Returns a dictionary defining all of the links from operation to any connected operations.

Return type

defaultdict
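The two link dictionaries can be built from a flat list of (source, dest, source_param, dest_param) link tuples. In this hypothetical sketch, inbound and outbound are free functions over strings standing in for operations; both return a defaultdict keyed by the other operation, whose values are lists of (output parameter, input parameter) tuples, as described for the inbound and outbound links above. The log_op operation and its value input are made up.

```python
from collections import defaultdict

# (source, dest, source_param, dest_param)
links = [
    ("add_op", "sqrt_op", "sum", "n"),
    ("add_op", "log_op", "sum", "value"),
]


def inbound(operation, links):
    """{other operation: [(other's output param, operation's input param), ...]}"""
    result = defaultdict(list)
    for source, dest, source_param, dest_param in links:
        if dest == operation:
            result[source].append((source_param, dest_param))
    return result


def outbound(operation, links):
    """{other operation: [(operation's output param, other's input param), ...]}"""
    result = defaultdict(list)
    for source, dest, source_param, dest_param in links:
        if source == operation:
            result[dest].append((source_param, dest_param))
    return result


print(dict(outbound("add_op", links)))
# {'sqrt_op': [('sum', 'n')], 'log_op': [('sum', 'value')]}
print(dict(inbound("sqrt_op", links)))  # {'add_op': [('sum', 'n')]}
```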

insert_operation(index: int, operation: OperationPlugin)

Insert an operation at a specific index in the workflow.

Parameters
  • index (int) – Index where to insert the operation. 0 will add at the beginning; -1 will add to the end.

  • operation (OperationPlugin) – Operation to insert.
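Note that plain list.insert does not treat -1 as "append": it places the new item before the last element. The sketch below, a hypothetical helper over a list of operation names rather than Xi-CAM's code, follows the documented convention explicitly and contrasts it with list.insert.

```python
def insert_operation(operations, index, operation):
    """Hypothetical helper following the documented index convention."""
    if index == -1:
        operations.append(operation)  # -1 means "add to the end"
    else:
        operations.insert(index, operation)


ops = ["add_op", "sqrt_op"]
insert_operation(ops, 0, "load_op")
insert_operation(ops, -1, "save_op")
print(ops)  # ['load_op', 'add_op', 'sqrt_op', 'save_op']

# For comparison, list.insert(-1, ...) lands before the last element.
plain = ["add_op", "sqrt_op"]
plain.insert(-1, "save_op")
print(plain)  # ['add_op', 'save_op', 'sqrt_op']
```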

Returns all the links defined in the workflow.

Returns a list of tuples, each tuple representing a link as follows: source operation, destination operation, source parameter, destination parameter.

Note that the links are shown as outbound links.

Returns

Returns a list of the links (defined as outbound links) in the workflow.

Return type

list

notify()

Notify the observers by calling each attached observer.

Returns the outbound links for an operation.

Returns a list of tuples, each tuple representing a link as follows: operation, destination operation, operation source parameter, destination parameter.

Returns

Returns a list of the links (defined as outbound links) for operation.

Return type

list

property operations

Returns the operations of this workflow.

Remove a link between two operations.

Parameters
  • source (OperationPlugin) – The source operation to remove a link from.

  • dest (OperationPlugin) – The destination operation to remove a link from.

  • source_param (str) – Name of the source parameter that is defining the link to remove.

  • dest_param (str) – Name of the destination parameter that is defining the link to remove.

remove_operation(operation, remove_orphan_links=True)

Remove an operation from the workflow.

Parameters
  • operation (OperationPlugin) – Operation to remove from the workflow.

  • remove_orphan_links (bool) – If True, removes all links that link to the operation to be removed. If False, does not remove any links for the operation and returns the removed operation’s links dict (default is True).

Returns

By default (remove_orphan_links is True), returns None. Otherwise, returns the links for the removed operation.

Return type

defaultdict

set_disabled(operation: OperationPlugin, value: bool = True, remove_orphan_links: bool = True, auto_connect_all: bool = True)

Set an operation’s disabled state in the workflow.

By default, when an operation is disabled, the links connected to it are removed (remove_orphan_links is True). If value is False (re-enabling an operation), no links are changed.

Parameters
  • operation (OperationPlugin) – The operation whose disabled state is being modified.

  • value (bool) – Indicates the disabled state (default is True, which disables the operation).

  • remove_orphan_links (bool) – If True and value is True, removes the links connected to the operation. Otherwise, no links are changed (default is True).

  • auto_connect_all (bool) – If True, then a best-effort attempt will be made to try to reconnect the operations in the workflow (default is True). See the Graph.auto_connect_all method for more information.

Returns

Returns a list of any orphaned links for an operation that is set to disabled. Default behavior will return an empty list (when remove_orphan_links is True). If enabling an operation (value is False), then an empty list is returned, as no links are changed.

Return type

list
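The documented return values of set_disabled can be sketched with a set of disabled operations and a list of link tuples. This hypothetical sketch stores state in a plain dict and omits the auto_connect_all reconnection step; it is not Xi-CAM's implementation.

```python
def set_disabled(state, operation, value=True, remove_orphan_links=True):
    """Hypothetical model of the documented return values.

    state: {"disabled": set of operations, "links": list of link tuples}
    """
    if not value:
        state["disabled"].discard(operation)
        return []  # re-enabling never changes links
    state["disabled"].add(operation)
    orphans = [link for link in state["links"] if operation in (link[0], link[1])]
    if remove_orphan_links:
        state["links"] = [link for link in state["links"] if link not in orphans]
        return []  # default: orphans were removed, so nothing is returned
    return orphans  # links were kept, so report them to the caller


state = {"disabled": set(), "links": [("add_op", "sqrt_op", "sum", "n")]}
print(set_disabled(state, "sqrt_op", remove_orphan_links=False))
# [('add_op', 'sqrt_op', 'sum', 'n')]
print(set_disabled(state, "sqrt_op"))  # []
print(state["links"])  # []
print(set_disabled(state, "sqrt_op", value=False), state["disabled"])  # [] set()
```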

stage(connection)

Stages required data resources to the compute resource. Connection will be a Connection object (WIP) keeping a connection to a compute resource, including connection.hostname, connection.username…

Returns

A concurrent.futures-like qthread to monitor status. Returns True if successful.

Return type

QThreadFuture

toggle_disabled(operation: OperationPlugin, remove_orphan_links=True, auto_connect_all=True)

Toggle the disabled state of an operation.

By default, when an operation is toggled to a disabled state, any links connected to the operation will be removed.

Parameters
  • operation (OperationPlugin) – The operation whose disabled state is toggled.

  • remove_orphan_links (bool) – If True, any links connected to the operation will be removed when the operation is toggled to disabled (default is True).

Returns

Returns a list of any orphaned links for an operation. Default behavior will return an empty list. A non-empty list can be returned when remove_orphan_links is False and the connected operation is toggled to disabled.

Return type

list

validate()

Validate all of the following:

  • All required inputs are satisfied.

  • Connection is active.

  • ?

Returns

True if workflow is valid.

Return type

bool
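The first check, that all required inputs are satisfied, can be sketched as follows, assuming an input is satisfied when a link feeds it or a value has been supplied for it, and that parameters with default values are optional. The validate function and the values mapping are hypothetical; the connection check and any further checks are not modeled.

```python
import inspect


def add(x, y):
    return x + y


def sqrt(n):
    return n ** 0.5


def validate(operations, links, values):
    """Return True if every required input is linked or has a supplied value.

    operations: {operation name: function}
    links: list of (source, dest, source_param, dest_param) tuples
    values: {operation name: {input name: value}}
    """
    linked = {(dest, dest_param) for _source, dest, _source_param, dest_param in links}
    for name, func in operations.items():
        for param in inspect.signature(func).parameters.values():
            if param.default is not inspect.Parameter.empty:
                continue  # inputs with defaults are optional
            if (name, param.name) not in linked and param.name not in values.get(name, {}):
                return False
    return True


ops = {"add_op": add, "sqrt_op": sqrt}
links = [("add_op", "sqrt_op", "sum", "n")]
print(validate(ops, links, {}))  # False: x and y are missing
print(validate(ops, links, {"add_op": {"x": 1, "y": 3}}))  # True
```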