Workflows#
Workflows compose jobs with dependency relationships, enabling sequential execution, fan-out parallelization, and fan-in convergence patterns. By declaratively defining jobs and their connections, you can build complex processing pipelines that are fault-tolerant and horizontally scalable across all nodes.
Basic Usage#
Workflows are composed of jobs linked together through declarative dependencies. A workflow consists of these key elements:
Jobs — Individual units of work identified by unique names within the workflow
Dependencies — Relationships specifying execution order between jobs
Here’s a simple workflow that demonstrates sequential and parallel execution:
from oban import worker
from oban_pro import Workflow
@worker()
class StepWorker:
    async def process(self, job):
        print(f"Executing: {job.meta['name']}")

workflow = (
    Workflow()
    .add("a", StepWorker.new({}))
    .add("b", StepWorker.new({}), deps="a")
    .add("c", StepWorker.new({}), deps="b")
    .add("d", StepWorker.new({}), deps="b")
    .add("e", StepWorker.new({}), deps=["c", "d"])
)
await oban.enqueue_many(workflow)
The workflow is initialized with Workflow(), jobs are added with add(), dependencies are
specified with the deps parameter, and finally the workflow is inserted with oban.enqueue_many().
When executed, this workflow prints each step’s name in dependency order. Steps c and d both depend on b,
so they may execute in parallel and print in either order. Visually, the workflow looks like this:
    a
    │
    b
   ╱ ╲
  c   d
   ╲ ╱
    e
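The ordering implied by this diagram can be reproduced without the library. The sketch below is plain Python built on the standard library's graphlib module, not Oban's scheduler. The `deps` mapping and the `execution_waves` helper are illustrative names that mirror the `deps` arguments used above. It groups jobs into waves whose members have no unmet dependencies and could therefore run in parallel.

```python
from graphlib import TopologicalSorter

# Each job name maps to the names it depends on, mirroring the deps arguments.
deps = {"a": [], "b": ["a"], "c": ["b"], "d": ["b"], "e": ["c", "d"]}

def execution_waves(deps):
    """Group jobs into waves; every job in a wave has all dependencies met."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves

waves = execution_waves(deps)
print(waves)  # [['a'], ['b'], ['c', 'd'], ['e']]
```

The third wave contains both c and d, which is the point where the real workflow may execute jobs concurrently.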
Common Patterns#
Workflows support several execution patterns that form the building blocks of workflow design. These patterns can be combined to create complex workflows tailored to your specific needs.
Sequential Execution#
Jobs run one after another, with each job waiting for the previous one to complete:
(
    Workflow()
    .add("first", Worker.new({}))
    .add("second", Worker.new({}), deps="first")
    .add("third", Worker.new({}), deps="second")
)
Fan-Out (1-to-many)#
One job triggers multiple parallel jobs:
(
    Workflow()
    .add("parent", Worker.new({}))
    .add("child_1", Worker.new({}), deps="parent")
    .add("child_2", Worker.new({}), deps="parent")
    .add("child_3", Worker.new({}), deps="parent")
)
Fan-In (many-to-1)#
Multiple parallel jobs converge to a single job:
(
    Workflow()
    .add("step_1", Worker.new({}))
    .add("step_2", Worker.new({}))
    .add("step_3", Worker.new({}))
    .add("final", Worker.new({}), deps=["step_1", "step_2", "step_3"])
)
Diamond Pattern#
Combines fan-out and fan-in into a diamond shape:
(
    Workflow()
    .add("start", Worker.new({}))
    .add("left", Worker.new({}), deps="start")
    .add("right", Worker.new({}), deps="start")
    .add("end", Worker.new({}), deps=["left", "right"])
)
Sub-Workflows#
Workflows can be nested hierarchically using sub-workflows. This allows you to compose complex workflows from simpler ones, making it easier to organize and reuse workflow patterns.
The add_workflow() method lets you add an entire workflow as a dependency of another workflow.
Like add(), it accepts a name and optional dependencies, but instead of a job, it takes another
workflow.
from oban_pro import Workflow
# Create reusable sub-workflows
extract_flow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "database"}))
    .add("transform", TransformWorker.new({"type": "normalize"}), deps="extract")
)

notify_flow = (
    Workflow()
    .add("prepare", PrepareWorker.new({"template": "report"}))
    .add("send", SendWorker.new({"method": "email"}), deps="prepare")
)

# Compose into a main workflow
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({"mode": "initialize"}))
    .add_workflow("extract", extract_flow, deps="setup")
    .add_workflow("notify", notify_flow, deps="extract")
    .add("finalize", CleanupWorker.new({"mode": "cleanup"}), deps="notify")
)
await oban.enqueue_many(workflow)
In this example, the main workflow has a single job named setup, followed by two sub-workflows,
and ends with a finalize job. The dependencies ensure proper execution order:
setup → extract → notify → finalize.
When you depend on a sub-workflow (like deps="extract"), the dependent job waits for all jobs
in that sub-workflow to complete before starting.
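One way to picture this rule is to flatten the nested structure into a single dependency map, where a dependency on a sub-workflow expands into a dependency on every job inside it. The sketch below is plain Python and only a mental model: the `flatten` helper, the tuple-based `spec` layout, and the `parent/child` naming are all assumptions made for illustration, not how the library stores workflows.

```python
def flatten(spec, prefix="", inherited=()):
    """Return {qualified_job_name: set_of_dependencies} for a nested spec.

    spec maps a name to (body, deps). body is None for a plain job, or
    another spec dict for a sub-workflow. Entries must be listed after
    the entries they depend on.
    """
    flat = {}
    members = {}  # name -> the qualified job names that name stands for
    for name, (body, deps) in spec.items():
        upstream = set(inherited)
        for dep in deps:
            upstream |= members[dep]
        qualified = f"{prefix}{name}"
        if body is None:
            flat[qualified] = upstream
            members[name] = {qualified}
        else:
            inner = flatten(body, prefix=f"{qualified}/", inherited=upstream)
            flat.update(inner)
            members[name] = set(inner)
    return flat

spec = {
    "setup": (None, []),
    "extract": ({"extract": (None, []), "transform": (None, ["extract"])}, ["setup"]),
    "notify": ({"prepare": (None, []), "send": (None, ["prepare"])}, ["extract"]),
    "finalize": (None, ["notify"]),
}

flat = flatten(spec)
print(sorted(flat["finalize"]))  # ['notify/prepare', 'notify/send']
```

In this model, finalize waits on both jobs of the notify sub-workflow, and every job in notify waits on both jobs of extract.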
Sub-workflows are useful for:
Organizing large job dependencies into logical units
Enabling reuse of workflow patterns across your application
Simplifying the maintenance of complex job graphs
Collection Fan-Out#
The add_many() method is a convenience for creating sub-workflows from a collection of jobs. Jobs
in a list are auto-named by index (0, 1, 2, …). Jobs in a dict use their keys as names.
# Fan-out with numbered jobs
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add_many("process", [ProcessWorker.new({"item": item}) for item in items], deps="setup")
    .add("finish", FinishWorker.new({}), deps="process")
)

# Fan-out with named jobs
workflow = (
    Workflow()
    .add_many("fetch", {
        "users": FetchWorker.new({"table": "users"}),
        "orders": FetchWorker.new({"table": "orders"}),
    })
    .add("merge", MergeWorker.new({}), deps="fetch")
)
This pattern is especially useful for processing variable-sized collections where the number of jobs isn’t known until runtime.
Cascading Functions#
Cascades allow you to build workflows using plain functions that automatically receive context and previous step results. Each function receives a dict containing the workflow context and the recorded results of its dependencies. Return values are automatically recorded and made available to subsequent steps.
Basic Cascades#
Here’s an ETL (Extract, Transform, Load) pipeline using cascading functions:
from datetime import datetime

from oban_pro import Workflow

async def extract(context):
    source = context["source"]
    return {"records": await fetch_data(source), "extracted_at": datetime.now()}

async def transform(context):
    records = context["extract"]["records"]
    return {"transformed": [process(rec) for rec in records]}

async def load(context):
    records = context["transform"]["transformed"]
    await save_to_database(records)
    return {"loaded": len(records)}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
    .add_cascade("load", load, deps="transform")
)
await oban.enqueue_many(workflow)
When this workflow runs, each function is automatically called with a context dict containing:
Values from add_context() (e.g., context["source"])
Recorded results from dependencies, keyed by step name (e.g., context["extract"])
There’s no need to manually fetch recorded results as cascades handle this automatically.
Workflow Context#
As you may have noticed above, you can use add_context() to store shared data that all cascade
functions can access:
workflow = (
    Workflow()
    .add_context({"user_id": 123, "batch_size": 100, "environment": "production"})
    .add_cascade("step_1", step_1_func)
    .add_cascade("step_2", step_2_func, deps="step_1")
)
The context is available to all cascade functions and is merged with recorded results from dependencies.
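To make the merge concrete, here is a small stand-alone model of it. It does not use the library: `run_cascade` is a hypothetical helper that calls each step in order and builds its context dict from the shared values plus the recorded results of its declared dependencies. The merge order on key collisions (dependency results winning over shared values) is an assumption of this sketch.

```python
import asyncio

async def run_cascade(steps, shared):
    """Run (name, func, deps) steps, assumed to be listed in dependency order."""
    recorded = {}
    for name, func, deps in steps:
        # Shared context first, then the recorded results of this step's deps.
        context = {**shared, **{dep: recorded[dep] for dep in deps}}
        recorded[name] = await func(context)
    return recorded

async def extract(context):
    return {"records": [1, 2, 3], "source": context["source"]}

async def transform(context):
    return {"transformed": [rec * 10 for rec in context["extract"]["records"]]}

async def load(context):
    return {"loaded": len(context["transform"]["transformed"])}

steps = [
    ("extract", extract, []),
    ("transform", transform, ["extract"]),
    ("load", load, ["transform"]),
]

results = asyncio.run(run_cascade(steps, {"source": "api", "batch_id": 123}))
print(results["load"])  # {'loaded': 3}
```

Note that load only sees the result of transform, because each step receives the results of its own dependencies rather than of every earlier step.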
Fan-Out Cascades#
Cascades can fan out across multiple items by providing a tuple of (items, function). The
function receives each item as its first argument and the context as its second:
async def process_user(user_id, context):
    batch_id = context["batch_id"]
    return {"user_id": user_id, "processed": True}

async def summarize(context):
    # Fan-out results are nested under the step name and keyed by list index
    # or dict key: context["process"]["0"], context["process"]["1"], etc.
    results = context["process"]
    return sum(1 for res in results.values() if res["processed"])

user_ids = [1, 2, 3, 4, 5]

workflow = (
    Workflow()
    .add_context({"batch_id": 456})
    .add_cascade("process", (user_ids, process_user))
    .add_cascade("summarize", summarize, deps="process")
)
Each item creates a separate job in a sub-workflow. When using a list, jobs are named by index
("0", "1", etc.). You can also use a dict for named jobs:
sources = {"wiki": "https://...", "docs": "https://..."}
workflow = (
    Workflow()
    .add_cascade("fetch", (sources, fetch_from_source))
    .add_cascade("merge", merge_results, deps="fetch")
)
# Results available as context["fetch"]["wiki"] and context["fetch"]["docs"]
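The shape of the collected results can be modelled in a few lines of plain Python. This is a sketch rather than the library's implementation: `fan_out` is a hypothetical helper that runs one coroutine per item with asyncio.gather and keys the results by stringified list index or by dict key, as described above.

```python
import asyncio

async def fan_out(items, func, context):
    """Call func(item, context) per item and key the results by name."""
    if isinstance(items, dict):
        named = dict(items)
    else:
        # Lists are keyed by stringified index: "0", "1", ...
        named = {str(index): item for index, item in enumerate(items)}
    results = await asyncio.gather(*(func(item, context) for item in named.values()))
    return dict(zip(named, results))

async def process_user(user_id, context):
    return {"user_id": user_id, "batch_id": context["batch_id"], "processed": True}

async def main():
    context = {"batch_id": 456}
    # Downstream steps would see the collected results under the step name.
    context["process"] = await fan_out([1, 2, 3], process_user, context)
    return context

context = asyncio.run(main())
print(sorted(context["process"]))  # ['0', '1', '2']
```

Passing a dict such as `{"wiki": ..., "docs": ...}` instead of a list yields results keyed by "wiki" and "docs".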
Cascading functions are especially valuable for:
Data pipelines where each step needs the output of previous steps
Minimizing boilerplate compared to manual get_recorded() calls
Building fan-out/fan-in patterns with automatic result collection
Grafting Sub-Workflows#
Grafting allows sub-workflows to be attached after a workflow has started. With grafting, you define placeholders that are expanded into full sub-workflows when data becomes available at runtime.
A graft job serves as a placeholder in the workflow. When the graft job executes, it must build and attach a new sub-workflow at that point. Any downstream jobs that depend on the graft will wait for the entire grafted sub-workflow to complete.
Creating Graft Points#
Use add_graft() to create a placeholder that will be filled in at runtime:
@worker()
class UserNotifier:
    async def process(self, job):
        # Fetch user IDs at runtime; this data wasn't available when the workflow was created
        user_ids = await get_user_ids_for_account(job.args["account_id"])

        # Build the sub-workflow dynamically
        sub_workflow = Workflow()
        for user_id in user_ids:
            sub_workflow.add(str(user_id), NotifyWorker.new({"user_id": user_id}))

        # Attach the sub-workflow to this graft point
        await oban.enqueue_many(Workflow.apply_graft(job, sub_workflow))

workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add_graft("notify", UserNotifier.new({"account_id": 123}), deps="setup")
    .add("summary", SummaryWorker.new({}), deps="notify")
)
await oban.enqueue_many(workflow)
When the workflow runs, the execution flow is:
setup runs first
notify (the grafter) runs and calls apply_graft() to attach a sub-workflow
All jobs in the grafted sub-workflow execute
summary runs after the entire grafted sub-workflow completes
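The flow above can be imitated with a toy scheduler whose dependency map grows while it runs. The sketch below is plain Python and makes no claim about how the library implements grafting. The `run` function, the `grafters` mapping, and the `notify/101` style names are illustrative assumptions.

```python
def run(deps, grafters):
    """Run jobs in dependency order, expanding graft points as they execute.

    deps maps a job name to the names it waits on. grafters maps a graft
    job name to a callable returning the names of the jobs to attach.
    """
    deps = {name: set(upstream) for name, upstream in deps.items()}
    done, order = set(), []
    while len(done) < len(deps):
        ready = sorted(n for n, up in deps.items() if n not in done and up <= done)
        if not ready:
            raise ValueError("unsatisfiable dependencies")
        for name in ready:
            order.append(name)
            done.add(name)
            if name not in grafters:
                continue
            grafted = {f"{name}/{sub}" for sub in grafters[name]()}
            # Anything downstream of the graft point now also waits on the grafted jobs.
            for upstream in deps.values():
                if name in upstream:
                    upstream |= grafted
            # Grafted jobs become runnable once the grafter itself has run.
            deps.update({job: {name} for job in grafted})
    return order

order = run(
    {"setup": [], "notify": ["setup"], "summary": ["notify"]},
    {"notify": lambda: ["101", "102"]},  # user IDs only known at runtime
)
print(order)  # ['setup', 'notify', 'notify/101', 'notify/102', 'summary']
```

The summary job is held back until both grafted jobs finish, even though they did not exist when the graph was first defined.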
When to Use Grafting#
Grafting is useful when:
The number of jobs depends on data discovered during execution
You need to conditionally execute different workflow branches
Sub-workflow structure can’t be determined until a previous step completes
For simpler cases where you know the items upfront, prefer add_many() or fan-out cascades.
Workflow Options#
Workflows provide several options to control their behavior, from identification to error handling.
Workflow ID#
Every workflow needs a unique identifier. By default, id is an auto-generated UUID (UUIDv7 on
Python 3.14+, UUIDv4 on older versions). For more control, you can provide a custom ID:
Workflow(id="custom-unique-id")
Custom IDs are useful when you need to reference a workflow externally or ensure idempotency.
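For idempotency, one common approach is to derive the ID deterministically from a business key so that building the same logical workflow twice produces the same ID. The sketch below uses the standard library's uuid.uuid5 for that. The `workflow_id_for` helper and the key format are hypothetical, and how the library treats a second insert with an existing ID is not covered here.

```python
import uuid

def workflow_id_for(kind, key):
    """Derive a stable workflow ID from a business key (hypothetical helper)."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"workflow:{kind}:{key}"))

first = workflow_id_for("nightly-etl", "2024-01-31")
second = workflow_id_for("nightly-etl", "2024-01-31")
other = workflow_id_for("nightly-etl", "2024-02-01")

print(first == second, first == other)  # True False
# The derived value could then be passed as Workflow(id=first).
```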
Workflow Name#
Workflows accept an optional name that describes their purpose:
Workflow(name="nightly-etl")
While the id must be unique, the name doesn’t have to be, so it can serve as a general-purpose
label for categorizing or grouping workflows.
Dependency Handling#
By default, workflows use conservative dependency handling—if an upstream job is cancelled or discarded, dependent jobs are automatically cancelled. You can customize this behavior:
# Apply to entire workflow
Workflow(ignore_cancelled=True, ignore_discarded=True)
Or configure individual jobs:
(
    Workflow()
    .add("step_1", Worker.new({}))
    .add("step_2", Worker.new({}), deps="step_1", ignore_cancelled=True)
    .add("step_3", Worker.new({}), deps="step_2", ignore_discarded=True)
)
ignore_cancelled — Treat cancelled dependencies as completed rather than cancelling downstream jobs. Defaults to False.
ignore_discarded — Treat discarded dependencies as completed rather than cancelling downstream jobs. Defaults to False.
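The effect of the two flags can be summarised as a small decision function. This is a plain-Python sketch of the rule as described in this section, not the library's code. The `resolve` name and its "run", "wait", and "cancel" return values are illustrative.

```python
def resolve(dep_states, ignore_cancelled=False, ignore_discarded=False):
    """Decide what happens to a job given the states of its dependencies."""
    satisfied = {"completed"}
    if ignore_cancelled:
        satisfied.add("cancelled")
    if ignore_discarded:
        satisfied.add("discarded")

    # A cancelled or discarded dependency that is not ignored cancels the job.
    failed = {"cancelled", "discarded"} - satisfied
    if any(state in failed for state in dep_states):
        return "cancel"
    # Otherwise the job runs once every dependency counts as satisfied.
    if all(state in satisfied for state in dep_states):
        return "run"
    return "wait"

print(resolve(["completed", "cancelled"]))                         # cancel
print(resolve(["completed", "cancelled"], ignore_cancelled=True))  # run
print(resolve(["completed", "executing"]))                         # wait
```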
Introspection#
Workflow jobs are tied together through meta attributes. You can retrieve these jobs and check
workflow status using several methods.
Getting a Workflow Handle#
Use Workflow.for_job() to get a workflow instance from within a running job:
@worker()
class MyWorker:
    async def process(self, job):
        workflow = Workflow.for_job(job)

        # Now you can use workflow methods
        status = await workflow.status()
        all_jobs = await workflow.all_jobs()
You can also create a workflow handle directly if you have the workflow ID:
workflow = Workflow(id="known-workflow-id")
status = await workflow.status()
Fetching Jobs#
Retrieve jobs from a workflow using get_job() for a single job or all_jobs() for multiple:
workflow = Workflow.for_job(job)
# Get a single job by name
extract_job = await workflow.get_job("extract")
# Get all jobs in the workflow
all_jobs = await workflow.all_jobs()
# Get only specific named jobs
some_jobs = await workflow.all_jobs(names=["extract", "transform"])
# Get only the dependencies of the current job
dep_jobs = await workflow.all_jobs(deps_of=job)
Checking Status#
Get detailed status information about a workflow using status():
workflow = Workflow.for_job(job)
status = await workflow.status()
print(status.state) # "executing", "completed", "cancelled", or "discarded"
print(status.total) # Total number of jobs
print(status.counts) # Dict of counts by state: {"completed": 5, "executing": 2, ...}
print(status.duration) # Time from start to completion (if finished)
print(status.started_at) # When the first job started
print(status.stopped_at) # When the last job finished (if complete)
print(status.subs) # Dict of sub-workflow statuses
The state field reflects the overall workflow state:
"executing" — At least one job is still running or pending
"completed" — All jobs completed successfully
"cancelled" — At least one job was cancelled (and workflow isn’t still executing)
"discarded" — At least one job was discarded (and workflow isn’t still executing)
Visualization#
Workflows are a type of Directed Acyclic Graph, which means we can represent them visually as nodes (jobs) connected by edges (dependencies). By converting workflows into standard graph description languages, we can create clear visual representations of job execution patterns.
Converting to Graph Formats#
There are several built-in visualization options:
to_graph() — Returns a Graph object with vertices and edges
to_dot() — Generates Graphviz DOT format
to_mermaid() — Produces Mermaid flowchart markup
For example, let’s generate a mermaid flowchart from a workflow:
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add("process_a", ProcessWorker.new({}), deps="setup")
    .add("process_b", ProcessWorker.new({}), deps="setup")
    .add("finish", FinishWorker.new({}), deps=["process_a", "process_b"])
)
print(workflow.to_mermaid())
This produces the following mermaid output, where each vertex shows the job name and worker:
flowchart TD
    setup["setup (SetupWorker)"]
    process_a["process_a (ProcessWorker)"]
    process_b["process_b (ProcessWorker)"]
    finish["finish (FinishWorker)"]
    setup --> process_a
    setup --> process_b
    process_a --> finish
    process_b --> finish
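The mapping from a dependency graph to this markup is mechanical: one node line per job, then one edge line per dependency. The sketch below shows that mapping in plain Python. `render_mermaid` and its input layout are hypothetical and stand in for the library's to_mermaid(), whose exact whitespace may differ.

```python
def render_mermaid(jobs):
    """Render {name: (worker_name, deps)} as Mermaid flowchart markup."""
    lines = ["flowchart TD"]
    # One labelled node per job.
    for name, (worker, _deps) in jobs.items():
        lines.append(f'    {name}["{name} ({worker})"]')
    # One edge per dependency, pointing from the dependency to the dependent job.
    for name, (_worker, deps) in jobs.items():
        for dep in deps:
            lines.append(f"    {dep} --> {name}")
    return "\n".join(lines)

jobs = {
    "setup": ("SetupWorker", []),
    "process_a": ("ProcessWorker", ["setup"]),
    "process_b": ("ProcessWorker", ["setup"]),
    "finish": ("FinishWorker", ["process_a", "process_b"]),
}

output = render_mermaid(jobs)
print(output)
```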
Rendering Visualizations#
You can render these visualizations in several ways:
Jupyter Notebooks — Display mermaid diagrams using IPython’s display capabilities or a mermaid extension:
from IPython.display import display, Markdown
display(Markdown(f"```mermaid\n{workflow.to_mermaid()}\n```"))
Command Line — Use the mmdc (mermaid CLI) or dot (Graphviz) commands to generate images:
# Using mermaid-cli
python -c "from myapp import workflow; print(workflow.to_mermaid())" | mmdc -i - -o workflow.svg
# Using Graphviz
python -c "from myapp import workflow; print(workflow.to_dot())" | dot -Tsvg -o workflow.svg
Documentation — Many documentation tools (MkDocs, Sphinx with extensions, GitHub) render mermaid code blocks directly:
```mermaid
flowchart TD
setup["setup (SetupWorker)"]
process["process (ProcessWorker)"]
setup --> process
```
The visualization clearly shows how workflows fan-out and fan-in, making complex dependency relationships immediately apparent.
Cancellation and Retry#
Workflows provide methods to cancel or retry jobs in bulk.
Cancelling Jobs#
Cancel jobs in a workflow using cancel_jobs():
workflow = Workflow.for_job(job)
# Cancel all jobs in the workflow
count = await workflow.cancel_jobs()
# Cancel only specific named jobs
count = await workflow.cancel_jobs(names=["step_2", "step_3"])
Jobs in a final state (completed, cancelled, discarded) cannot be cancelled. For executing
jobs, the database state is updated immediately, but running tasks are not forcefully terminated.
Retrying Jobs#
Retry failed or cancelled jobs using retry_jobs():
workflow = Workflow.for_job(job)
# Retry all retryable jobs in the workflow
count = await workflow.retry_jobs()
# Retry only specific named jobs
count = await workflow.retry_jobs(names=["failed_step"])
Jobs that are available or executing are ignored. Jobs without dependencies are set to
available, while jobs with dependencies are set to suspended to wait for their dependencies.
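The state assignment described here can be expressed as a short planning function. This sketch follows the two sentences above literally and is not the library's implementation: `plan_retry`, the job dict layout, and the treatment of every state other than available and executing as retryable are assumptions.

```python
IGNORED = {"available", "executing"}  # left untouched by a retry

def plan_retry(jobs, names=None):
    """Return {job_name: new_state} for the jobs a retry would touch.

    jobs maps a name to {"state": str, "deps": list_of_names}.
    """
    plan = {}
    for name, job in jobs.items():
        if names is not None and name not in names:
            continue
        if job["state"] in IGNORED:
            continue
        # Jobs with dependencies wait for them again; others can run right away.
        plan[name] = "suspended" if job["deps"] else "available"
    return plan

jobs = {
    "step_1": {"state": "cancelled", "deps": []},
    "step_2": {"state": "discarded", "deps": ["step_1"]},
    "step_3": {"state": "executing", "deps": ["step_1"]},
}

print(plan_retry(jobs))  # {'step_1': 'available', 'step_2': 'suspended'}
```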
API Reference#
Workflow Class#
Constructor#
Workflow(
    id: str | None = None,
    name: str | None = None,
    check_deps: bool = True,
    ignore_cancelled: bool = False,
    ignore_discarded: bool = False,
)
id — Custom workflow ID. Auto-generated UUID if not provided.
name — Optional name for categorizing workflows.
check_deps — Validate that dependencies exist when adding jobs. Defaults to True.
ignore_cancelled — Treat cancelled deps as completed. Defaults to False.
ignore_discarded — Treat discarded deps as completed. Defaults to False.
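As an illustration of what check_deps validation could look like, here is a minimal builder in plain Python. It is a sketch, not the library's Workflow class: `MiniWorkflow` is a hypothetical stand-in, and the choice of ValueError and the error messages are assumptions. It also shows deps being accepted either as a single string or as a list of strings.

```python
class MiniWorkflow:
    """Toy builder that records dependencies and optionally validates them."""

    def __init__(self, check_deps=True):
        self.check_deps = check_deps
        self.deps = {}

    def add(self, name, deps=()):
        # Accept a single name or a list of names.
        names = [deps] if isinstance(deps, str) else list(deps)
        if name in self.deps:
            raise ValueError(f"duplicate job name: {name!r}")
        if self.check_deps:
            missing = [dep for dep in names if dep not in self.deps]
            if missing:
                raise ValueError(f"unknown dependencies for {name!r}: {missing}")
        self.deps[name] = names
        return self  # allow method chaining

valid = MiniWorkflow().add("a").add("b", deps="a").add("c", deps=["a", "b"])

try:
    MiniWorkflow().add("b", deps="a")
    message = None
except ValueError as error:
    message = str(error)

# With validation disabled, the dangling dependency is accepted as-is.
unchecked = MiniWorkflow(check_deps=False).add("b", deps="a")
print(message)
```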
Building Methods#
add(name, job, *, deps=[], ignore_cancelled=None, ignore_discarded=None)
Add a named job to the workflow.
name — Unique name for this job within the workflow.
job — The job to add, created with Worker.new(args).
deps — Job names this one depends on. String or list of strings.
ignore_cancelled — Per-job override for cancelled dependency handling.
ignore_discarded — Per-job override for discarded dependency handling.
Returns the workflow for method chaining.
add_workflow(name, sub_workflow, *, deps=[])
Add a sub-workflow with optional dependencies on parent jobs.
name — Unique name for this sub-workflow.
sub_workflow — The workflow to nest.
deps — Names of parent jobs the sub-workflow depends on.
add_many(name, jobs, *, deps=[])
Add multiple jobs as a sub-workflow. Jobs in a list are named by index; jobs in a dict use keys.
name — Unique name for this sub-workflow.
jobs — List or dict of jobs.
deps — Names of parent jobs the sub-workflow depends on.
add_cascade(name, func_or_tuple, *, deps=[], **job_opts)
Add a cascade function that receives context and auto-records results.
name — Unique name for this cascade.
func_or_tuple — Either a function (context) -> result or a tuple (items, func) for fan-out.
deps — Names of jobs this cascade depends on.
job_opts — Additional job options (queue, priority, etc.).
add_graft(name, job, *, deps=[], **kwargs)
Add a grafting placeholder for attaching a sub-workflow at runtime.
name — Unique name for this graft point.
job — The grafter job that will call apply_graft().
deps — Names of jobs this graft depends on.
add_context(value)
Store shared context data accessible to all cascade functions.
value — A dict of context data.
Class Methods#
Workflow.for_job(job)
Create a workflow instance from a job that belongs to a workflow.
job — A job with workflow_id in its meta.
Workflow.apply_graft(job, sub_workflow)
Attach a sub-workflow to a graft point. Must be called from within a grafter job’s process().
job — The currently executing grafter job.
sub_workflow — The sub-workflow to attach.
Returns a workflow ready to be inserted with oban.enqueue_many().
Instance Methods#
await get_job(name)
Fetch a single job from the workflow by name. Returns Job | None.
await all_jobs(*, names=None, deps_of=None)
Fetch jobs belonging to the workflow.
names — Filter to only return jobs with these names.
deps_of — Filter to only return jobs that are dependencies of this job.
Returns a list of jobs.
await get_recorded(name)
Fetch the recorded return value from a workflow job by name. Returns the decoded value or None.
await all_recorded(*, names=None, deps_of=None)
Fetch all recorded return values from workflow jobs.
names — Filter to jobs with these names.
deps_of — Filter to jobs that are dependencies of this job.
Returns a dict mapping job names to their recorded values. Sub-workflow results are nested.
await get_context()
Retrieve the workflow’s context data set by add_context(). Returns the context dict or None.
await status()
Get current status of the workflow and all sub-workflows. Returns a WorkflowStatus object.
WorkflowStatus, the object returned by workflow.status(), has these fields:
id — The workflow ID.
name — The workflow name (if set).
state — Overall state: "executing", "completed", "cancelled", "discarded", or "unknown".
total — Total number of jobs.
counts — Dict of job counts by state.
duration — Time from start to completion (timedelta or None).
started_at — When the first job started (datetime or None).
stopped_at — When the last job finished (datetime or None).
subs — Dict of sub-workflow statuses, keyed by sub-workflow name.
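The relationship between counts and state can be sketched as a pure function, following the state rules given in the Checking Status section. This is an illustration only: `overall_state` is a hypothetical helper, and both the precedence of "cancelled" over "discarded" and the use of "unknown" for a workflow with no jobs are assumptions.

```python
FINAL_STATES = {"completed", "cancelled", "discarded"}

def overall_state(counts):
    """Derive an overall workflow state from a dict of job counts by state."""
    if sum(counts.values()) == 0:
        return "unknown"  # assumption: no jobs found for the workflow
    # Any job outside a final state means the workflow is still in progress.
    if any(count for state, count in counts.items() if state not in FINAL_STATES):
        return "executing"
    if counts.get("cancelled"):
        return "cancelled"
    if counts.get("discarded"):
        return "discarded"
    return "completed"

print(overall_state({"completed": 5, "executing": 2}))  # executing
print(overall_state({"completed": 6, "cancelled": 1}))  # cancelled
print(overall_state({"completed": 7}))                  # completed
```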
await cancel_jobs(*, names=None)
Cancel jobs in the workflow. Returns the number of jobs cancelled.
names — Filter to only cancel jobs with these names.
await retry_jobs(*, names=None)
Retry jobs in the workflow. Returns the number of jobs retried.
names — Filter to only retry jobs with these names.
to_graph()
Build a Graph representation of the workflow. Returns a Graph object with vertices and
edges attributes. Sub-workflows are represented as nested Graph objects within vertices.
to_dot()
Convert the workflow to Graphviz DOT format. Returns a string suitable for rendering with Graphviz tools.
to_mermaid()
Convert the workflow to Mermaid flowchart format. Returns a string suitable for rendering in Mermaid-compatible viewers.