Workflows#

Workflows compose jobs with dependency relationships, enabling sequential execution, fan-out parallelization, and fan-in convergence patterns. By declaratively defining jobs and their connections, you can build complex processing pipelines that are fault-tolerant and horizontally scalable across all nodes.

Basic Usage#

Workflows are composed of jobs linked together through declarative dependencies. A workflow consists of these key elements:

  • Jobs — Individual units of work identified by unique names within the workflow

  • Dependencies — Relationships specifying execution order between jobs

Here’s a simple workflow that demonstrates sequential and parallel execution:

from oban import worker
from oban_pro import Workflow

@worker()
class StepWorker:
    async def process(self, job):
        print(f"Executing: {job.meta['name']}")

workflow = (
    Workflow()
    .add("a", StepWorker.new({}))
    .add("b", StepWorker.new({}), deps="a")
    .add("c", StepWorker.new({}), deps="b")
    .add("d", StepWorker.new({}), deps="b")
    .add("e", StepWorker.new({}), deps=["c", "d"])
)

await oban.enqueue_many(workflow)

The workflow is initialized with Workflow(), jobs are added with add(), dependencies are specified with the deps parameter, and finally the workflow is inserted with oban.enqueue_many().

When executed, this workflow prints each step’s name in order. Steps c and d both depend on b, so they may execute in parallel. Visually, the workflow looks like this:

    a
    │
    b
   ╱ ╲
  c   d
   ╲ ╱
    e

Common Patterns#

Workflows support several execution patterns that form the building blocks of workflow design. These patterns can be combined to create complex workflows tailored to your specific needs.

Sequential Execution#

Jobs run one after another, with each job waiting for the previous one to complete:

Workflow()
.add("first", Worker.new({}))
.add("second", Worker.new({}), deps="first")
.add("third", Worker.new({}), deps="second")

Fan-Out (1-to-many)#

One job triggers multiple parallel jobs:

Workflow()
.add("parent", Worker.new({}))
.add("child_1", Worker.new({}), deps="parent")
.add("child_2", Worker.new({}), deps="parent")
.add("child_3", Worker.new({}), deps="parent")

Fan-In (many-to-1)#

Multiple parallel jobs converge to a single job:

Workflow()
.add("step_1", Worker.new({}))
.add("step_2", Worker.new({}))
.add("step_3", Worker.new({}))
.add("final", Worker.new({}), deps=["step_1", "step_2", "step_3"])

Diamond Pattern#

Combines fan-out and fan-in into a diamond shape:

Workflow()
.add("start", Worker.new({}))
.add("left", Worker.new({}), deps="start")
.add("right", Worker.new({}), deps="start")
.add("end", Worker.new({}), deps=["left", "right"])

Sub-Workflows#

Workflows can be nested hierarchically using sub-workflows. This allows you to compose complex workflows from simpler ones, making it easier to organize and reuse workflow patterns.

The add_workflow() method lets you add an entire workflow as a dependency of another workflow. Like add(), it accepts a name and optional dependencies, but instead of a job, it takes another workflow.

from oban_pro import Workflow

# Create reusable sub-workflows
extract_flow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "database"}))
    .add("transform", TransformWorker.new({"type": "normalize"}), deps="extract")
)

notify_flow = (
    Workflow()
    .add("prepare", PrepareWorker.new({"template": "report"}))
    .add("send", SendWorker.new({"method": "email"}), deps="prepare")
)

# Compose into a main workflow
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({"mode": "initialize"}))
    .add_workflow("extract", extract_flow, deps="setup")
    .add_workflow("notify", notify_flow, deps="extract")
    .add("finalize", CleanupWorker.new({"mode": "cleanup"}), deps="notify")
)

await oban.enqueue_many(workflow)

In this example, the main workflow has a single job named setup, followed by two sub-workflows, and ends with a finalize job. The dependencies ensure proper execution order: setup extract notify finalize.

When you depend on a sub-workflow (like deps="extract"), the dependent job waits for all jobs in that sub-workflow to complete before starting.

Sub-workflows are useful for:

  • Organizing large job dependencies into logical units

  • Enabling reuse of workflow patterns across your application

  • Simplifying the maintenance of complex job graphs

Collection Fan-Out#

The add_many() method is a convenience for creating sub-workflows from a collection of jobs. Jobs in a list are auto-named by index (0, 1, 2, …). Jobs in a dict use their keys as names.

