Changelog#

Oban Pro is a licensed add-on for the Oban job orchestration framework, providing advanced features for complex workflows. This is the initial release, bringing battle-tested patterns from Oban Pro for Elixir to the Python ecosystem with an async-native API.

Multi-Process Execution#

Bypass Python’s Global Interpreter Lock (GIL) and use multiple CPU cores for CPU-intensive workloads. The obanpro start command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism.

# Start with automatic core detection
obanpro start

# Or specify the number of processes
obanpro start --processes 8

# With setup/teardown hooks for each process
obanpro start --setup myapp.worker_setup --teardown myapp.worker_cleanup

Each process runs its own asyncio event loop, allowing you to scale CPU-bound work across all available cores without rewriting your async code.
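
For example, the myapp.worker_setup and myapp.worker_cleanup hooks above might initialize and release a per-process database pool. A minimal sketch, assuming a zero-argument async hook signature and using asyncpg purely for illustration:

# myapp.py — hypothetical setup/teardown hooks for each worker process.
# The zero-argument async signature and the asyncpg pool are assumptions.
import asyncpg

pool = None

async def worker_setup():
    # Runs when each worker process starts: open a per-process connection pool.
    global pool
    pool = await asyncpg.create_pool("postgresql://localhost/myapp")

async def worker_cleanup():
    # Runs when the process stops: release the pool cleanly.
    await pool.close()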

Smart Concurrency#

Control job execution with global concurrency limits, rate limiting, and queue partitioning—all coordinated across the entire cluster.

Global Limits cap concurrent jobs across all nodes, enforcing a single cluster-wide ceiling no matter how many workers you’re running:

[queues.exports]
limit = 10
global_limit = 5

Rate Limiting controls job throughput over configurable time windows:

[queues.emails]
limit = 10
rate_limit = { allowed = 60, period = 60 }

Partitioning applies concurrency or rate limits per worker, tenant, or any argument key:

[queues.api_calls]
limit = 10
global_limit = 1
partition = { args = "tenant_id" }

Unique Jobs#

Prevent duplicate jobs from being enqueued based on configurable fields. Supports time-based uniqueness periods, partial args matching, and state-based filtering.

from datetime import timedelta
from oban import worker

@worker(unique={"period": timedelta(hours=1), "keys": ["account_id"]})
class ImportWorker:
    async def process(self, job):
        return await import_account(job.args["account_id"])

Detect conflicts to handle duplicate insertions gracefully:

job = await ImportWorker.enqueue({"account_id": 123}, unique=True)

if job.extra.get("conflicted"):
    print(f"Matched existing job {job.id}")

Relay#

Dispatch jobs and await their results synchronously—even when jobs execute on different servers. Think of it as persistent, distributed async/await.

from oban import worker

@worker(queue="default")
class CalculatorWorker:
    async def process(self, job):
        return {"result": job.args["a"] + job.args["b"]}

result = await CalculatorWorker.relay({"a": 10, "b": 32}, timeout=5.0)

print(result)  # {"result": 42}

Perfect for request/response workflows and service coordination where you need synchronous semantics with the reliability of persistent job queues.

Workflows#

Compose jobs with dependencies for sequential execution, fan-out parallelization, and fan-in convergence patterns.

from oban_pro import Workflow

workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    .add("transform", TransformWorker.new({}), deps="extract")
    .add("load_a", LoadWorker.new({"dest": "warehouse"}), deps="transform")
    .add("load_b", LoadWorker.new({"dest": "archive"}), deps="transform")
    .add("notify", NotifyWorker.new({}), deps=["load_a", "load_b"])
)

await oban.enqueue_many(workflow)

Sub-Workflows with add_workflow() compose complex pipelines from reusable workflow patterns. Collection Fan-Out with add_many() creates parallel jobs from lists or dictionaries.
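
As a sketch of how those might combine, continuing the example above (hypothetical; the exact add_many() and add_workflow() signatures, and the build_report_workflow() helper, are assumptions based on the descriptions here):

workflow = (
    Workflow()
    .add("extract", ExtractWorker.new({"source": "db"}))
    # Fan out one LoadWorker job per destination in the list
    .add_many(
        "load",
        [LoadWorker.new({"dest": d}) for d in ["warehouse", "archive", "lake"]],
        deps="extract",
    )
    # Attach a reusable reporting pipeline as a sub-workflow
    # (build_report_workflow() is a hypothetical helper returning a Workflow)
    .add_workflow("report", build_report_workflow(), deps="load")
)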

Cascading Functions with add_cascade() automatically receive context and upstream results:

async def extract(context):
    return {"records": await fetch_data(context["source"])}

async def transform(context):
    return {"transformed": [process(r) for r in context["extract"]["records"]]}

workflow = (
    Workflow()
    .add_context({"source": "api", "batch_id": 123})
    .add_cascade("extract", extract)
    .add_cascade("transform", transform, deps="extract")
)

Runtime Grafting with add_graft() attaches sub-workflows dynamically when data becomes available during execution—perfect for workflows where the structure depends on runtime data.
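
A sketch of what grafting might look like (hypothetical; the callable-based add_graft() signature and the ScanWorker/FetchWorker names are assumptions):

from oban_pro import Workflow

# Hypothetical: assumes add_graft() accepts a callable that builds a
# sub-workflow from the runtime context once its dependencies finish.
def fetch_pages(context):
    # The page count is only known after "scan" runs, so the fan-out
    # is constructed at runtime from its result.
    pages = context["scan"]["pages"]
    return Workflow().add_many(
        "fetch", [FetchWorker.new({"page": p}) for p in range(pages)]
    )

workflow = (
    Workflow()
    .add("scan", ScanWorker.new({"source": "api"}))
    .add_graft("fetch_all", fetch_pages, deps="scan")
)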

v0.5.0 — 2025-01-21#

Features#

  • [CLI] The obanpro install command for installing the Pro database schema.

  • [CLI] The obanpro uninstall command for removing the Pro database schema.

  • [CLI] The obanpro start command launches workers with multi-process execution, distributing jobs across multiple Python processes for true parallelism. Each process runs its own asyncio event loop, bypassing the GIL for CPU-intensive workloads.

  • [CLI] The --processes option controls the number of worker processes. Defaults to the number of CPU cores when not specified.

  • [CLI] The --setup and --teardown options specify async functions to run when each worker process starts and stops, useful for initializing database pools or loading ML models.

  • [Queues] Global concurrency limits with global_limit to cap concurrent jobs across all nodes, ensuring cluster-wide limits regardless of node count.

  • [Queues] Rate limiting with rate_limit to control job throughput over time windows. Supports jobs per second, minute, hour, or custom periods.

  • [Queues] Queue partitioning with partition to apply concurrency or rate limits per worker, args key, or meta key. Enables per-tenant or per-resource throttling.

  • [Jobs] Unique job constraints to prevent enqueueing duplicate jobs based on configurable fields. Supports period for time-based uniqueness, fields for attribute comparison, keys for partial args matching, and group for state-based filtering.

  • [Jobs] Conflict detection via job.extra["conflicted"] to determine if an insert matched an existing job.

  • [Workflow] Declarative job dependencies with Workflow.add() for sequential execution, fan-out parallelization, and fan-in convergence patterns.

  • [Workflow] Sub-workflows with add_workflow() for composing complex pipelines from reusable workflow patterns.

  • [Workflow] Collection fan-out with add_many() for creating sub-workflows from lists or dicts of jobs.

  • [Workflow] Cascading functions with add_cascade() that automatically receive context and upstream results. Supports fan-out across collections with automatic result aggregation.

  • [Workflow] Runtime grafting with add_graft() for attaching sub-workflows dynamically when data becomes available during execution.

  • [Workflow] Shared context with add_context() for passing configuration to all cascade functions.

  • [Workflow] Result sharing with Record() return values and get_recorded() / all_recorded() for passing data between dependent jobs (see the sketch after this list).

  • [Workflow] Workflow introspection with status(), all_jobs(), and get_job() for monitoring execution progress.

  • [Workflow] Bulk operations with cancel_jobs() and retry_jobs() for workflow-level control.

  • [Workflow] Visualization with to_mermaid() and to_dot() for generating flowchart diagrams.

  • [Workflow] Configurable dependency handling with ignore_cancelled and ignore_discarded options.

  • [Relay] Dispatch jobs and await results synchronously with Worker.relay(), even when jobs execute on different servers. Think of it as persistent, distributed async/await.
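
As a sketch of result sharing between dependent workflow jobs (hypothetical; the Record import path and the job.get_recorded() accessor are assumptions):

from oban import worker
from oban_pro import Record  # import path is an assumption

@worker(queue="default")
class ExtractWorker:
    async def process(self, job):
        # Wrapping the return value in Record() persists it so
        # downstream workflow jobs can read it.
        return Record({"rows": [1, 2, 3]})

@worker(queue="default")
class SummarizeWorker:
    async def process(self, job):
        # get_recorded() is assumed here to fetch a named upstream result.
        extracted = await job.get_recorded("extract")
        return {"total": sum(extracted["rows"])}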