# Changelog
Oban Pro is a licensed add-on for the Oban job orchestration framework, providing advanced features for complex workflows. This is the initial release, bringing battle-tested patterns from Oban Pro for Elixir to the Python ecosystem with an async-native API.
## Multi-Process Execution
Bypass Python’s Global Interpreter Lock (GIL) and utilize multiple CPU cores for CPU-intensive
workloads. The obanpro start command launches workers with multi-process execution, distributing
jobs across multiple Python processes for true parallelism.
```bash
# Start with automatic core detection
obanpro start

# Or specify the number of processes
obanpro start --processes 8

# With setup/teardown hooks for each process
obanpro start --setup myapp.worker_setup --teardown myapp.worker_cleanup
```
Each process runs its own asyncio event loop, allowing you to scale CPU-bound work across all available cores without rewriting your async code.
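The hook targets passed to `--setup` and `--teardown` are dotted paths to async functions. A minimal sketch, assuming each hook is a zero-argument async function invoked once per worker process (check the CLI reference for the exact contract):

```python
# myapp.py: sketch of the hooks referenced by
# `obanpro start --setup myapp.worker_setup --teardown myapp.worker_cleanup`.
# The zero-argument signature is an assumption.
import asyncpg  # example dependency; any per-process resource works

pool = None

async def worker_setup():
    """Runs once when a worker process starts."""
    global pool
    # Initialize per-process resources, e.g. a database pool.
    pool = await asyncpg.create_pool(dsn="postgres://localhost/myapp")

async def worker_cleanup():
    """Runs once when a worker process stops."""
    if pool is not None:
        await pool.close()
```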
## Smart Concurrency
Control job execution with global concurrency limits, rate limiting, and queue partitioning—all coordinated across the entire cluster.
**Global Limits** cap the number of concurrently executing jobs across all nodes, regardless of how many workers you're running:
```toml
[queues.exports]
limit = 10
global_limit = 5
```
**Rate Limiting** controls job throughput over configurable time windows:
```toml
[queues.emails]
limit = 10
rate_limit = { allowed = 60, period = 60 }
```
**Partitioning** applies concurrency or rate limits per worker, tenant, or any argument key:
```toml
[queues.api_calls]
limit = 10
global_limit = 1
partition = { args = "tenant_id" }
```
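With this configuration, the `global_limit` of 1 applies per distinct `tenant_id` value rather than to the queue as a whole. A quick illustration, using a hypothetical `ApiCallWorker` enqueued on the `api_calls` queue:

```python
# The two "acme" jobs execute one at a time cluster-wide, while the
# "globex" job runs in parallel with them. ApiCallWorker is hypothetical.
await ApiCallWorker.enqueue({"tenant_id": "acme", "endpoint": "/sync"})
await ApiCallWorker.enqueue({"tenant_id": "acme", "endpoint": "/audit"})
await ApiCallWorker.enqueue({"tenant_id": "globex", "endpoint": "/sync"})
```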
## Unique Jobs
Prevent duplicate jobs from being enqueued based on configurable fields. Supports time-based uniqueness periods, partial args matching, and state-based filtering.
```python
from datetime import timedelta

from oban import worker

@worker(unique={"period": timedelta(hours=1), "keys": ["account_id"]})
class ImportWorker:
    async def process(self, job):
        return await import_account(job.args["account_id"])
```
Detect conflicts to handle duplicate insertions gracefully:
```python
job = await ImportWorker.enqueue({"account_id": 123}, unique=True)

if job.extra.get("conflicted"):
    print(f"Matched existing job {job.id}")
```
## Relay
Dispatch jobs and await their results synchronously—even when jobs execute on different servers. Think of it as persistent, distributed async/await.
```python
from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}

result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)
print(result)  # {"result": 42}
```
Perfect for request/response workflows and service coordination where you need synchronous semantics with the reliability of persistent job queues.
## Workflows
Compose jobs with dependencies for sequential execution, fan-out parallelization, and fan-in convergence patterns.
```python
from oban_pro import Workflow

workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add("transform", TransformWorker.new({}), deps="extract")
    .add("load_a", LoadWorker.new({"dest": "warehouse"}), deps="transform")
    .add("load_b", LoadWorker.new({"dest": "archive"}), deps="transform")
    .add("notify", NotifyWorker.new({}), deps=["load_a", "load_b"])
)

await oban.enqueue_many(workflow)
```
**Sub-Workflows** with `add_workflow()` compose complex pipelines from reusable workflow patterns.
**Collection Fan-Out** with `add_many()` creates parallel jobs from lists or dictionaries.
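For illustration, a fan-out sketch that assumes `add_many()` takes a step name, a collection of jobs, and the same `deps` option as `add()` (the workers and the exact signature are assumptions):

```python
# Hypothetical fan-out: one ResizeWorker job per image, all gated on
# "fetch", with "publish" converging on the whole group. add_many()'s
# signature here is inferred from add() and may differ.
workflow = (
    Workflow()
    .add("fetch", FetchWorker.new({"album_id": 7}))
    .add_many(
        "resize",
        [ResizeWorker.new({"image": name}) for name in ["a.png", "b.png", "c.png"]],
        deps="fetch",
    )
    .add("publish", PublishWorker.new({}), deps="resize")
)
```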
**Cascading Functions** with `add_cascade()` automatically receive context and upstream results:
```python
async def extract(context):
    return {"records": await fetch_data(context["source"])}

async def transform(context):
    return {"transformed": [process(r) for r in context["extract"]["records"]]}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
)
```
**Runtime Grafting** with `add_graft()` attaches sub-workflows dynamically when data becomes available during execution, perfect for workflows where the structure depends on runtime data.
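A sketch only, assuming `add_graft()` accepts a step name, an async callable that builds a sub-workflow from context, and the usual `deps` option (none of which is confirmed by the examples above):

```python
# Hypothetical graft: the sub-workflow's shape is unknown until the
# "discover" step runs. list_shards and SyncWorker are made up, and
# add_graft()'s exact signature is an assumption.
async def discover_shards(context):
    return {"shards": await list_shards(context["cluster"])}

async def fan_out_shards(context):
    # Build one sync job per shard discovered at runtime.
    sub = Workflow()
    for shard in context["discover"]["shards"]:
        sub.add(f"sync_{shard}", SyncWorker.new({"shard": shard}))
    return sub

workflow = (
    Workflow()
    .add_context({"cluster": "primary"})
    .add_cascade("discover", discover_shards)
    .add_graft("sync", fan_out_shards, deps="discover")
)
```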
## v0.5.0 — 2025-01-21
### Features
- [CLI] The `obanpro install` command for installing the Pro database schema.
- [CLI] The `obanpro uninstall` command for removing the Pro database schema.
- [CLI] The `obanpro start` command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism. Each process runs its own asyncio event loop, bypassing the GIL for CPU-intensive workloads.
- [CLI] The `--processes` option controls the number of worker processes. Defaults to the number of CPU cores when not specified.
- [CLI] The `--setup` and `--teardown` options specify async functions to run when each worker process starts and stops, useful for initializing database pools or loading ML models.
- [Queues] Global concurrency limits with `global_limit` to cap concurrent jobs across all nodes, ensuring cluster-wide limits regardless of node count.
- [Queues] Rate limiting with `rate_limit` to control job throughput over time windows. Supports jobs per second, minute, hour, or custom periods.
- [Queues] Queue partitioning with `partition` to apply concurrency or rate limits per worker, args key, or meta key. Enables per-tenant or per-resource throttling.
- [Jobs] Unique job constraints to prevent enqueueing duplicate jobs based on configurable fields. Supports `period` for time-based uniqueness, `fields` for attribute comparison, `keys` for partial args matching, and `group` for state-based filtering.
- [Jobs] Conflict detection via `job.extra["conflicted"]` to determine if an insert matched an existing job.
- [Workflow] Declarative job dependencies with `Workflow.add()` for sequential execution, fan-out parallelization, and fan-in convergence patterns.
- [Workflow] Sub-workflows with `add_workflow()` for composing complex pipelines from reusable workflow patterns.
- [Workflow] Collection fan-out with `add_many()` for creating sub-workflows from lists or dicts of jobs.
- [Workflow] Cascading functions with `add_cascade()` that automatically receive context and upstream results. Supports fan-out across collections with automatic result aggregation.
- [Workflow] Runtime grafting with `add_graft()` for attaching sub-workflows dynamically when data becomes available during execution.
- [Workflow] Shared context with `add_context()` for passing configuration to all cascade functions.
- [Workflow] Result sharing with `Record()` return values and `get_recorded()`/`all_recorded()` for passing data between dependent jobs (see the sketch after this list).
- [Workflow] Workflow introspection with `status()`, `all_jobs()`, and `get_job()` for monitoring execution progress.
- [Workflow] Bulk operations with `cancel_jobs()` and `retry_jobs()` for workflow-level control.
- [Workflow] Visualization with `to_mermaid()` and `to_dot()` for generating flowchart diagrams.
- [Workflow] Configurable dependency handling with `ignore_cancelled` and `ignore_discarded` options.
- [Relay] Dispatch jobs and await results synchronously with `Worker.relay()`, even when jobs execute on different servers. Think of it as persistent, distributed async/await.
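As a hedged illustration of the result-sharing entry above (the import path for `Record` and the availability of `get_recorded()` on the job are assumptions based solely on the names in the list):

```python
# Hypothetical result sharing between dependent workflow jobs. The import
# path for Record and the job.get_recorded() call are assumptions.
from oban import worker
from oban_pro import Record, Workflow

@worker(queue="default")
class FetchWorker:
    async def process(self, job):
        # Returning a Record makes the value available downstream.
        return Record({"user_ids": [1, 2, 3]})

@worker(queue="default")
class ReportWorker:
    async def process(self, job):
        # Read the value recorded by the upstream "fetch" step.
        fetched = await job.get_recorded("fetch")
        return {"count": len(fetched["user_ids"])}

workflow = (
    Workflow()
    .add("fetch", FetchWorker.new({}))
    .add("report", ReportWorker.new({}), deps="fetch")
)
```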