# Fan-out with numbered jobs
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add_many("process", [ProcessWorker.new({"item": id}) for id in items], deps="setup")
    .add("finish", FinishWorker.new({}), deps="process")
)

# Fan-out with named jobs
workflow = (
    Workflow()
    .add_many("fetch", {
        "users": FetchWorker.new({"table": "users"}),
        "orders": FetchWorker.new({"table": "orders"}),
    })
    .add("merge", MergeWorker.new({}), deps="fetch")
)

This pattern is especially useful for processing variable-sized collections where the number of jobs isn’t known until runtime.

Sharing Results#

Directed dependencies between jobs, paired with recorded return values, allow downstream jobs to fetch the output of upstream jobs. This is useful for multi-step processes where each step builds on previous results.

Recording Results#

To record a job’s result, return a Recorded() value from process():

@worker()
class ExtractWorker:
    async def process(self, job):
        data = await fetch_data(job.args["source"])

        return Record(data)

The return value is stored in the job’s metadata and can be retrieved by downstream jobs.

Fetching Results#

Use Workflow.for_job() to get a workflow handle from within a job, then fetch recorded values:

@worker()
class TransformWorker:
    async def process(self, job):
        # Fetch a single recorded result by name
        records = await Workflow.for_job(job).get_recorded("extract")

        # Transform the data
        return do_some_transformation(transformed)

For jobs with multiple dependencies, use all_recorded() to fetch all upstream results at once:

@worker()
class MergeWorker:
    async def process(self, job):
        # Fetch all recorded results from this job's dependencies
        results = await Workflow.for_job(job).all_recorded(deps_of=job)

        # results is a dict: {"dep_name": recorded_value, ...}
        users = results["users"]
        orders = results["orders"]

        return merge(users, orders)

The deps_of parameter filters results to only include jobs that are direct dependencies of the specified job, which is typically what you want when aggregating upstream results. It’s also able to fetch all recorded values for a sub-workflow the current job depends on.

Cascading Functions#

Cascades allow you to build workflows using plain functions that automatically receive context and previous step results. Each function receives a dict containing the workflow context and the recorded results of its dependencies. Return values are automatically recorded and made available to subsequent steps.

Basic Cascades#

Here’s an ETL (Extract, Transform, Load) pipeline using cascading functions:

from oban_pro import Workflow

async def extract(context):
    source = context["source"]
    return {"records": await fetch_data(source), "extracted_at": datetime.now()}

async def transform(context):
    records = context["extract"]["records"]
    return {"transformed": [process(rec) for rec in records]}

async def load(context):
    records = context["transform"]["transformed"]
    await save_to_database(records)
    return {"loaded": len(records)}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
    .add_cascade("load", load, deps="transform")
)

await oban.enqueue_many(workflow)

When this workflow runs, each function is automatically called with a context dict containing:

  • Values from add_context() (e.g., context["source"])

  • Recorded results from dependencies, keyed by step name (e.g., context["extract"])

There’s no need to manually fetch recorded results as cascades handle this automatically.

Workflow Context#

As you may have noticed above, you can use add_context() to store shared data that all cascade functions can access:

workflow = (
    Workflow()
    .add_context({"user_id": 123, "batch_size": 100, "environment": "production"})
    .add_cascade("step_1", step_1_func)
    .add_cascade("step_2", step_2_func, deps="step_1")
)

The context is available to all cascade functions and is merged with recorded results from dependencies.

Fan-Out Cascades#

Cascades can fan out across multiple items by providing a tuple of (items, function). The function receives each item as its first argument and the context as its second:

async def process_user(user_id, context):
    batch_id = context["batch_id"]

    return {"user_id": user_id, "processed": True}

async def summarize(context):
    # Results from fan-out are indexed by id or dict key, context["0"], context["1"], etc.
    results = context["process"]

    return sum(1 for res in results.values() if res["processed"])

user_ids = [1, 2, 3, 4, 5]

workflow = (
    Workflow()
    .add_context({"batch_id": 456})
    .add_cascade("process", (user_ids, process_user))
    .add_cascade("summarize", summarize, deps="process")
)

Each item creates a separate job in a sub-workflow. When using a list, jobs are named by index ("0", "1", etc.). You can also use a dict for named jobs:

sources = {"wiki": "https://...", "docs": "https://..."}

workflow = (
    Workflow()
    .add_cascade("fetch", (sources, fetch_from_source))
    .add_cascade("merge", merge_results, deps="fetch")
)
# Results available as context["fetch"]["wiki"] and context["fetch"]["docs"]

