Skip to content

Workflow Syntax

Simple Workflow

Let's do the following:

  1. Add two numbers (e.g. 1 + 2)
  2. Multiply the output of Step 1 by a third number (e.g. 3 * 3)

In practice, we would want each of the two tasks to be their own compute job.

graph LR
  A[Input] --> B(add) --> C(mult) --> D[Output];

Important

If you haven't done so yet, make sure you update the quacc WORKFLOW_ENGINE configuration variable and start the Covalent server:

quacc set WORKFLOW_ENGINE covalent
covalent start
import covalent as ct
from quacc import flow, job


@job  #  (1)!
def add(a, b):
    return a + b


@job
def mult(a, b):
    return a * b


@flow  #  (2)!
def workflow(a, b, c):
    output1 = add(a, b)
    output2 = mult(output1, c)
    return output2


dispatch_id = ct.dispatch(workflow)(1, 2, 3)  #  (3)!
result = ct.get_result(dispatch_id, wait=True)  #  (4)!
print(result)  # 9
  1. The @job decorator will be transformed into a Covalent @ct.electron.

  2. The @flow decorator will be transformed into a Covalent @ct.lattice.

  3. This command will dispatch the workflow to the Covalent server, and a unique dispatch ID will be returned in place of the result.

  4. This command will fetch the result from the Covalent server. The wait=True argument will block until the workflow is complete.

Important

If you haven't done so yet, make sure you update the quacc WORKFLOW_ENGINE configuration variable and load the default Dask client:

quacc set WORKFLOW_ENGINE dask
python
from dask.distributed import Client

client = Client()  #  (1)!
  1. It is necessary to instantiate a Dask client before running Dask workflows. This command loads the default (local) client and only needs to be done once.
from quacc import job


@job  #  (1)!
def add(a, b):
    return a + b


@job
def mult(a, b):
    return a * b


def workflow(a, b, c):  #  (2)!
    output1 = add(a, b)
    output2 = mult(output1, c)
    return output2


delayed = workflow(1, 2, 3)  #  (3)!
result = client.compute(delayed).result()  #  (4)!
print(result)  # 9
  1. The @job decorator will be transformed into a Dask @delayed.

  2. The @flow decorator doesn't actually do anything when using Dask, so we chose to not include it here for brevity.

  3. This returns a Delayed object. A reference is returned.

  4. There are multiple ways to resolve a Delayed object. Here, client.compute(delayed) will return a Future object, which can be resolved with .result(). The .result() call will block until the workflow is complete and return the result. As an alternative, you could also use delayed.compute() to dispatch and resolve the Delayed object in one command. Similarly, you could use dask.compute(delayed)[0], where the [0] indexing is needed because dask.compute() always returns a tuple.

Important

If you haven't done so yet, make sure you update the quacc WORKFLOW_ENGINE configuration variable and load the default Parsl configuration:

quacc set WORKFLOW_ENGINE parsl
python
import parsl

parsl.load()  #  (1)!
  1. It is necessary to instantiate a Parsl configuration before running Parsl workflows. This command loads the default (local) configuration and only needs to be done once.
from quacc import job


@job  #  (1)!
def add(a, b):
    return a + b


@job
def mult(a, b):
    return a * b


def workflow(a, b, c):  #  (2)!
    output1 = add(a, b)
    output2 = mult(output1, c)
    return output2


future = workflow(1, 2, 3)  #  (3)!
result = future.result()  #  (4)!
print(result)  # 9
  1. The @job decorator will be transformed into a Parsl @python_app.

  2. The @flow decorator doesn't actually do anything when using Parsl, so we chose to not include it here for brevity.

  3. This will create a PythonApp object that represents the workflow. At this point, the workflow has been dispatched, but only a reference is returned.

  4. Calling .result() will block until the workflow is complete and return the result.

Important

If you haven't done so yet, make sure you update the quacc WORKFLOW_ENGINE configuration variable.

quacc set WORKFLOW_ENGINE prefect
from quacc import flow, job


@job  #  (1)!
def add(a, b):
    return a + b


@job
def mult(a, b):
    return a * b


@flow
def workflow(a, b, c):  #  (2)!
    output1 = add(a, b)
    output2 = mult(output1, c)
    return output2


result = workflow(1, 2, 3)  #  (3)!
print(result)  # 9
  1. The @job decorator will be transformed into a Prefect @task. It will also be launched via .submit() if SETTINGS.PREFECT_AUTO_SUBMIT is True.

  2. The @flow decorator will be transformed into a Prefect @flow.

  3. This will create and run the Flow. At this point, the workflow has been dispatched and the final results are returned.

Important

If you haven't done so yet, make sure you update the quacc WORKFLOW_ENGINE configuration variable:

quacc set WORKFLOW_ENGINE redun
python
from redun import Scheduler

scheduler = Scheduler()  #  (1)!
  1. It is necessary to instantiate the scheduler before submitting calculations.
from quacc import flow, job


@job  #  (1)!
def add(a, b):
    return a + b


@job
def mult(a, b):
    return a * b


@flow  #  (2)!
def workflow(a, b, c):
    output1 = add(a, b)
    output2 = mult(output1, c)
    return output2


result = scheduler.run(workflow(1, 2, 3))  # (3)!
print(result)  # 9
  1. The @job decorator will be transformed into a Redun @task.

  2. The @flow decorator will also be transformed into a Redun @task. Everything in Redun is a @task, so it doesn't matter what quacc decorator you apply. We chose @flow simply for clarity.

  3. This command will submit the workflow to the Redun scheduler.

Important

If you haven't done so yet, make sure you update the quacc WORKFLOW_ENGINE configuration variable:

quacc set WORKFLOW_ENGINE jobflow
import jobflow as jf
from quacc import job


@job  #  (1)!
def add(a, b):
    return a + b


@job
def mult(a, b):
    return a * b


job1 = add(1, 2)
job2 = mult(job1.output, 3)
flow = jf.Flow([job1, job2])  #  (2)!

responses = jf.run_locally(flow)  #  (3)!
result = responses[job2.uuid][1].output  #  (4)!
print(result)  # 9
  1. The @job decorator will be transformed into @jf.job.

  2. A @jf.Flow object is created to represent the workflow.

  3. The workflow is run locally and the result is returned in a dictionary.

  4. The result is extracted from the dictionary by using the UUID of the second job in the workflow.

Stripping the Decorator from a Job

If you ever want to strip the decorator from a pre-decorated @job (e.g. to test out a calculation locally without changing your quacc settings), you can do so with quacc.wflow_tools.customizers.strip_decorator as follows:

from quacc import job, strip_decorator


@job
def add(a, b):
    return a + b


original_add = strip_decorator(add)
original_add(1, 2)  # 3

Learn More

If you want to learn more about Covalent, you can read the Covalent Documentation. Please refer to the Covalent Slack channel or Discussion Board for any Covalent-specific questions.

If you want to learn more about Dask, you can read the Dask Delayed documentation to read more about the decorators and the Dask Distributed documentation to read more about the distributed Dask cluster. Please refer to the Dask Discourse page for Dask-specific questions.

If you want to learn more about Parsl, you can read the Parsl Documentation. Please refer to the Parsl Slack Channel for any Parsl-specific questions.

If you want to learn more about Prefect, you can read the Prefect Documentation. Please refer to the Prefect community resources for any Prefect-specific questions.

If you want to learn more about Redun, you can read the Redun documentation.

If you want to learn more about Jobflow, you can read the Jobflow Documentation. Please refer to the Jobflow Discussions Board for Jobflow-specific questions.