# Workflows

Workflows compose jobs with dependency relationships, enabling sequential execution, fan-out parallelization, and fan-in convergence patterns. By declaratively defining jobs and their connections, you can build complex processing pipelines that are fault-tolerant and horizontally scalable across nodes.

## Basic Usage

Workflows are composed of jobs linked together through declarative dependencies. A workflow consists of these key elements:

* **Jobs** — Individual units of work identified by unique names within the workflow
* **Dependencies** — Relationships specifying execution order between jobs

Here's a simple workflow that demonstrates sequential and parallel execution:

```python
from oban import worker
from oban_pro import Workflow


@worker()
class StepWorker:
    async def process(self, job):
        print(f"Executing: {job.meta['name']}")


workflow = (
    Workflow()
    .add("a", StepWorker.new({}))
    .add("b", StepWorker.new({}), deps="a")
    .add("c", StepWorker.new({}), deps="b")
    .add("d", StepWorker.new({}), deps="b")
    .add("e", StepWorker.new({}), deps=["c", "d"])
)

await oban.enqueue_many(workflow)
```

The workflow is initialized with `Workflow()`, jobs are added with `add()`, dependencies are specified with the `deps` parameter, and finally the workflow is inserted with `oban.enqueue_many()`.

When executed, this workflow prints each step's name in order. Steps `c` and `d` both depend on `b`, so they may execute in parallel. Visually, the workflow looks like this:

```
  a
  │
  b
 ╱ ╲
c   d
 ╲ ╱
  e
```

## Common Patterns

Workflows support several execution patterns that form the building blocks of workflow design. These patterns can be combined to create complex workflows tailored to your specific needs.

### Sequential Execution

Jobs run one after another, with each job waiting for the previous one to complete:

```python
workflow = (
    Workflow()
    .add("first", Worker.new({}))
    .add("second", Worker.new({}), deps="first")
    .add("third", Worker.new({}), deps="second")
)
```

### Fan-Out (1-to-many)

One job triggers multiple parallel jobs:

```python
workflow = (
    Workflow()
    .add("parent", Worker.new({}))
    .add("child_1", Worker.new({}), deps="parent")
    .add("child_2", Worker.new({}), deps="parent")
    .add("child_3", Worker.new({}), deps="parent")
)
```

### Fan-In (many-to-1)

Multiple parallel jobs converge to a single job:

```python
workflow = (
    Workflow()
    .add("step_1", Worker.new({}))
    .add("step_2", Worker.new({}))
    .add("step_3", Worker.new({}))
    .add("final", Worker.new({}), deps=["step_1", "step_2", "step_3"])
)
```

### Diamond Pattern

Combines fan-out and fan-in into a diamond shape:

```python
workflow = (
    Workflow()
    .add("start", Worker.new({}))
    .add("left", Worker.new({}), deps="start")
    .add("right", Worker.new({}), deps="start")
    .add("end", Worker.new({}), deps=["left", "right"])
)
```

## Sub-Workflows

Workflows can be nested hierarchically using sub-workflows. This allows you to compose complex workflows from simpler ones, making it easier to organize and reuse workflow patterns.

The `add_workflow()` method lets you add an entire workflow as a dependency of another workflow. Like `add()`, it accepts a name and optional dependencies, but instead of a job, it takes another workflow.

```python
from oban_pro import Workflow

# Create reusable sub-workflows
extract_flow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "database"}))
    .add("transform", TransformWorker.new({"type": "normalize"}), deps="extract")
)

notify_flow = (
    Workflow()
    .add("prepare", PrepareWorker.new({"template": "report"}))
    .add("send", SendWorker.new({"method": "email"}), deps="prepare")
)

# Compose into a main workflow
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({"mode": "initialize"}))
    .add_workflow("extract", extract_flow, deps="setup")
    .add_workflow("notify", notify_flow, deps="extract")
    .add("finalize", CleanupWorker.new({"mode": "cleanup"}), deps="notify")
)

await oban.enqueue_many(workflow)
```

In this example, the main workflow has a single job named `setup`, followed by two sub-workflows, and ends with a `finalize` job. The dependencies ensure proper execution order: `setup → extract → notify → finalize`. When you depend on a sub-workflow (like `deps="extract"`), the dependent job waits for *all* jobs in that sub-workflow to complete before starting.

Sub-workflows are useful for:

* Organizing large job dependencies into logical units
* Enabling reuse of workflow patterns across your application
* Simplifying the maintenance of complex job graphs

### Collection Fan-Out

The `add_many()` method is a convenience for creating sub-workflows from a collection of jobs. Jobs in a list are auto-named by index (`0`, `1`, `2`, ...). Jobs in a dict use their keys as names.

```python
# Fan-out with numbered jobs
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add_many("process", [ProcessWorker.new({"item": item_id}) for item_id in items], deps="setup")
    .add("finish", FinishWorker.new({}), deps="process")
)

# Fan-out with named jobs
workflow = (
    Workflow()
    .add_many("fetch", {
        "users": FetchWorker.new({"table": "users"}),
        "orders": FetchWorker.new({"table": "orders"}),
    })
    .add("merge", MergeWorker.new({}), deps="fetch")
)
```

This pattern is especially useful for processing variable-sized collections where the number of jobs isn't known until runtime.

## Sharing Results

Directed dependencies between jobs, paired with recorded return values, allow downstream jobs to fetch the output of upstream jobs. This is useful for multi-step processes where each step builds on previous results.

### Recording Results

To record a job's result, return a `Recorded()` value from `process()`:

```python
@worker()
class ExtractWorker:
    async def process(self, job):
        data = await fetch_data(job.args["source"])
        return Recorded(data)
```

The return value is stored in the job's metadata and can be retrieved by downstream jobs.

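
For the value to be retrievable, the recording job and its consumer must be linked by a dependency in the same workflow. Here's a minimal sketch of how that wiring might look, reusing `ExtractWorker` above and the `TransformWorker` shown in the next section:

```python
# The "transform" job declares "extract" as a dependency, so once the
# extract job records its return value, transform can fetch it by name.
workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "database"}))
    .add("transform", TransformWorker.new({}), deps="extract")
)

await oban.enqueue_many(workflow)
```
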
### Fetching Results

Use `Workflow.for_job()` to get a workflow handle from within a job, then fetch recorded values:

```python
@worker()
class TransformWorker:
    async def process(self, job):
        # Fetch a single recorded result by name
        records = await Workflow.for_job(job).get_recorded("extract")

        # Transform the data
        return do_some_transformation(records)
```

For jobs with multiple dependencies, use `all_recorded()` to fetch all upstream results at once:

```python
@worker()
class MergeWorker:
    async def process(self, job):
        # Fetch all recorded results from this job's dependencies
        results = await Workflow.for_job(job).all_recorded(deps_of=job)

        # results is a dict: {"dep_name": recorded_value, ...}
        users = results["users"]
        orders = results["orders"]

        return merge(users, orders)
```

The `deps_of` parameter filters results to only include jobs that are direct dependencies of the specified job, which is typically what you want when aggregating upstream results. When a dependency is a sub-workflow, `all_recorded()` also returns the recorded values from every job in that sub-workflow.

## Cascading Functions

Cascades allow you to build workflows using plain functions that automatically receive context and previous step results. Each function receives a dict containing the workflow context and the recorded results of its dependencies. Return values are automatically recorded and made available to subsequent steps.

### Basic Cascades

Here's an ETL (Extract, Transform, Load) pipeline using cascading functions:

```python
from datetime import datetime

from oban_pro import Workflow


async def extract(context):
    source = context["source"]
    return {"records": await fetch_data(source), "extracted_at": datetime.now()}


async def transform(context):
    records = context["extract"]["records"]
    return {"transformed": [process(rec) for rec in records]}


async def load(context):
    records = context["transform"]["transformed"]
    await save_to_database(records)
    return {"loaded": len(records)}


workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
    .add_cascade("load", load, deps="transform")
)

await oban.enqueue_many(workflow)
```

When this workflow runs, each function is automatically called with a context dict containing:

* Values from `add_context()` (e.g., `context["source"]`)
* Recorded results from dependencies, keyed by step name (e.g., `context["extract"]`)

There's no need to manually fetch recorded results, as cascades handle this automatically.

### Workflow Context

As shown above, you can use `add_context()` to store shared data that all cascade functions can access:

```python
workflow = (
    Workflow()
    .add_context({"user_id": 123, "batch_size": 100, "environment": "production"})
    .add_cascade("step_1", step_1_func)
    .add_cascade("step_2", step_2_func, deps="step_1")
)
```

The context is available to all cascade functions and is merged with recorded results from dependencies.

### Fan-Out Cascades

Cascades can fan out across multiple items by providing a tuple of `(items, function)`. The function receives each item as its first argument and the context as its second:

```python
async def process_user(user_id, context):
    batch_id = context["batch_id"]
    return {"user_id": user_id, "processed": True}


async def summarize(context):
    # Fan-out results are nested under the step name and keyed by list index
    # or dict key: context["process"]["0"], context["process"]["1"], etc.
    results = context["process"]
    return sum(1 for res in results.values() if res["processed"])


user_ids = [1, 2, 3, 4, 5]

workflow = (
    Workflow()
    .add_context({"batch_id": 456})
    .add_cascade("process", (user_ids, process_user))
    .add_cascade("summarize", summarize, deps="process")
)
```

Each item creates a separate job in a sub-workflow. When using a list, jobs are named by index (`"0"`, `"1"`, etc.). You can also use a dict for named jobs:

```python
sources = {"wiki": "https://...", "docs": "https://..."}

workflow = (
    Workflow()
    .add_cascade("fetch", (sources, fetch_from_source))
    .add_cascade("merge", merge_results, deps="fetch")
)

# Results available as context["fetch"]["wiki"] and context["fetch"]["docs"]
```

Cascading functions are especially valuable for:

* Data pipelines where each step needs the output of previous steps
* Minimizing boilerplate compared to manual `get_recorded()` calls
* Building fan-out/fan-in patterns with automatic result collection

## Grafting Sub-Workflows

Grafting allows sub-workflows to be attached *after* a workflow has started. With grafting, you define placeholders that are expanded into full sub-workflows when data becomes available at runtime.

A graft job serves as a placeholder in the workflow. When the graft job executes, it must build and attach a new sub-workflow at that point. Any downstream jobs that depend on the graft will wait for the entire grafted sub-workflow to complete.

### Creating Graft Points

Use `add_graft()` to create a placeholder that will be filled in at runtime:

```python
@worker()
class UserNotifier:
    async def process(self, job):
        # Fetch user IDs at runtime—this data wasn't available when the workflow was created
        user_ids = await get_user_ids_for_account(job.args["account_id"])

        # Build the sub-workflow dynamically
        sub_workflow = Workflow()
        for user_id in user_ids:
            sub_workflow.add(str(user_id), NotifyWorker.new({"user_id": user_id}))

        # Attach the sub-workflow to this graft point
        await oban.enqueue_many(Workflow.apply_graft(job, sub_workflow))


workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add_graft("notify", UserNotifier.new({"account_id": 123}), deps="setup")
    .add("summary", SummaryWorker.new({}), deps="notify")
)

await oban.enqueue_many(workflow)
```

When the workflow runs, the execution flow is:

1. `setup` runs first
2. `notify` (the grafter) runs and calls `apply_graft()` to attach a sub-workflow
3. All jobs in the grafted sub-workflow execute
4. `summary` runs after the entire grafted sub-workflow completes

### When to Use Grafting

Grafting is useful when:

* The number of jobs depends on data discovered during execution
* You need to conditionally execute different workflow branches
* Sub-workflow structure can't be determined until a previous step completes

For simpler cases where you know the items upfront, prefer `add_many()` or fan-out cascades.

## Workflow Options

Workflows provide several options to control their behavior, from identification to error handling.

### Workflow ID

Every workflow needs a unique identifier. By default, `id` is an auto-generated UUID (UUIDv7 on Python 3.14+, UUIDv4 on older versions). For more control, you can provide a custom ID:

```python
Workflow(id="custom-unique-id")
```

Custom IDs are useful when you need to reference a workflow externally or ensure idempotency.

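
For instance, a run-specific ID lets external tooling find the workflow again later. A minimal sketch, assuming a nightly ETL keyed by date (the `nightly-etl` prefix and worker choices are illustrative):

```python
from datetime import date

# A deterministic, run-specific ID that dashboards or support tooling can
# use later to rebuild a handle with Workflow(id=...).
workflow_id = f"nightly-etl-{date.today().isoformat()}"

workflow = (
    Workflow(id=workflow_id)
    .add("extract", ExtractWorker.new({"source": "database"}))
    .add("transform", TransformWorker.new({"type": "normalize"}), deps="extract")
)

await oban.enqueue_many(workflow)
```

Later, `Workflow(id=workflow_id)` reconstructs a handle to the same workflow for the introspection methods described below.
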
### Workflow Name

Workflows accept an optional name that describes their purpose:

```python
Workflow(name="nightly-etl")
```

While the `id` must be unique, the `name` doesn't have to be, so it can serve as a general-purpose label for categorizing or grouping workflows.

### Dependency Handling

By default, workflows use conservative dependency handling—if an upstream job is cancelled or discarded, dependent jobs are automatically cancelled. You can customize this behavior:

```python
# Apply to entire workflow
Workflow(ignore_cancelled=True, ignore_discarded=True)
```

Or configure individual jobs:

```python
workflow = (
    Workflow()
    .add("step_1", Worker.new({}))
    .add("step_2", Worker.new({}), deps="step_1", ignore_cancelled=True)
    .add("step_3", Worker.new({}), deps="step_2", ignore_discarded=True)
)
```

* **ignore_cancelled** — Treat `cancelled` dependencies as completed rather than cancelling downstream jobs. Defaults to `False`.
* **ignore_discarded** — Treat `discarded` dependencies as completed rather than cancelling downstream jobs. Defaults to `False`.

## Introspection

Workflow jobs are tied together through `meta` attributes. You can retrieve these jobs and check workflow status using several methods.

### Getting a Workflow Handle

Use `Workflow.for_job()` to get a workflow instance from within a running job:

```python
@worker()
class MyWorker:
    async def process(self, job):
        workflow = Workflow.for_job(job)

        # Now you can use workflow methods
        status = await workflow.status()
        all_jobs = await workflow.all_jobs()
```

You can also create a workflow handle directly if you have the workflow ID:

```python
workflow = Workflow(id="known-workflow-id")
status = await workflow.status()
```

### Fetching Jobs

Retrieve jobs from a workflow using `get_job()` for a single job or `all_jobs()` for multiple:

```python
workflow = Workflow.for_job(job)

# Get a single job by name
extract_job = await workflow.get_job("extract")

# Get all jobs in the workflow
all_jobs = await workflow.all_jobs()

# Get only specific named jobs
some_jobs = await workflow.all_jobs(names=["extract", "transform"])

# Get only the dependencies of the current job
dep_jobs = await workflow.all_jobs(deps_of=job)
```

### Checking Status

Get detailed status information about a workflow using `status()`:

```python
workflow = Workflow.for_job(job)
status = await workflow.status()

print(status.state)       # "executing", "completed", "cancelled", or "discarded"
print(status.total)       # Total number of jobs
print(status.counts)      # Dict of counts by state: {"completed": 5, "executing": 2, ...}
print(status.duration)    # Time from start to completion (if finished)
print(status.started_at)  # When the first job started
print(status.stopped_at)  # When the last job finished (if complete)
print(status.subs)        # Dict of sub-workflow statuses
```

The `state` field reflects the overall workflow state:

* `"executing"` — At least one job is still running or pending
* `"completed"` — All jobs completed successfully
* `"cancelled"` — At least one job was cancelled (and the workflow isn't still executing)
* `"discarded"` — At least one job was discarded (and the workflow isn't still executing)

## Visualization

Workflows are a type of [Directed Acyclic Graph][dag], which means we can represent them visually as nodes (jobs) connected by edges (dependencies). By converting workflows into standard graph description languages, we can create clear visual representations of job execution patterns.

### Converting to Graph Formats

There are several built-in visualization options:

* `to_graph()` — Returns a `Graph` object with vertices and edges
* `to_dot()` — Generates [Graphviz][gv] DOT format
* `to_mermaid()` — Produces [Mermaid][mermaid] flowchart markup

For example, let's generate a mermaid flowchart from a workflow:

```python
workflow = (
    Workflow()
    .add("setup", SetupWorker.new({}))
    .add("process_a", ProcessWorker.new({}), deps="setup")
    .add("process_b", ProcessWorker.new({}), deps="setup")
    .add("finish", FinishWorker.new({}), deps=["process_a", "process_b"])
)

print(workflow.to_mermaid())
```

This produces the following mermaid output, where each vertex shows the job name and worker:

```text
flowchart TD
    setup["setup (SetupWorker)"]
    process_a["process_a (ProcessWorker)"]
    process_b["process_b (ProcessWorker)"]
    finish["finish (FinishWorker)"]
    setup --> process_a
    setup --> process_b
    process_a --> finish
    process_b --> finish
```

### Rendering Visualizations

You can render these visualizations in several ways:

**Jupyter Notebooks** — Display mermaid diagrams using IPython's display capabilities or a mermaid extension:

```python
from IPython.display import display, Markdown

display(Markdown(f"```mermaid\n{workflow.to_mermaid()}\n```"))
```

**Command Line** — Use the `mmdc` (mermaid CLI) or `dot` (Graphviz) commands to generate images:

```bash
# Using mermaid-cli
python -c "from myapp import workflow; print(workflow.to_mermaid())" | mmdc -i - -o workflow.svg

# Using Graphviz
python -c "from myapp import workflow; print(workflow.to_dot())" | dot -Tsvg -o workflow.svg
```

**Documentation** — Many documentation tools (MkDocs, Sphinx with extensions, GitHub) render mermaid code blocks directly:

````markdown
```mermaid
flowchart TD
    setup["setup (SetupWorker)"]
    process["process (ProcessWorker)"]
    setup --> process
```
````

The visualization clearly shows how workflows fan out and fan in, making complex dependency relationships immediately apparent.

[dag]: https://en.wikipedia.org/wiki/Directed_acyclic_graph
[mermaid]: https://mermaid.js.org/
[gv]: https://graphviz.org

## Cancellation and Retry

Workflows provide methods to cancel or retry jobs in bulk.

### Cancelling Jobs

Cancel jobs in a workflow using `cancel_jobs()`:

```python
workflow = Workflow.for_job(job)

# Cancel all jobs in the workflow
count = await workflow.cancel_jobs()

# Cancel only specific named jobs
count = await workflow.cancel_jobs(names=["step_2", "step_3"])
```

Jobs in a final state (`completed`, `cancelled`, `discarded`) cannot be cancelled. For `executing` jobs, the database state is updated immediately, but running tasks are not forcefully terminated.

### Retrying Jobs

Retry failed or cancelled jobs using `retry_jobs()`:

```python
workflow = Workflow.for_job(job)

# Retry all retryable jobs in the workflow
count = await workflow.retry_jobs()

# Retry only specific named jobs
count = await workflow.retry_jobs(names=["failed_step"])
```

Jobs that are `available` or `executing` are ignored. Jobs without dependencies are set to `available`, while jobs with dependencies are set to `suspended` to wait for their dependencies.

## API Reference

### `Workflow` Class

#### Constructor

```python
Workflow(
    id: str | None = None,
    name: str | None = None,
    check_deps: bool = True,
    ignore_cancelled: bool = False,
    ignore_discarded: bool = False,
)
```

* **id** — Custom workflow ID. Auto-generated UUID if not provided.
* **name** — Optional name for categorizing workflows.
* **check_deps** — Validate that dependencies exist when adding jobs.
  Defaults to `True`.
* **ignore_cancelled** — Treat cancelled deps as completed. Defaults to `False`.
* **ignore_discarded** — Treat discarded deps as completed. Defaults to `False`.

#### Building Methods

```python
add(name, job, *, deps=[], ignore_cancelled=None, ignore_discarded=None)
```

Add a named job to the workflow.

* **name** — Unique name for this job within the workflow.
* **job** — The job to add, created with `Worker.new(args)`.
* **deps** — Job names this one depends on. String or list of strings.
* **ignore_cancelled** — Per-job override for cancelled dependency handling.
* **ignore_discarded** — Per-job override for discarded dependency handling.

Returns the workflow for method chaining.

---

```python
add_workflow(name, sub_workflow, *, deps=[])
```

Add a sub-workflow with optional dependencies on parent jobs.

* **name** — Unique name for this sub-workflow.
* **sub_workflow** — The workflow to nest.
* **deps** — Names of parent jobs the sub-workflow depends on.

---

```python
add_many(name, jobs, *, deps=[])
```

Add multiple jobs as a sub-workflow. Jobs in a list are named by index; jobs in a dict use keys.

* **name** — Unique name for this sub-workflow.
* **jobs** — List or dict of jobs.
* **deps** — Names of parent jobs the sub-workflow depends on.

---

```python
add_cascade(name, func_or_tuple, *, deps=[], **job_opts)
```

Add a cascade function that receives context and auto-records results.

* **name** — Unique name for this cascade.
* **func_or_tuple** — Either a function `(context) -> result` or a tuple `(items, func)` for fan-out.
* **deps** — Names of jobs this cascade depends on.
* **job_opts** — Additional job options (queue, priority, etc.).

---

```python
add_graft(name, job, *, deps=[], **kwargs)
```

Add a grafting placeholder for attaching a sub-workflow at runtime.

* **name** — Unique name for this graft point.
* **job** — The grafter job that will call `apply_graft()`.
* **deps** — Names of jobs this graft depends on.

---

```python
add_context(value)
```

Store shared context data accessible to all cascade functions.

* **value** — A dict of context data.

#### Class Methods

```python
Workflow.for_job(job)
```

Create a workflow instance from a job that belongs to a workflow.

* **job** — A job with `workflow_id` in its meta.

---

```python
Workflow.apply_graft(job, sub_workflow)
```

Attach a sub-workflow to a graft point. Must be called from within a grafter job's `process()`.

* **job** — The currently executing grafter job.
* **sub_workflow** — The sub-workflow to attach.

Returns a workflow ready to be inserted with `oban.enqueue_many()`.

#### Instance Methods

```python
await get_job(name)
```

Fetch a single job from the workflow by name. Returns `Job | None`.

---

```python
await all_jobs(*, names=None, deps_of=None)
```

Fetch jobs belonging to the workflow.

* **names** — Filter to only return jobs with these names.
* **deps_of** — Filter to only return jobs that are dependencies of this job.

Returns a list of jobs.

---

```python
await get_recorded(name)
```

Fetch the recorded return value from a workflow job by name. Returns the decoded value or `None`.

---

```python
await all_recorded(*, names=None, deps_of=None)
```

Fetch all recorded return values from workflow jobs.

* **names** — Filter to jobs with these names.
* **deps_of** — Filter to jobs that are dependencies of this job.

Returns a dict mapping job names to their recorded values. Sub-workflow results are nested.

---

```python
await get_context()
```

Retrieve the workflow's context data set by `add_context()`. Returns the context dict or `None`.

---

```python
await status()
```

Get the current status of the workflow and all sub-workflows. Returns a `WorkflowStatus` object with these attributes:

* **id** — The workflow ID.
* **name** — The workflow name (if set).
* **state** — Overall state: `"executing"`, `"completed"`, `"cancelled"`, `"discarded"`, or `"unknown"`.
* **total** — Total number of jobs.
* **counts** — Dict of job counts by state.
* **duration** — Time from start to completion (`timedelta` or `None`).
* **started_at** — When the first job started (`datetime` or `None`).
* **stopped_at** — When the last job finished (`datetime` or `None`).
* **subs** — Dict of sub-workflow statuses, keyed by sub-workflow name.

---

```python
await cancel_jobs(*, names=None)
```

Cancel jobs in the workflow. Returns the number of jobs cancelled.

* **names** — Filter to only cancel jobs with these names.

---

```python
await retry_jobs(*, names=None)
```

Retry jobs in the workflow. Returns the number of jobs retried.

* **names** — Filter to only retry jobs with these names.

---

```python
to_graph()
```

Build a `Graph` representation of the workflow. Returns a `Graph` object with `vertices` and `edges` attributes. Sub-workflows are represented as nested `Graph` objects within vertices.

---

```python
to_dot()
```

Convert the workflow to Graphviz DOT format. Returns a string suitable for rendering with Graphviz tools.

---

```python
to_mermaid()
```

Convert the workflow to Mermaid flowchart format. Returns a string suitable for rendering in Mermaid-compatible viewers.

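
As a closing example, here's a minimal sketch of exporting both formats to files for rendering with the command-line tools shown earlier (the file names are illustrative):

```python
# Write the DOT and Mermaid representations to disk, then render them with
# the external tools from the Visualization section, for example:
#   dot -Tsvg workflow.dot -o workflow.svg
#   mmdc -i workflow.mmd -o workflow.svg
with open("workflow.dot", "w") as f:
    f.write(workflow.to_dot())

with open("workflow.mmd", "w") as f:
    f.write(workflow.to_mermaid())
```
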