Cascading functions are especially valuable for:

  • Data pipelines where each step needs the output of previous steps

  • Minimizing boilerplate compared to manual get_recorded() calls

  • Building fan-out/fan-in patterns with automatic result collection

Grafting Sub-Workflows#

Grafting allows sub-workflows to be attached after a workflow has started. With grafting, you define placeholders that are expanded into full sub-workflows when data becomes available at runtime.

A graft job serves as a placeholder in the workflow. When the graft job executes, it must build and attach a new sub-workflow at that point. Any downstream jobs that depend on the graft will wait for the entire grafted sub-workflow to complete.

Creating Graft Points#

Use add_graft() to create a placeholder that will be filled in at runtime:

@worker()
class UserNotifier:
    async def process(self, job):
        # Fetch user IDs at runtime—this data wasn't available when workflow was created
        user_ids = await get_user_ids_for_account(job.args["account_id"])

        # Build the sub-workflow dynamically
        sub_workflow = Workflow()

        for user_id in user_ids:
            sub_workflow.add(str(user_id), NotifyWorker.new({"user_id": user_id}))

        # Attach the sub-workflow to this graft point
        await oban.enqueue_many(Workflow.apply_graft(job, sub_workflow))

workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add_graft("notify", UserNotifier.new({"account_id": 123}), deps="setup")
    .add("summary", SummaryWorker.new({}), deps="notify")
)

await oban.enqueue_many(workflow)

When the workflow runs, the execution flow is:

  1. setup runs first

  2. notify (the grafter) runs and calls apply_graft() to attach a sub-workflow

  3. All jobs in the grafted sub-workflow execute

  4. summary runs after the entire grafted sub-workflow completes

When to Use Grafting#

Grafting is useful when:

  • The number of jobs depends on data discovered during execution

  • You need to conditionally execute different workflow branches

  • Sub-workflow structure can’t be determined until a previous step completes

For simpler cases where you know the items upfront, prefer add_many() or fan-out cascades.

Workflow Options#

Workflows provide several options to control their behavior, from identification to error handling.

Workflow ID#

Every workflow needs a unique identifier. By default, id is an auto-generated UUID (UUIDv7 on Python 3.14+, UUIDv4 on older versions). For more control, you can provide a custom ID:

Workflow(id="custom-unique-id")

Custom IDs are useful when you need to reference a workflow externally or ensure idempotency.

Workflow Name#

Workflows accept an optional name that describes their purpose:

Workflow(name="nightly-etl")

While the id must be unique, the name doesn’t have to be, so it can serve as a general-purpose label for categorizing or grouping workflows.

Dependency Handling#

By default, workflows use conservative dependency handling—if an upstream job is cancelled or discarded, dependent jobs are automatically cancelled. You can customize this behavior:

# Apply to entire workflow
Workflow(ignore_cancelled=True, ignore_discarded=True)

Or configure individual jobs:

Workflow()
.add("step_1", Worker.new({}))
.add("step_2", Worker.new({}), deps="step_1", ignore_cancelled=True)
.add("step_3", Worker.new({}), deps="step_2", ignore_discarded=True)
  • ignore_cancelled — Treat cancelled dependencies as completed rather than cancelling downstream jobs. Defaults to False.

  • ignore_discarded — Treat discarded dependencies as completed rather than cancelling downstream jobs. Defaults to False.

Introspection#

Workflow jobs are tied together through meta attributes. You can retrieve these jobs and check workflow status using several methods.

Getting a Workflow Handle#

Use Workflow.for_job() to get a workflow instance from within a running job:

@worker()
class MyWorker:
    async def process(self, job):
        workflow = Workflow.for_job(job)

        # Now you can use workflow methods
        status = await workflow.status()
        all_jobs = await workflow.all_jobs()

You can also create a workflow handle directly if you have the workflow ID:

workflow = Workflow(id="known-workflow-id")
status = await workflow.status()

Fetching Jobs#

Retrieve jobs from a workflow using get_job() for a single job or all_jobs() for multiple:

workflow = Workflow.for_job(job)

# Get a single job by name
extract_job = await workflow.get_job("extract")

# Get all jobs in the workflow
all_jobs = await workflow.all_jobs()

# Get only specific named jobs
some_jobs = await workflow.all_jobs(names=["extract", "transform"])

# Get only the dependencies of the current job
dep_jobs = await workflow.all_jobs(deps_of=job)

