Release Notes

Oban Pro is a licensed add-on for the Oban job orchestration framework, providing advanced features for complex workflows. This is the initial release, bringing battle-tested patterns from Oban Pro for Elixir to the Python ecosystem with an async-native API.

Multi-Process Execution

Bypass Python's Global Interpreter Lock (GIL) and use multiple CPU cores for CPU-intensive workloads. The obanpro start command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism.

# Start with automatic core detection
obanpro start
# Or specify the number of processes
obanpro start --processes 8
# With setup/teardown hooks for each process
obanpro start --setup myapp.worker_setup --teardown myapp.worker_cleanup

Each process runs its own asyncio event loop, allowing you to scale CPU-bound work across all available cores without rewriting your async code.
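The `--setup` and `--teardown` hooks shown above (`myapp.worker_setup` and `myapp.worker_cleanup` are placeholder module paths) would plausibly be zero-argument async callables run once per worker process. A minimal sketch of that lifecycle, under those assumptions:

```python
import asyncio

# Hypothetical per-process state; Pro would invoke these hooks inside
# each worker process, not in the parent.
state = {}

async def worker_setup():
    # e.g. open a database pool or load an ML model once per process
    state["pool"] = "connected"

async def worker_cleanup():
    # release per-process resources on shutdown
    state.pop("pool", None)

async def lifecycle():
    await worker_setup()
    try:
        assert state["pool"] == "connected"
    finally:
        await worker_cleanup()

asyncio.run(lifecycle())
```

Because each process gets its own hooks, resources like connection pools are never shared across process boundaries.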

Smart Concurrency

Control job execution with global concurrency limits, rate limiting, and queue partitioning—all coordinated across the entire cluster. Global Limits cap concurrent jobs across all nodes, so the cap holds regardless of how many workers you're running:

[queues.exports]
limit = 10
global_limit = 5

Rate Limiting controls job throughput over configurable time windows:

[queues.emails]
limit = 10
rate_limit = { allowed = 60, period = 60 }
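The `allowed`/`period` semantics above (60 jobs per 60 seconds) can be pictured as a fixed-window counter. A minimal sketch of the idea, not Pro's actual implementation:

```python
import time

class FixedWindowLimiter:
    """Admit at most `allowed` jobs per `period` seconds."""

    def __init__(self, allowed: int, period: float):
        self.allowed = allowed
        self.period = period
        self.window_start = time.monotonic()
        self.count = 0

    def try_acquire(self) -> bool:
        now = time.monotonic()
        if now - self.window_start >= self.period:
            # new window: reset the counter
            self.window_start = now
            self.count = 0
        if self.count < self.allowed:
            self.count += 1
            return True
        return False

limiter = FixedWindowLimiter(allowed=2, period=60)
results = [limiter.try_acquire() for _ in range(3)]
print(results)  # [True, True, False]
```

In Pro the counters are coordinated cluster-wide rather than per process, but the admit-or-defer decision follows the same shape.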

Partitioning applies concurrency or rate limits per worker, tenant, or any argument key:

[queues.api_calls]
limit = 10
global_limit = 1
partition = { args = "tenant_id" }
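With `partition = { args = "tenant_id" }`, the `global_limit = 1` applies per tenant rather than across the whole queue. A plain-Python sketch of the grouping idea (not the Pro scheduler):

```python
from collections import defaultdict

GLOBAL_LIMIT = 1  # per partition, as in the config above

def partition_key(job_args: dict) -> str:
    # partition = { args = "tenant_id" } groups jobs by this args key
    return str(job_args.get("tenant_id"))

running = defaultdict(int)  # partition key -> currently executing jobs

def can_start(job_args: dict) -> bool:
    return running[partition_key(job_args)] < GLOBAL_LIMIT

# tenant 1 already has a job running; tenant 2 does not
running[partition_key({"tenant_id": 1})] += 1
print(can_start({"tenant_id": 1}))  # False
print(can_start({"tenant_id": 2}))  # True
```

One slow tenant can therefore never starve the others, even though every tenant shares a single queue.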

Unique Jobs

Prevent duplicate jobs from being enqueued based on configurable fields. Supports time-based uniqueness periods, partial args matching, and state-based filtering.

from datetime import timedelta

from oban import worker

@worker(unique={"period": timedelta(hours=1), "keys": ["account_id"]})
class ImportWorker:
    async def process(self, job):
        return await import_account(job.args["account_id"])

Detect conflicts to handle duplicate insertions gracefully:

job = await ImportWorker.enqueue({"account_id": 123}, unique=True)

if job.extra.get("conflicted"):
    print(f"Matched existing job {job.id}")

Relay

Dispatch jobs and await their results synchronously—even when jobs execute on different servers. Think of it as persistent, distributed async/await.

from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}

result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)
print(result)  # {"result": 42}

Perfect for request/response workflows and service coordination where you need synchronous semantics with the reliability of persistent job queues.

Workflows

Compose jobs with dependencies for sequential execution, fan-out parallelization, and fan-in convergence patterns.

from oban_pro import Workflow

workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add("transform", TransformWorker.new({}), deps="extract")
    .add("load_a", LoadWorker.new({"dest": "warehouse"}), deps="transform")
    .add("load_b", LoadWorker.new({"dest": "archive"}), deps="transform")
    .add("notify", NotifyWorker.new({}), deps=["load_a", "load_b"])
)

await oban.enqueue_many(workflow)

Sub-Workflows with add_workflow() compose complex pipelines from reusable workflow patterns. Collection Fan-Out with add_many() creates parallel jobs from lists or dictionaries. Cascading Functions with add_cascade() automatically receive context and upstream results:

async def extract(context):
    return {"records": await fetch_data(context["source"])}

async def transform(context):
    return {"transformed": [process(r) for r in context["extract"]["records"]]}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
)

Runtime Grafting with add_graft() attaches sub-workflows dynamically when data becomes available during execution—perfect for workflows where the structure depends on runtime data.
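Since add_graft() extends a workflow while it is already running, the effect is like appending nodes to a dependency graph once upstream results arrive. A plain-Python sketch of that idea (not the Pro API):

```python
# Dependency graph: job name -> names of the jobs it depends on
graph = {
    "extract": [],
    "fan_out": ["extract"],
}

def graft(graph: dict, results: list) -> None:
    """Attach one per-item job once the upstream result is known."""
    for i, _item in enumerate(results):
        graph[f"process_{i}"] = ["fan_out"]
    # fan back in once every grafted job completes
    graph["aggregate"] = [f"process_{i}" for i in range(len(results))]

# at runtime, "fan_out" discovers three items to process
graft(graph, results=["a", "b", "c"])
print(sorted(graph))
```

The number of `process_*` nodes is unknowable when the workflow is first enqueued, which is exactly the case grafting addresses.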

v0.5.0

Features

  • [CLI] The obanpro install command for installing the Pro database schema.

  • [CLI] The obanpro uninstall command for removing the Pro database schema.

  • [CLI] The obanpro start command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism. Each process runs its own asyncio event loop, bypassing the GIL for CPU-intensive workloads.

  • [CLI] The --processes option controls the number of worker processes. Defaults to the number of CPU cores when not specified.

  • [CLI] The --setup and --teardown options specify async functions to run when each worker process starts and stops, useful for initializing database pools or loading ML models.

  • [Queues] Global concurrency limits with global_limit to cap concurrent jobs across all nodes, ensuring cluster-wide limits regardless of node count.

  • [Queues] Rate limiting with rate_limit to control job throughput over time windows. Supports jobs per second, minute, hour, or custom periods.

  • [Queues] Queue partitioning with partition to apply concurrency or rate limits per worker, args key, or meta key. Enables per-tenant or per-resource throttling.

  • [Jobs] Unique job constraints to prevent enqueueing duplicate jobs based on configurable fields. Supports period for time-based uniqueness, fields for attribute comparison, keys for partial args matching, and group for state-based filtering.

  • [Jobs] Conflict detection via job.extra["conflicted"] to determine if an insert matched an existing job.

  • [Workflow] Declarative job dependencies with Workflow.add() for sequential execution, fan-out parallelization, and fan-in convergence patterns.

  • [Workflow] Sub-workflows with add_workflow() for composing complex pipelines from reusable workflow patterns.

  • [Workflow] Collection fan-out with add_many() for creating sub-workflows from lists or dicts of jobs.

  • [Workflow] Cascading functions with add_cascade() that automatically receive context and upstream results. Supports fan-out across collections with automatic result aggregation.

  • [Workflow] Runtime grafting with add_graft() for attaching sub-workflows dynamically when data becomes available during execution.

  • [Workflow] Shared context with add_context() for passing configuration to all cascade functions.

  • [Workflow] Result sharing with Record() return values and get_recorded() / all_recorded() for passing data between dependent jobs.

  • [Workflow] Workflow introspection with status(), all_jobs(), and get_job() for monitoring execution progress.

  • [Workflow] Bulk operations with cancel_jobs() and retry_jobs() for workflow-level control.

  • [Workflow] Visualization with to_mermaid() and to_dot() for generating flowchart diagrams.

  • [Workflow] Configurable dependency handling with ignore_cancelled and ignore_discarded options.

  • [Relay] Dispatch jobs and await results synchronously with Worker.relay(), even when jobs execute on different servers. Think of it as persistent, distributed async/await.
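The recorded-results feature above can be pictured as jobs writing named return values that downstream jobs read back. A minimal stand-in for the idea, not the actual Record() / get_recorded() implementation:

```python
recorded = {}  # workflow-level store: job name -> recorded return value

def record(name: str, value):
    # stand-in for returning Record(value) from a workflow job
    recorded[name] = value
    return value

def get_recorded(name: str):
    # stand-in for get_recorded(): fetch an upstream job's result
    return recorded[name]

record("extract", {"rows": 3})
assert get_recorded("extract") == {"rows": 3}
```

Because results are persisted with the workflow, a downstream job can read them even if it runs on a different node or after a retry.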