Oban Pro is a licensed add-on for the Oban job orchestration framework, providing advanced features for complex workflows. This is the initial release, bringing battle-tested patterns from Oban Pro for Elixir to the Python ecosystem with an async-native API.
## Multi-Process Execution
Bypass Python's Global Interpreter Lock (GIL) and utilize multiple CPU cores for CPU-intensive workloads. The `obanpro start` command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism.
```bash
# Start with automatic core detection
obanpro start

# Or specify the number of processes
obanpro start --processes 8

# With setup/teardown hooks for each process
obanpro start --setup myapp.worker_setup --teardown myapp.worker_cleanup
```
Each process runs its own asyncio event loop, allowing you to scale CPU-bound work across all available cores without rewriting your async code.
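The hook functions are ordinary callables resolved from their dotted paths. Here's a minimal sketch of what `myapp.worker_setup` and `myapp.worker_cleanup` might look like, assuming each hook is a plain zero-argument function invoked once per process (the exact hook signature is an assumption, not a documented contract):

```python
# myapp.py, a hypothetical hook module; the zero-argument signature
# of these hooks is an assumption.
import logging
import os

def worker_setup():
    # Runs once in each worker process before it takes jobs: a natural
    # place to open per-process resources (DB pools, clients, caches).
    logging.basicConfig(level=logging.INFO)
    logging.info("worker process %s ready", os.getpid())

def worker_cleanup():
    # Runs once per process on shutdown to release those resources.
    logging.info("worker process %s shutting down", os.getpid())
```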
## Smart Concurrency
Control job execution with global concurrency limits, rate limiting, and queue partitioning, all coordinated across the entire cluster.

**Global Limits** cap concurrent jobs across all nodes, ensuring cluster-wide limits hold regardless of how many workers you're running:
```toml
[queues.exports]
limit = 10
global_limit = 5
```
**Rate Limiting** controls job throughput over configurable time windows; here, at most 60 jobs per 60-second period:
```toml
[queues.emails]
limit = 10
rate_limit = { allowed = 60, period = 60 }
```
**Partitioning** applies concurrency or rate limits per worker, tenant, or any argument key:
```toml
[queues.api_calls]
limit = 10
global_limit = 1
partition = { args = "tenant_id" }
```
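With this configuration, jobs are partitioned by the `tenant_id` value in their args, so each tenant runs at most one job at a time across the cluster. A sketch of enqueuing into such a queue, using the `enqueue` call shown later in this post (the `ApiCallWorker` name is illustrative):

```python
# ApiCallWorker is a hypothetical worker bound to the "api_calls" queue
for tenant_id in (1, 2, 3):
    # Jobs for different tenants run concurrently, while jobs sharing a
    # tenant_id are serialized by the per-partition global_limit of 1.
    await ApiCallWorker.enqueue({"tenant_id": tenant_id, "endpoint": "/sync"})
```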
## Unique Jobs
Prevent duplicate jobs from being enqueued based on configurable fields. Supports time-based uniqueness periods, partial args matching, and state-based filtering.
```python
from datetime import timedelta

from oban import worker

@worker(unique={"period": timedelta(hours=1), "keys": ["account_id"]})
class ImportWorker:
    async def process(self, job):
        return await import_account(job.args["account_id"])
```
Detect conflicts to handle duplicate insertions gracefully:
```python
job = await ImportWorker.enqueue({"account_id": 123}, unique=True)

if job.extra.get("conflicted"):
    print(f"Matched existing job {job.id}")
```
## Relay
Dispatch jobs and await their results synchronously—even when jobs execute on different servers. Think of it as persistent, distributed async/await.
```python
from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}
```
```python
result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)
print(result)  # {"result": 42}
```
Perfect for request/response workflows and service coordination where you need synchronous semantics with the reliability of persistent job queues.
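Since a relayed call may not get an answer in time, you'll typically guard it. A hedged sketch, assuming an unanswered relay surfaces as `asyncio.TimeoutError` (the actual exception type may differ):

```python
import asyncio

async def add(a: int, b: int) -> dict:
    try:
        return await CalculatorWorker.relay({"a": a, "b": b}, timeout=5.0)
    except asyncio.TimeoutError:
        # Assumption: a relay that receives no result within the timeout
        # raises asyncio.TimeoutError; the job itself may still complete
        # in the background.
        return {"result": None}
```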
## Workflows
Compose jobs with dependencies for sequential execution, fan-out parallelization, and fan-in convergence patterns.
```python
from oban_pro import Workflow

workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add("transform", TransformWorker.new({}), deps="extract")
    .add("load_a", LoadWorker.new({"dest": "warehouse"}), deps="transform")
    .add("load_b", LoadWorker.new({"dest": "archive"}), deps="transform")
    .add("notify", NotifyWorker.new({}), deps=["load_a", "load_b"])
)

await oban.enqueue_many(workflow)
```
**Sub-Workflows** with `add_workflow()` compose complex pipelines from reusable workflow patterns.
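As a rough sketch, assuming `add_workflow()` mirrors `add()` by taking a name, a workflow, and optional deps (an unverified signature):

```python
# etl_workflow is a reusable Workflow built elsewhere; the
# add_workflow() signature (name, workflow, deps) is an assumption.
pipeline = (
    Workflow()
    .add("prepare", PrepareWorker.new({}))
    .add_workflow("etl", etl_workflow, deps="prepare")
)
```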
**Collection Fan-Out** with `add_many()` creates parallel jobs from lists or dictionaries.
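For instance, fanning out one job per item (assuming `add_many()` accepts a name and a list of jobs, which is a guess at the signature):

```python
# One resize job per image id, all runnable in parallel; passing a
# list of jobs to add_many() is an assumed signature.
workflow = Workflow().add_many(
    "resize", [ResizeWorker.new({"image_id": i}) for i in image_ids]
)
```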
**Cascading Functions** with `add_cascade()` automatically receive context and upstream results:
```python
async def extract(context):
    return {"records": await fetch_data(context["source"])}

async def transform(context):
    return {"transformed": [process(r) for r in context["extract"]["records"]]}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
)
```
**Runtime Grafting** with `add_graft()` attaches sub-workflows dynamically when data becomes available during execution, perfect for workflows where the structure depends on runtime data.
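A sketch of the idea, assuming a graft callback receives the workflow context and returns the sub-workflow to attach (both the callback shape and the `add_graft()` signature are assumptions):

```python
async def expand(context):
    # Build one job per shard discovered at runtime by "extract".
    # The callback shape (receiving context, returning a Workflow)
    # is an assumption.
    wf = Workflow()
    for shard in context["extract"]["shards"]:
        wf.add(f"process_{shard}", ShardWorker.new({"shard": shard}))
    return wf

workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add_graft("expand", expand, deps="extract")
)
```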