Checking Status#

Get detailed status information about a workflow using status():

workflow = Workflow.for_job(job)
status = await workflow.status()

print(status.state)      # "executing", "completed", "cancelled", or "discarded"
print(status.total)      # Total number of jobs
print(status.counts)     # Dict of counts by state: {"completed": 5, "executing": 2, ...}
print(status.duration)   # Time from start to completion (if finished)
print(status.started_at) # When the first job started
print(status.stopped_at) # When the last job finished (if complete)
print(status.subs)       # Dict of sub-workflow statuses

The state field reflects the overall workflow state:

  • "executing" — At least one job is still running or pending

  • "completed" — All jobs completed successfully

  • "cancelled" — At least one job was cancelled (and workflow isn’t still executing)

  • "discarded" — At least one job was discarded (and workflow isn’t still executing)

Visualization#

Workflows are a type of Directed Acyclic Graph, which means we can represent them visually as nodes (jobs) connected by edges (dependencies). By converting workflows into standard graph description languages, we can create clear visual representations of job execution patterns.

Converting to Graph Formats#

There are several built-in visualization options:

  • to_graph() — Returns a Graph object with vertices and edges

  • to_dot() — Generates Graphviz DOT format

  • to_mermaid() — Produces Mermaid flowchart markup

For example, let’s generate a mermaid flowchart from a workflow:

workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add("process_a", ProcessWorker.new({}), deps="setup")
    .add("process_b", ProcessWorker.new({}), deps="setup")
    .add("finish", FinishWorker.new({}), deps=["process_a", "process_b"])
)

print(workflow.to_mermaid())

This produces the following mermaid output, where each vertex shows the job name and worker:

flowchart TD
  setup["setup (SetupWorker)"]
  process_a["process_a (ProcessWorker)"]
  process_b["process_b (ProcessWorker)"]
  finish["finish (FinishWorker)"]
  setup --> process_a
  setup --> process_b
  process_a --> finish
  process_b --> finish

Rendering Visualizations#

You can render these visualizations in several ways:

Jupyter Notebooks — Display mermaid diagrams using IPython’s display capabilities or a mermaid extension:

from IPython.display import display, Markdown

display(Markdown(f"```mermaid\n{workflow.to_mermaid()}```"))

Command Line — Use the mmdc (mermaid CLI) or dot (Graphviz) commands to generate images:

# Using mermaid-cli
python -c "from myapp import workflow; print(workflow.to_mermaid())" | mmdc -i - -o workflow.svg

# Using Graphviz
python -c "from myapp import workflow; print(workflow.to_dot())" | dot -Tsvg -o workflow.svg

Documentation — Many documentation tools (MkDocs, Sphinx with extensions, GitHub) render mermaid code blocks directly:

```mermaid
flowchart TD
  setup["setup (SetupWorker)"]
  process["process (ProcessWorker)"]
  setup --> process
```

The visualization clearly shows how workflows fan-out and fan-in, making complex dependency relationships immediately apparent.

Cancellation and Retry#

Workflows provide methods to cancel or retry jobs in bulk.

Cancelling Jobs#

Cancel jobs in a workflow using cancel_jobs():

workflow = Workflow.for_job(job)

# Cancel all jobs in the workflow
count = await workflow.cancel_jobs()

# Cancel only specific named jobs
count = await workflow.cancel_jobs(names=["step_2", "step_3"])

Jobs in a final state (completed, cancelled, discarded) cannot be cancelled. For executing jobs, the database state is updated immediately, but running tasks are not forcefully terminated.

Retrying Jobs#

Retry failed or cancelled jobs using retry_jobs():

workflow = Workflow.for_job(job)

# Retry all retryable jobs in the workflow
count = await workflow.retry_jobs()

# Retry only specific named jobs
count = await workflow.retry_jobs(names=["failed_step"])

Jobs that are available or executing are ignored. Jobs without dependencies are set to available, while jobs with dependencies are set to suspended to wait for their dependencies.

API Reference#

Workflow Class#

Constructor#

Workflow(
    id: str | None = None,
    name: str | None = None,
    check_deps: bool = True,
    ignore_cancelled: bool = False,
    ignore_discarded: bool = False,
)
  • id — Custom workflow ID. Auto-generated UUID if not provided.

  • name — Optional name for categorizing workflows.

  • check_deps — Validate that dependencies exist when adding jobs. Defaults to True.

  • ignore_cancelled — Treat cancelled deps as completed. Defaults to False.

  • ignore_discarded — Treat discarded deps as completed. Defaults to False.

