Changelog#
Oban Pro is a licensed add-on for the Oban job orchestration framework, providing advanced features for complex workflows. This is the initial release, bringing battle-tested patterns from Oban Pro for Elixir to the Python ecosystem with an async-native API.
Multi-Process Execution#
Bypass Python’s Global Interpreter Lock (GIL) and utilize multiple CPU cores for CPU-intensive
workloads. The obanpro start command launches workers with multi-process execution, distributing
jobs across multiple Python processes for true parallelism.
# Start with automatic core detection
obanpro start
# Or specify the number of processes
obanpro start --processes 8
# With setup/teardown hooks for each process
obanpro start --setup myapp.worker_setup --teardown myapp.worker_cleanup
Each process runs its own asyncio event loop, allowing you to scale CPU-bound work across all available cores without rewriting your async code.
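For illustration, the myapp.worker_setup and myapp.worker_cleanup hooks referenced above might look like the following. This is a minimal sketch that assumes each hook is a zero-argument async function called once per worker process (the exact hook signature may differ) and uses an asyncpg pool as the per-process resource:
# myapp.py -- hypothetical hook module passed to --setup/--teardown
import asyncpg  # assumption: any pool-backed async driver works here

pool = None

async def worker_setup():
    # Called once when a worker process starts: open per-process resources
    global pool
    pool = await asyncpg.create_pool(dsn="postgres://localhost/myapp")

async def worker_cleanup():
    # Called once when a worker process stops: release resources cleanly
    if pool is not None:
        await pool.close()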
Smart Concurrency#
Control job execution with global concurrency limits, rate limiting, and queue partitioning—all coordinated across the entire cluster.
Global Limits cap concurrent jobs across all nodes, ensuring cluster-wide limits regardless of how many workers you’re running:
[queues.exports]
limit = 10
global_limit = 5
Rate Limiting controls job throughput over configurable time windows:
[queues.emails]
limit = 10
rate_limit = { allowed = 60, period = 60 }
Partitioning applies concurrency or rate limits per worker, tenant, or any argument key:
[queues.api_calls]
limit = 10
global_limit = 1
partition = { args = "tenant_id" }
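With this configuration, jobs that share a tenant_id run one at a time while different tenants proceed in parallel. A hypothetical enqueue call (ApiCallWorker is illustrative; the enqueue API matches the Unique Jobs example below):
# Different tenants occupy separate partitions, so these run in parallel;
# a second "acme" job would wait for the first (global_limit = 1 per partition).
await ApiCallWorker.enqueue({"tenant_id": "acme", "endpoint": "/sync"})
await ApiCallWorker.enqueue({"tenant_id": "globex", "endpoint": "/sync"})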
Unique Jobs#
Prevent duplicate jobs from being enqueued based on configurable fields. Supports time-based uniqueness periods, partial args matching, and state-based filtering.
from datetime import timedelta
from oban import worker

@worker(unique={"period": timedelta(hours=1), "keys": ["account_id"]})
class ImportWorker:
    async def process(self, job):
        return await import_account(job.args["account_id"])
Detect conflicts to handle duplicate insertions gracefully:
job = await ImportWorker.enqueue({"account_id": 123}, unique=True)

if job.extra.get("conflicted"):
    print(f"Matched existing job {job.id}")
Relay#
Dispatch jobs and await their results synchronously—even when jobs execute on different servers. Think of it as persistent, distributed async/await.
from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}

result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)
print(result)  # {"result": 42}
Perfect for request/response workflows and service coordination where you need synchronous semantics with the reliability of persistent job queues.
Workflows#
Compose jobs with dependencies for sequential execution, fan-out parallelization, and fan-in convergence patterns.
from oban_pro import Workflow
workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add("transform", TransformWorker.new({}), deps="extract")
    .add("load_a", LoadWorker.new({"dest": "warehouse"}), deps="transform")
    .add("load_b", LoadWorker.new({"dest": "archive"}), deps="transform")
    .add("notify", NotifyWorker.new({}), deps=["load_a", "load_b"])
)
await oban.enqueue_many(workflow)
Sub-Workflows with add_workflow() compose complex pipelines from reusable workflow patterns.
Collection Fan-Out with add_many() creates parallel jobs from lists or dictionaries.
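As a rough sketch of how these two helpers compose (the exact signatures of add_workflow() and add_many() are assumptions based on the descriptions above):
# Hypothetical composition; signatures may differ from the real API.
etl = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add("transform", TransformWorker.new({}), deps="extract")
)

workflow = (
    Workflow()
    # Reuse the ETL pipeline as a named sub-workflow
    .add_workflow("etl", etl)
    # Fan out one load job per destination, downstream of the sub-workflow
    .add_many("load", [LoadWorker.new({"dest": d}) for d in ("warehouse", "archive")], deps="etl")
)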
Cascading Functions with add_cascade() automatically receive context and upstream results:
async def extract(context):
    return {"records": await fetch_data(context["source"])}

async def transform(context):
    return {"transformed": [process(r) for r in context["extract"]["records"]]}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
)
Runtime Grafting with add_graft() attaches sub-workflows dynamically when data becomes
available during execution—perfect for workflows where the structure depends on runtime data.
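As a sketch of the idea, assuming the graft callback receives the workflow context and returns the sub-workflow to attach (the callback contract is an assumption):
# Hypothetical graft: build one job per record discovered at runtime.
async def fan_out_records(context):
    wf = Workflow()
    for i, record in enumerate(context["extract"]["records"]):
        # ProcessWorker is illustrative
        wf.add(f"process_{i}", ProcessWorker.new({"record": record}))
    return wf

workflow = (
    Workflow()
    .add_cascade("extract", extract)
    .add_graft("fan_out", fan_out_records, deps="extract")
)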
v0.5.2 — 2025-02-05#
Enhancements#
[Pro] Require oban v0.5.2 and support changes since v0.5.0
[Oban] Emit insert notifications after bulk job insertion
[Oban] Add transactional insertion with conn parameter support
Bug Fixes#
[CLI] Fix module discovery when running obanpro start
Previously, worker modules in the current directory couldn't be loaded when using the obanpro start command, causing jobs to fail with import errors. This worked correctly with python -m oban start but not with the Oban Pro CLI. The CLI now properly adds the current working directory to the module search path, and python -m oban_pro is available as an alternative invocation method.
v0.5.1 — 2025-02-01#
Bug Fixes#
[Oban] Relax the Python requirement down to 3.12+
The previous requirement of 3.14+ was unnecessarily strict and prevented installing the package on older Python versions that are actually compatible.
v0.5.0 — 2025-01-21#
Features#
[CLI] The obanpro install command for installing the Pro database schema.
[CLI] The obanpro uninstall command for removing the Pro database schema.
[CLI] The obanpro start command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism. Each process runs its own asyncio event loop, bypassing the GIL for CPU-intensive workloads.
[CLI] The --processes option controls the number of worker processes. Defaults to the number of CPU cores when not specified.
[CLI] The --setup and --teardown options specify async functions to run when each worker process starts and stops, useful for initializing database pools or loading ML models.
[Queues] Global concurrency limits with global_limit to cap concurrent jobs across all nodes, ensuring cluster-wide limits regardless of node count.
[Queues] Rate limiting with rate_limit to control job throughput over time windows. Supports jobs per second, minute, hour, or custom periods.
[Queues] Queue partitioning with partition to apply concurrency or rate limits per worker, args key, or meta key. Enables per-tenant or per-resource throttling.
[Jobs] Unique job constraints to prevent enqueueing duplicate jobs based on configurable fields. Supports period for time-based uniqueness, fields for attribute comparison, keys for partial args matching, and group for state-based filtering.
[Jobs] Conflict detection via job.extra["conflicted"] to determine if an insert matched an existing job.
[Workflow] Declarative job dependencies with Workflow.add() for sequential execution, fan-out parallelization, and fan-in convergence patterns.
[Workflow] Sub-workflows with add_workflow() for composing complex pipelines from reusable workflow patterns.
[Workflow] Collection fan-out with add_many() for creating sub-workflows from lists or dicts of jobs.
[Workflow] Cascading functions with add_cascade() that automatically receive context and upstream results. Supports fan-out across collections with automatic result aggregation.
[Workflow] Runtime grafting with add_graft() for attaching sub-workflows dynamically when data becomes available during execution.
[Workflow] Shared context with add_context() for passing configuration to all cascade functions.
[Workflow] Result sharing with Record() return values and get_recorded()/all_recorded() for passing data between dependent jobs (see the sketch below).
[Workflow] Workflow introspection with status(), all_jobs(), and get_job() for monitoring execution progress.
[Workflow] Bulk operations with cancel_jobs() and retry_jobs() for workflow-level control.
[Workflow] Visualization with to_mermaid() and to_dot() for generating flowchart diagrams.
[Workflow] Configurable dependency handling with ignore_cancelled and ignore_discarded options.
[Relay] Dispatch jobs and await results synchronously with Worker.relay(), even when jobs execute on different servers. Think of it as persistent, distributed async/await.
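A minimal sketch of the Record() result-sharing pattern, assuming Record wraps a job's return value for persistence and get_recorded() looks up an upstream result by step name (where that lookup lives, here on the job, is an assumption):
from oban import worker
from oban_pro import Record, Workflow

@worker()
class FetchWorker:
    async def process(self, job):
        # Record(...) persists the return value for dependent jobs
        return Record({"rows": [1, 2, 3]})

@worker()
class SumWorker:
    async def process(self, job):
        # Assumption: get_recorded() returns the named dependency's recorded result
        rows = (await job.get_recorded("fetch"))["rows"]
        return Record({"total": sum(rows)})

workflow = (
    Workflow()
    .add("fetch", FetchWorker.new({}))
    .add("sum", SumWorker.new({}), deps="fetch")
)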