Pre-Defined Recipes
Here, we will show how to use quacc with one of a variety of workflow engines to construct, dispatch, and monitor your calculations. In quacc, there are two types of recipes:
- Individual compute jobs with the suffix `_job` that have been pre-defined with a `@job` decorator.
- Multi-step workflows with the suffix `_flow` that have been pre-defined with a `@flow` decorator.
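For orientation, here is a minimal sketch (not an actual recipe) of how the two decorators relate; the `add` and `add_flow` names are hypothetical and used only for illustration:

```python
from quacc import flow, job


@job  # an individual compute task (a "*_job" in quacc's naming)
def add(a, b):
    return a + b


@flow  # a multi-step workflow composed of jobs (a "*_flow")
def add_flow(a, b, c):
    return add(add(a, b), c)
```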
Running a Pre-Defined Job
We will now try running a job where we relax a bulk Cu structure using EMT, which is pre-defined in quacc as `quacc.recipes.emt.core.relax_job`.
graph LR
A[Input] --> B(Relax) --> C[Output];
Important
If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE`
configuration variable and start the Covalent server (via `covalent start`).
Covalent also has its own configuration variables you may wish to consider modifying.
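One way to set the quacc variable, as a hedged sketch, is via an environment variable before quacc is imported (this assumes the `QUACC_` prefix convention for quacc settings; a `~/.quacc.yaml` entry works as well):

```python
import os

# Set before importing quacc so the setting is picked up (assumed convention)
os.environ["QUACC_WORKFLOW_ENGINE"] = "covalent"
```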
import covalent as ct
from ase.build import bulk
from quacc import flow
from quacc.recipes.emt.core import relax_job
# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")
# Define the workflow
@flow # (1)!
def workflow(atoms):
return relax_job(atoms) # (2)!
# Dispatch the workflow to the Covalent server
# with the bulk Cu Atoms object as the input
dispatch_id = ct.dispatch(workflow)(atoms) # (3)!
# Fetch the result from the server
result = ct.get_result(dispatch_id, wait=True) # (4)!
print(result)
1. This can be written more compactly by applying `flow` to `relax_job` directly, as shown in the sketch after this list.
2. The `relax_job` function was pre-defined in quacc with a `@job` decorator, which is why we did not need to include it here.
3. This will dispatch the workflow to the Covalent server.
4. The `ct.get_result` function is used to fetch the workflow status and results from the server. You don't need to set `wait=True` in practice. Once you dispatch the workflow, it will begin running (if the resources are available).
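Because `flow` is an ordinary Python decorator, it can be applied to `relax_job` as a function call instead of defining a wrapper function; a minimal sketch of the compact form:

```python
# Compact form (sketch): wrap the pre-defined job directly in a flow
workflow = flow(relax_job)
dispatch_id = ct.dispatch(workflow)(atoms)
result = ct.get_result(dispatch_id, wait=True)
```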
Important
If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE`
configuration variable and load the default Dask cluster:
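The example below assumes an active Dask client named `client`. For local testing, a minimal sketch using `dask.distributed` (calling `Client()` with no arguments spins up a default local cluster):

```python
from dask.distributed import Client

client = Client()  # creates a local cluster and connects a client to it
```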
from ase.build import bulk
from quacc.recipes.emt.core import relax_job
# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")
# Define the Delayed object
delayed = relax_job(atoms) # (1)!
# Print result
result = client.compute(delayed).result() # (2)!
print(result)
1. The `relax_job` function was pre-defined in quacc with a `@job` decorator, which is why we did not need to include it here. We also did not need to use a `@flow` decorator because Dask does not have an analogous decorator. At this point, we have a `Delayed` object.
2. Calling `client.compute(delayed)` dispatches the compute job to the active Dask cluster and returns a `Future`. The use of `.result()` blocks any further calculations from running and resolves the `Future`. You could also achieve the same result with `delayed.compute()`, which dispatches and resolves the `Future` in one step. This is identical to `result = dask.compute(delayed)[0]`, where the `[0]` indexing is needed because `dask.compute` always returns a tuple. See the sketch after this list.
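A sketch consolidating the three equivalent dispatch patterns (reusing the `client` and `delayed` objects from the example above; `dask.compute` requires importing `dask`):

```python
import dask

result = client.compute(delayed).result()  # dispatch, then block on the Future
result = delayed.compute()                 # dispatch and resolve in one step
result = dask.compute(delayed)[0]          # dask.compute always returns a tuple
```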
Important
If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE`
configuration variable and load the default Parsl configuration:
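For quick local testing, a minimal sketch (production runs would typically pass a tailored `parsl.config.Config` instead of relying on the default):

```python
import parsl

parsl.load()  # loads Parsl's default configuration when no Config is supplied
```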
from ase.build import bulk
from quacc.recipes.emt.core import relax_job
# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")
# Call the PythonApp
future = relax_job(atoms) # (1)!
# Print result
print(future.result()) # (2)!
1. The `relax_job` function was pre-defined in quacc with a `@job` decorator, which is why we did not need to include it here. We also did not need to use a `@flow` decorator because Parsl does not have an analogous decorator.
2. The use of `.result()` serves to block any further calculations from running until the result is resolved. Calling `.result()` also returns the function output, as opposed to the `AppFuture` object.
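Since the `AppFuture` follows the standard `concurrent.futures.Future` interface, you can also poll it without blocking; a minimal sketch:

```python
# Check whether the job has finished before retrieving its output
if future.done():
    print(future.result())
else:
    print("Relaxation still running")
```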
Important
If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE`
configuration variable.
from ase.build import bulk
from quacc import flow
from quacc.recipes.emt.core import relax_job
# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")
# Define the workflow
@flow
def workflow(atoms):
return relax_job(atoms) # (1)!
# Dispatch the workflow
result = workflow(atoms) # (2)!
print(result)
1. The `relax_job` function was pre-defined in quacc with a `@job` decorator, which is why we did not need to include it here.
2. The workflow has been dispatched to the Prefect server at this point and the result returned.
Important
If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE`
configuration variable.
from ase.build import bulk
from redun import Scheduler
from quacc.recipes.emt.core import relax_job
# Instantiate the scheduler
scheduler = Scheduler()
# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")
# Dispatch the workflow
result = scheduler.run(relax_job(atoms)) # (1)!
print(result)
1. The `relax_job` function was pre-defined in quacc with a `@job` decorator, which is why we did not need to include it here.
Important
If you haven't done so yet, make sure you update the quacc `WORKFLOW_ENGINE`
configuration variable.
import jobflow as jf
from ase.build import bulk
from quacc.recipes.emt.core import relax_job
# Make an Atoms object of a bulk Cu structure
atoms = bulk("Cu")
# Define the Job
job = relax_job(atoms) # (1)!
# Run the job locally
responses = jf.run_locally(job) # (2)!
# Get the result
result = responses[job.uuid][1].output
print(result)
1. The `relax_job` function was pre-defined in quacc with a `@job` decorator, which is why we did not need to include it here.
2. We chose to run the job locally, but other workflow managers supported by Jobflow can be imported and used, as sketched below.
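As one hedged example of the latter, a Jobflow `Job` can be converted to a FireWorks workflow (this assumes a configured FireWorks `LaunchPad`):

```python
from fireworks import LaunchPad
from jobflow.managers.fireworks import flow_to_workflow

# Convert the Jobflow Job into a FireWorks workflow and add it to the LaunchPad
wf = flow_to_workflow(job)
lpad = LaunchPad.auto_load()
lpad.add_wf(wf)
```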
Running a Pre-Defined Workflow
We will now try running a pre-defined workflow where we carve all possible slabs from a given structure, run a relaxation calculation on each slab, and then run a static calculation on each relaxed slab. This is implemented in `quacc.recipes.emt.slabs.bulk_to_slabs_flow`.
graph LR
A[Input] --> B(Make Slabs)
B --> C(Slab Relax) --> G(Slab Static) --> K[Output]
B --> D(Slab Relax) --> H(Slab Static) --> K[Output]
B --> E(Slab Relax) --> I(Slab Static) --> K[Output]
B --> F(Slab Relax) --> J(Slab Static) --> K[Output];
import covalent as ct
from ase.build import bulk
from quacc.recipes.emt.slabs import bulk_to_slabs_flow
# Define the Atoms object
atoms = bulk("Cu")
# Dispatch the workflow
dispatch_id = ct.dispatch(bulk_to_slabs_flow)(atoms) # (1)!
# Print the results
result = ct.get_result(dispatch_id, wait=True)
print(result)
1. We didn't need to wrap `bulk_to_slabs_flow` with a decorator because it is already pre-decorated with a `@flow` decorator.
from ase.build import bulk
from redun import Scheduler
from quacc.recipes.emt.slabs import bulk_to_slabs_flow
# Instantiate the scheduler
scheduler = Scheduler()
# Define the Atoms object
atoms = bulk("Cu")
# Dispatch the workflow
result = scheduler.run(bulk_to_slabs_flow(atoms)) # (1)!
# Print the results
print(result)
1. We didn't need to wrap `bulk_to_slabs_flow` with a decorator because it is already pre-decorated with a `@flow` decorator.
Warning
Due to the difference in how Jobflow handles workflows (particularly dynamic ones) compared to the other supported workflow engines, any quacc recipes that have been pre-defined with a `@flow` decorator (i.e., those with `_flow` in the name) cannot be run directly with Jobflow. Rather, a Jobflow-specific `Flow` needs to be constructed by the user, as sketched below.
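As a hedged illustration of what that might look like (assuming the `static_job` recipe in `quacc.recipes.emt.core` and the standard Jobflow machinery), one could chain the individual jobs by hand:

```python
import jobflow as jf
from ase.build import bulk
from quacc.recipes.emt.core import relax_job, static_job

atoms = bulk("Cu")

# Chain two pre-defined jobs by passing the output reference of one
# as the input to the next
job1 = relax_job(atoms)
job2 = static_job(job1.output["atoms"])

# Collect the jobs into a Jobflow-specific Flow and run it locally
flow = jf.Flow([job1, job2])
responses = jf.run_locally(flow)
print(responses[job2.uuid][1].output)
```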