Building Methods#

add(name, job, *, deps=[], ignore_cancelled=None, ignore_discarded=None)

Add a named job to the workflow.

  • name — Unique name for this job within the workflow.

  • job — The job to add, created with Worker.new(args).

  • deps — Job names this one depends on. String or list of strings.

  • ignore_cancelled — Per-job override for cancelled dependency handling.

  • ignore_discarded — Per-job override for discarded dependency handling.

Returns the workflow for method chaining.


add_workflow(name, sub_workflow, *, deps=[])

Add a sub-workflow with optional dependencies on parent jobs.

  • name — Unique name for this sub-workflow.

  • sub_workflow — The workflow to nest.

  • deps — Names of parent jobs the sub-workflow depends on.


add_many(name, jobs, *, deps=[])

Add multiple jobs as a sub-workflow. Jobs in a list are named by index; jobs in a dict use keys.

  • name — Unique name for this sub-workflow.

  • jobs — List or dict of jobs.

  • deps — Names of parent jobs the sub-workflow depends on.


add_cascade(name, func_or_tuple, *, deps=[], **job_opts)

Add a cascade function that receives context and auto-records results.

  • name — Unique name for this cascade.

  • func_or_tuple — Either a function (context) -> result or a tuple (items, func) for fan-out.

  • deps — Names of jobs this cascade depends on.

  • job_opts — Additional job options (queue, priority, etc.).


add_graft(name, job, *, deps=[], **kwargs)

Add a grafting placeholder for attaching a sub-workflow at runtime.

  • name — Unique name for this graft point.

  • job — The grafter job that will call apply_graft().

  • deps — Names of jobs this graft depends on.


add_context(value)

Store shared context data accessible to all cascade functions.

  • value — A dict of context data.

Class Methods#

Workflow.for_job(job)

Create a workflow instance from a job that belongs to a workflow.

  • job — A job with workflow_id in its meta.


Workflow.apply_graft(job, sub_workflow)

Attach a sub-workflow to a graft point. Must be called from within a grafter job’s process().

  • job — The currently executing grafter job.

  • sub_workflow — The sub-workflow to attach.

Returns a workflow ready to be inserted with oban.enqueue_many().

Instance Methods#

await get_job(name)

Fetch a single job from the workflow by name. Returns Job | None.


await all_jobs(*, names=None, deps_of=None)

Fetch jobs belonging to the workflow.

  • names — Filter to only return jobs with these names.

  • deps_of — Filter to only return jobs that are dependencies of this job.

Returns a list of jobs.


await get_recorded(name)

Fetch the recorded return value from a workflow job by name. Returns the decoded value or None.


await all_recorded(*, names=None, deps_of=None)

Fetch all recorded return values from workflow jobs.

  • names — Filter to jobs with these names.

  • deps_of — Filter to jobs that are dependencies of this job.

Returns a dict mapping job names to their recorded values. Sub-workflow results are nested.


await get_context()

Retrieve the workflow’s context data set by add_context(). Returns the context dict or None.


await status()

Get current status of the workflow and all sub-workflows. Returns a WorkflowStatus object.

Returned by workflow.status().

  • id — The workflow ID.

  • name — The workflow name (if set).

  • state — Overall state: "executing", "completed", "cancelled", "discarded", or "unknown".

  • total — Total number of jobs.

  • counts — Dict of job counts by state.

  • duration — Time from start to completion (timedelta or None).

  • started_at — When the first job started (datetime or None).

  • stopped_at — When the last job finished (datetime or None).

  • subs — Dict of sub-workflow statuses, keyed by sub-workflow name.


await cancel_jobs(*, names=None)

Cancel jobs in the workflow. Returns the number of jobs cancelled.

  • names — Filter to only cancel jobs with these names.


await retry_jobs(*, names=None)

Retry jobs in the workflow. Returns the number of jobs retried.

  • names — Filter to only retry jobs with these names.


to_graph()

Build a Graph representation of the workflow. Returns a Graph object with vertices and edges attributes. Sub-workflows are represented as nested Graph objects within vertices.


to_dot()

Convert the workflow to Graphviz DOT format. Returns a string suitable for rendering with Graphviz tools.


to_mermaid()

Convert the workflow to Mermaid flowchart format. Returns a string suitable for rendering in Mermaid-compatible viewers.