Workflow Documentation¶
This documentation provides information on the Workflow class and its API reference.
If you are new to developing Xi-CAM plugins, it is recommended that you follow the quick-start documentation first.
For more general development resources, see the Resources page.
Note that the examples in this documentation can be run in a Python interpreter outside of Xi-CAM (for demonstration purposes).
Auxiliary support code needed to do this is marked with the comment # Only need if not running xicam
When developing within Xi-CAM, you do not need the lines of code marked with that comment.
What Is a Workflow?¶
In Xi-CAM, a Workflow represents a sequence of one or more OperationPlugins to execute.
It lets you process data through a pipeline of operations.
Multiple operations can be linked together in a Workflow, provided that the connection between any two operations is compatible (based on inputs and outputs).
Execution can be performed asynchronously or synchronously.
Where Is Workflow?¶
xicam.core.execution.Workflow
What Does a Workflow Look Like?¶
A Workflow can be thought of as a graph-like structure: we add operations (nodes) and connect them with links (edges).
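To make the graph analogy concrete, here is a minimal plain-Python sketch that does not use Xi-CAM at all. The names `nodes`, `edges`, and `run_chain` are hypothetical and exist only for illustration; Xi-CAM's actual Workflow internals may differ.

```python
from math import sqrt

# Nodes: plain functions standing in for operations (hypothetical, not Xi-CAM objects).
# Each returns a dict that maps an output name to its value.
def add(x, y):
    return {"sum": x + y}

def square_root(n):
    return {"square_root": sqrt(n)}

nodes = [add, square_root]

# Edges: (source node, destination node, source output name, destination input name).
edges = [(add, square_root, "sum", "n")]

def run_chain(nodes, edges, **entry_inputs):
    """Run nodes in order, routing each linked output to the input it feeds."""
    pending = {nodes[0]: dict(entry_inputs)}  # inputs collected so far, per node
    outputs = {}
    for node in nodes:
        outputs = node(**pending.get(node, {}))
        for source, dest, out_name, in_name in edges:
            if source is node:
                pending.setdefault(dest, {})[in_name] = outputs[out_name]
    return outputs  # outputs of the last node

result = run_chain(nodes, edges, x=1, y=3)
print(result)
```

The edge tuple deliberately uses the same ordering as the arguments of `add_link` (source, destination, source parameter, destination parameter).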
Example¶
from xicam.core import execution # Only need if not running xicam
from xicam.core.execution import localexecutor # Only need if not running xicam
from xicam.core.execution import Workflow
from xicam.plugins.operationplugin import operation, output_names
execution.executor = localexecutor.LocalExecutor() # Only need if not running xicam
# Define our operations
@operation
@output_names("sum")
def my_add(x, y):
    return x + y
@operation
@output_names("square_root")
def my_sqrt(n):
    from math import sqrt
    return sqrt(n)
# Instantiate operations
add_op = my_add()
sqrt_op = my_sqrt()
# Create a Workflow and add our operation instances to it
workflow = Workflow()
workflow.add_operations(add_op, sqrt_op)
# Link the "sum" output of add_op to the "n" input of sqrt_op
workflow.add_link(add_op, sqrt_op, "sum", "n")
# Execute the workflow, sending 1 and 3 as initial inputs to add_op (the first operation)
# This should give us sqrt(1 + 3) -> 2.0.
result = workflow.execute_synchronous(x=1, y=3)
print(result) # Should be ({"square_root": 2.0},)
In this example, we use an addition operation and a square root operation in our Workflow. We want to add two numbers, then take the square root of the sum.
First, we instantiate our two operation types.
This gives us an add_op operation object and a sqrt_op operation object.
Next, we add our operations to the workflow.
We then link the operations together so that we first add two numbers, then take the square root of the result.
We do this by connecting add_op’s “sum” output to sqrt_op’s “n” input.
Now that we have added our operations and connected them as we like,
we can run our workflow.
In this case, we will use execute_synchronous (other execution methods are explained later).
However, if we simply called workflow.execute_synchronous(), the workflow would not know what the “x” and “y” inputs of the first operation, add_op, are supposed to be.
We can either:
1. pass data into the first operation(s)’ inputs when we call an execute method on the workflow, or
2. have a GUI widget that exposes the operations through the GUI (such as WorkflowEditor), which can provide values directly to the operations’ inputs.
In this example, we used option 1 (for an example of option 2, see the ExamplePlugin’s use of WorkflowEditor in the quick-start documentation).
To do this, we passed x=1 and y=3 to our execute_synchronous call, which provided values for the add_op operation’s x and y input arguments.
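The idea of an entry operation with unsatisfied inputs can be illustrated with the standard library alone. The sketch below uses `inspect.signature` to list which required parameters of a plain function have not been supplied. The helper `missing_inputs` is hypothetical, and this is not a description of how Xi-CAM itself checks inputs.

```python
import inspect

def my_add(x, y):
    return x + y

def missing_inputs(func, provided):
    """Return the names of required parameters of func that are absent from provided."""
    required = [
        name
        for name, param in inspect.signature(func).parameters.items()
        if param.default is inspect.Parameter.empty
        and param.kind in (
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.KEYWORD_ONLY,
        )
    ]
    return [name for name in required if name not in provided]

# Nothing supplied: both inputs of the entry function are unknown.
no_inputs = missing_inputs(my_add, {})
# Supplying x and y (as in the example above) leaves nothing missing.
with_inputs = missing_inputs(my_add, {"x": 1, "y": 3})
print(no_inputs)
print(with_inputs)
```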
Useful Methods for Modifying the Workflow¶
Here is a condensed version of the various ways to modify a Workflow’s operations and links. For more information, see the API Reference.
Adding, Inspecting, and Removing Operations¶
Adding operations:
add_operation – add an operation to the Workflow
add_operations – add multiple operations to the Workflow
insert_operation – insert an operation at a specific index in the Workflow
Inspecting operations:
operations – get the operations currently in the Workflow
Removing operations:
remove_operation – remove an operation from the Workflow
clear_operations – remove all operations from the Workflow
Adding, Inspecting, and Removing Links¶
Adding links:
add_link – add a link between one operation’s output and another’s input
auto_connect_all – try to automatically connect all the operations based on input/output names
Inspecting links:
links – get all links in the Workflow
operation_links – get all links connected to a specific operation in the Workflow
get_inbound_links – get all incoming links to a specific operation in the Workflow
get_outbound_links – get all outgoing links from a specific operation in the Workflow
Removing links:
remove_link – remove a link from the Workflow
clear_operation_links – remove all links for a specified operation in the Workflow
clear_links – remove all links in the Workflow
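As a rough mental model of what the link-inspection methods return, the following plain-Python sketch stores links as 4-tuples and groups them into dictionaries of (output parameter, input parameter) pairs, mirroring the shapes described in the API Reference. Operations are represented by strings here, `log_op` and both helper functions are hypothetical, and none of this is Xi-CAM's actual implementation.

```python
from collections import defaultdict

# Each link: (source operation, destination operation, source param, destination param).
# Strings stand in for operation objects (hypothetical).
links = [
    ("add_op", "sqrt_op", "sum", "n"),
    ("add_op", "log_op", "sum", "value"),
]

def inbound_links(links, operation):
    """Group the links arriving at operation by the operation they come from."""
    grouped = defaultdict(list)
    for source, dest, source_param, dest_param in links:
        if dest == operation:
            grouped[source].append((source_param, dest_param))
    return grouped

def outbound_links(links, operation):
    """Group the links leaving operation by the operation they go to."""
    grouped = defaultdict(list)
    for source, dest, source_param, dest_param in links:
        if source == operation:
            grouped[dest].append((source_param, dest_param))
    return grouped

inbound = inbound_links(links, "sqrt_op")
outbound = outbound_links(links, "add_op")
print(dict(inbound))
print(dict(outbound))
```

In this model, the keys of each dictionary are the connected operations and the values are the parameter pairs that define each connection.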
Enabling and Disabling an Operation¶
Operations in a Workflow can be enabled or disabled (see set_disabled and toggle_disabled).
By default, all operations added to a Workflow are enabled.
For more information, see the API Reference.
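The API Reference describes how disabling an operation interacts with its links: by default the connected links are removed, and when `remove_orphan_links` is False the links are kept and reported back instead. The toy function below sketches that behaviour on plain Python data. `toy_set_disabled` is a hypothetical stand-in written for this page, not the Xi-CAM method, and it ignores the `auto_connect_all` option.

```python
def toy_set_disabled(disabled, links, operation, value=True, remove_orphan_links=True):
    """Update the disabled set; return (links to keep, orphaned links reported back)."""
    if not value:
        # Re-enabling an operation: no links are changed.
        disabled.discard(operation)
        return links, []
    disabled.add(operation)
    # Links that touch the disabled operation as either source or destination.
    orphaned = [link for link in links if operation in (link[0], link[1])]
    if remove_orphan_links:
        remaining = [link for link in links if link not in orphaned]
        return remaining, []
    return links, orphaned

links = [("add_op", "sqrt_op", "sum", "n")]

# Default behaviour: the link to sqrt_op is removed and nothing is reported.
disabled = set()
remaining, reported = toy_set_disabled(disabled, links, "sqrt_op")

# Keeping the links: they stay in place and are returned as orphaned.
kept, orphaned = toy_set_disabled(set(), links, "sqrt_op", remove_orphan_links=False)
print(remaining, reported, kept, orphaned)
```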
Executing a Workflow¶
When you execute a Workflow, the operations are executed based on how they are linked together.
There are a few ways to run a Workflow: execute, execute_synchronous, and execute_all.
Synchronous Execution¶
As we saw in our example earlier, we can use execute_synchronous to run a Workflow as a normal snippet of Python code.
When this method is run, we wait until we get a result back before the interpreter can continue running code.
Asynchronous Execution (Recommended)¶
The execute and execute_all methods are asynchronous, so they run in a separate thread.
This is highly beneficial in a GUI environment like Xi-CAM,
since we don’t want to block Xi-CAM’s UI from responding,
and we could potentially offload execution onto a remote device.
These methods take in several parameters; for now, we will focus on three of these parameters:
callback_slot – Function to execute when the results of the Workflow are ready. The callback_slot gives you access to these results as a positional argument. This is invoked for each result. For example, let’s say you have a crop operation that takes in an image (array) as an input parameter. You could pass in a list of images to crop to Workflow.execute_all(), and the callback_slot will be invoked for each of the images in the passed list. Basically, you will get a cropped image for each image sent into the workflow.
finished_slot – Function to execute when the internal thread in the Workflow has finished its execution (all of the operations are done). This occurs once during a Workflow’s execution.
kwargs – Any additional keyword arguments to pass into the method; these usually correspond with the entry operations’ inputs (as we saw in our example earlier).
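The division of labour between the two slots (one call per result, one call at the very end) can be sketched with the standard `threading` module. This is only an analogy under stated assumptions: `run_in_thread` is a hypothetical helper, and Xi-CAM uses its own Qt-based thread machinery rather than the code shown here.

```python
import threading
from math import sqrt

def run_in_thread(inputs, callback_slot, finished_slot):
    """Process each (x, y) pair on a worker thread (hypothetical helper, not Xi-CAM API)."""
    def worker():
        for x, y in inputs:
            # Called once per result.
            callback_slot({"square_root": sqrt(x + y)})
        # Called once, after every input has been processed.
        finished_slot()

    thread = threading.Thread(target=worker)
    thread.start()
    return thread

received = []
events = []

thread = run_in_thread(
    [(1, 3), (10, 15)],
    callback_slot=received.append,
    finished_slot=lambda: events.append("finished"),
)
# The caller is free to do other work here. join() is used only so that
# this example can inspect the collected results deterministically.
thread.join()
print(received, events)
```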
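The primary difference between Workflow.execute and Workflow.execute_all is that execute_all runs the workflow multiple times, once for each set of values in the kwargs passed in.
This means each of the kwargs must have an iterable value, and the iterables are expected to have the same length.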
Let’s look at some examples.
Example for execute¶
Let’s revisit our addition and square root workflow from earlier but make it asynchronous:
from qtpy.QtWidgets import QApplication # Only need if not running xicam
from xicam.core import execution # Only need if not running xicam
from xicam.core.execution import localexecutor # Only need if not running xicam
from xicam.core.execution import Workflow
from xicam.plugins.operationplugin import operation, output_names
qapp = QApplication([]) # Only need if not running xicam
execution.executor = localexecutor.LocalExecutor() # Only need if not running xicam
# Define our operations
@operation
@output_names("sum")
def my_add(x, y):
    return x + y
@operation
@output_names("square_root")
def my_sqrt(n):
    from math import sqrt
    return sqrt(n)
# Define callback slot (when a result is ready)
def print_result(*results):
    print(results)
# Define finished slot (when the workflow is entirely finished)
def finished():
    print("Workflow finished.")
# Instantiate operations
add_op = my_add()
sqrt_op = my_sqrt()
# Create a Workflow and add our operation instances to it
workflow = Workflow()
workflow.add_operations(add_op, sqrt_op)
# Link the "sum" output of add_op to the "n" input of sqrt_op
workflow.add_link(add_op, sqrt_op, "sum", "n")
# Execute the workflow, sending 1 and 3 as initial inputs to add_op (the first operation)
# This should give us sqrt(1 + 3) -> 2.0.
workflow.execute(callback_slot=print_result,
                 finished_slot=finished,
                 x=1,
                 y=3)
This will print out:
({'square_root': 2.0},)
Workflow finished.
Notice that we’ve added two new functions for our callback slot and our finished slot.
print_result will be called when the workflow has produced a result.
finished will be called once the workflow has finished execution for all of its input data.
In this case, we have only one set of input data, x=1 and y=3.
(Also note that we have an additional import and that we are creating a QApplication; this is not needed when working within Xi-CAM).
Example for execute_all¶
Now, let’s say we want to do this addition and square root workflow for multiple sets of x and y inputs.
We can use execute_all
to do this:
from qtpy.QtWidgets import QApplication # Only need if not running xicam
from xicam.core import execution # Only need if not running xicam
from xicam.core.execution import localexecutor # Only need if not running xicam
from xicam.core.execution import Workflow
from xicam.plugins.operationplugin import operation, output_names
qapp = QApplication([]) # Only need if not running xicam
execution.executor = localexecutor.LocalExecutor() # Only need if not running xicam
# Define our operations
@operation
@output_names("sum")
def my_add(x, y):
    return x + y
@operation
@output_names("square_root")
def my_sqrt(n):
    from math import sqrt
    return sqrt(n)
# Define callback slot (when a result is ready)
def print_result(*results):
    print(results)
# Define finished slot (when the workflow is entirely finished)
def finished():
    print("Workflow finished.")
# Instantiate operations
add_op = my_add()
sqrt_op = my_sqrt()
# Create a Workflow and add our operation instances to it
workflow = Workflow()
workflow.add_operations(add_op, sqrt_op)
# Link the "sum" output of add_op to the "n" input of sqrt_op
workflow.add_link(add_op, sqrt_op, "sum", "n")
# Execute the workflow, sending the inputs x=1,y=3; x=10,y=15; x=50,y=50.
# This should give us 2.0, 5.0, and 10.0.
workflow.execute_all(callback_slot=print_result,
                     finished_slot=finished,
                     x=[1, 10, 50],
                     y=[3, 15, 50])
This will print out:
({'square_root': 2.0},)
({'square_root': 5.0},)
({'square_root': 10.0},)
Workflow finished.
Notice that we've just changed `execute` to `execute_all`, and we've modified the `x` and `y` values to be lists.
Now, we will have three executions: `x=1 y=3`, `x=10 y=15`, and `x=50 y=50`.
Each time one of these executions finishes, our callback slot `print_result` is called.
When the workflow is finished executing everything, then our finished slot `finished` is called.
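The API Reference states that the keyword arguments given to execute_all are iterated over and zipped. A minimal plain-Python sketch of that pairing is shown below. The helper `zip_kwargs` is hypothetical, and the square root is computed directly rather than through a Workflow.

```python
from math import sqrt

def zip_kwargs(**kwargs):
    """Turn keyword arguments holding equal-length iterables into one dict per execution."""
    names = list(kwargs)
    return [dict(zip(names, values)) for values in zip(*kwargs.values())]

# The same inputs as in the execute_all example above.
runs = zip_kwargs(x=[1, 10, 50], y=[3, 15, 50])
results = [sqrt(run["x"] + run["y"]) for run in runs]
print(runs)
print(results)
```

Note that Python's built-in `zip` silently stops at the shortest iterable, which is one reason to keep all of the input lists the same length.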
API Reference¶
- class xicam.core.execution.Workflow(name='', operations=None)¶
Bases:
Graph
- add_link(source, dest, source_param, dest_param)¶
Add a link between two operations in the workflow.
Links are defined from an operation’s parameter to another operation’s parameter. This creates a connection between two operations during execution of a workflow.
- Parameters
source (OperationPlugin) – The operation to link from.
dest (OperationPlugin) – The operation to link to.
source_param (str) – Name of the parameter in the source operation to link (source of the data; output).
dest_param (str) – Name of the parameter in the destination operation to link (where the data goes; input).
- add_operation(operation: OperationPlugin)¶
Add a single operation into the workflow.
- add_operations(*operations: OperationPlugin)¶
Add operations into the workflow.
This will add the list of operations to the end of the workflow.
- as_dask_graph()¶
Processes the graph from the end tasks back through all of the tasks they depend on.
Returns a tuple that represents the graph as a dask-compatible graph for processing. The second element of the tuple identifies the end node ids (i.e. nodes that do not have connected outputs).
- Returns
A tuple with two-elements, the first being the dask graph, the second being the end task ids.
- Return type
tuple
- attach(observer: Callable)¶
Add an observer to the Workflow.
An observer is a callable that is called when the Workflow.notify method is called. In other words, the observer will be called whenever the Workflow state changes; for example, when links are modified or operations are removed.
- Parameters
observer (Callable) – A callable to add to the Workflow.
- auto_connect_all()¶
Attempts to automatically connect operations together by matching output names and input names.
Makes a best-effort to link operations based on the names of their outputs and inputs. If operation A has an output named “image”, and operation B has an input named “image”, then A “image” will link to B “image”. Outputs and inputs that have matching types in addition to matching names will be favored more for the auto-connection.
If there are no outputs with matching inputs (by name), no links will be added.
- clear_links()¶
Remove all links from the workflow, but preserve the operations.
- clear_operation_links(operation, clear_outbound=True, clear_inbound=True)¶
Remove all links for an operation.
- clear_operations()¶
Remove all operations and links from the workflow.
- detach(observer: Callable)¶
Remove an observer from the Workflow.
An observer is a callable that is called when the Workflow.notify method is called. In other words, the observer will be called whenever the Workflow state changes; for example, when links are modified or operations are removed.
- Parameters
observer (Callable) – The callable to remove from the Workflow.
- disabled(operation)¶
Indicate if the operation is disabled in the workflow.
- Parameters
operation (OperationPlugin) – Operation to check if it is disabled or not.
- Returns
Returns True if the operation is disabled in the Workflow; otherwise False.
- Return type
bool
- disabled_operations()¶
Returns the disabled operations (if any) in the workflow.
- enabled(operation)¶
Indicate if the operation is enabled in the workflow.
- Parameters
operation (OperationPlugin) – Operation to check if it is enabled or not.
- Returns
Returns True if the operation is enabled in the Workflow; otherwise False.
- Return type
bool
- execute(executor=None, connection=None, callback_slot=None, finished_slot=None, except_slot=None, default_exhandle=True, lock=None, fill_kwargs=True, threadkey=None, **kwargs)¶
Execute this workflow on the specified host. Connection will be a Connection object (WIP) that keeps a connection to a compute resource, including connection.hostname, connection.username…
- Returns
A concurrent.futures-like qthread to monitor status. The future’s callback_slot receives the result.
- Return type
QThreadFuture
- execute_all(connection=None, executor=None, callback_slot=None, finished_slot=None, yield_slot=None, except_slot=None, default_exhandle=True, lock=None, fill_kwargs=True, threadkey=None, **kwargs)¶
Execute this workflow on the specified host. Connection will be a Connection object (WIP) that keeps a connection to a compute resource, including connection.hostname, connection.username…
Each kwargs is expected to be an iterable of the same length; these values will be iterated over, zipped, and executed through the workflow.
- Returns
A concurrent.futures-like qthread to monitor status. The future’s callback_slot receives the result.
- Return type
QThreadFuture
- fill_kwargs(**kwargs)¶
Fills in all empty inputs with names matching keys in kwargs.
- get_inbound_links(operation)¶
Returns the links connected to the operation given (linked inputs of the operation).
The returned dict represents all operations that are connected to operation. Links are represented as a list of 2-element tuples, where the first element of the tuple is another operation’s output parameter, and the second element of the tuple is operation’s input parameter.
Using keys() will give all of the operations that connect to operation. Using values() will give all of the links from each operation to operation.
- Parameters
operation (OperationPlugin) – Operation to get incoming links for (some operation -> operation).
- Returns
Returns a dictionary defining all of the links from any connected operations to operation.
- Return type
defaultdict
- get_outbound_links(operation)¶
Returns the links connected from the operation given (linked outputs of the operation).
The returned dict represents all the operations that operation connects to. Links are represented as a list of 2-element tuples, where the first element of the tuple is operation’s output parameter, and the second element of the tuple is another operation’s input parameter.
Using .keys() on the returned dict will give all of the operations that operation connects to. Using .values() on the returned dict will give all of the links from operation to each operation.
- Parameters
operation (OperationPlugin) – Operation to get outgoing links for (operation -> some operation).
- Returns
Returns a dictionary defining all of the links from operation to any connected operations.
- Return type
defaultdict
- insert_operation(index: int, operation: OperationPlugin)¶
Insert an operation at a specific index in the workflow.
- Parameters
index (int) – Index where to insert the operation. 0 will add at the beginning; -1 will add to the end.
operation (OperationPlugin) – Operation to insert.
- links()¶
Returns all the links defined in the workflow.
Returns a list of tuples, each tuple representing a link as follows: source operation, destination operation, source parameter, destination parameter.
Note that the links are shown as outbound links.
- Returns
Returns a list of the links (defined as outbound links) in the workflow.
- Return type
list
- notify()¶
Notify the observers; the observers will be called.
- operation_links(operation: OperationPlugin)¶
Returns the outbound links for an operation.
Returns a list of tuples, each tuple representing a link as follows: operation, destination operation, operation source parameter, destination parameter.
- Returns
Returns a list of the links (defined as outbound links) for operation.
- Return type
list
- property operations¶
Returns the operations of this workflow.
- remove_link(source, dest, source_param, dest_param)¶
Remove a link between two operations.
- Parameters
source (OperationPlugin) – The source operation to remove a link from.
dest (OperationPlugin) – The destination operation to remove a link from.
source_param (str) – Name of the source parameter that is defining the link to remove.
dest_param (str) – Name of the destination parameter that is defining the link to remove.
- remove_operation(operation, remove_orphan_links=True)¶
Remove an operation from the workflow.
- Parameters
operation (OperationPlugin) – Operation to remove from the workflow.
remove_orphan_links (bool) – If True, removes all links that link to the operation to be removed. If False, does not remove any links for the operation and returns the removed operation’s links dict (default is True).
- Returns
By default (remove_orphan_links is True), returns None. Otherwise, returns the links for the removed operation.
- Return type
defaultdict
- set_disabled(operation: OperationPlugin, value: bool = True, remove_orphan_links: bool = True, auto_connect_all: bool = True)¶
Set an operation’s disabled state in the workflow.
By default when disabling an operation, links connected to the operation will be removed (remove_orphan_links would be True). If value is False (re-enabling an operation), then no links are changed.
- Parameters
operation (OperationPlugin) – The operation whose disabled state is being modified.
value (bool) – Indicates the disabled state (default is True, which disables the operation).
remove_orphan_links (bool) – If True and value is True, removes the links connected to the operation. Otherwise, no links are changed (default is True).
auto_connect_all (bool) – If True, then a best-effort attempt will be made to try to reconnect the operations in the workflow (default is True). See the Graph.auto_connect_all method for more information.
- Returns
Returns a list of any orphaned links for an operation that is set to disabled. Default behavior will return an empty list (when remove_orphan_links is True). If enabling an operation (value is False), then an empty list is returned, as no links are changed.
- Return type
list
- stage(connection)¶
Stages required data resources to the compute resource. Connection will be a Connection object (WIP) that keeps a connection to a compute resource, including connection.hostname, connection.username…
- Returns
A concurrent.futures-like qthread to monitor status. Returns True if successful
- Return type
QThreadFuture
- toggle_disabled(operation: OperationPlugin, remove_orphan_links=True, auto_connect_all=True)¶
Toggle the disable state of an operation.
By default, when an operation is toggled to a disabled state, any links connected to the operation will be removed.
- Parameters
operation (OperationPlugin) – The operation to toggle disable state for.
remove_orphan_links (bool) – If True, when the operation’s toggle state is toggled to disabled, any links connected to the operation will be removed (default is True).
- Returns
Returns a list of any orphaned links for an operation. Default behavior will return an empty list. A non-empty list can be returned when remove_orphan_links is False and the connected operation is toggled to disabled.
- Return type
list
- validate()¶
Validate all of:
- All required inputs are satisfied.
- Connection is active.
- ?
- Returns
True if workflow is valid.
- Return type
bool
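The attach, detach, and notify methods documented above follow the familiar observer pattern. The class below is a small, self-contained sketch of that pattern. `ToyObservable` is a hypothetical stand-in for illustration and should not be read as the Workflow's real implementation.

```python
class ToyObservable:
    """Hypothetical stand-in showing the attach/detach/notify pattern."""

    def __init__(self):
        self._observers = []

    def attach(self, observer):
        # Register a callable to be invoked on every notify().
        self._observers.append(observer)

    def detach(self, observer):
        # Unregister a previously attached callable.
        self._observers.remove(observer)

    def notify(self):
        # Iterate over a copy so that observers may detach themselves safely.
        for observer in list(self._observers):
            observer()

calls = []

def on_change():
    calls.append("changed")

subject = ToyObservable()
subject.attach(on_change)
subject.notify()           # on_change is called
subject.detach(on_change)
subject.notify()           # nothing is attached, so nothing is called
print(calls)
```

In a GUI setting, an observer of this kind would typically refresh a view whenever the underlying workflow changes.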