Release Notes

This release adds durable signals for pausing jobs on external events, aggregate progress tracking across workflows and their sub-workflows, and unique workflows that prevent duplicate runs, alongside various usability improvements.

Signals

Pause a job to wait for an external event (e.g. human approval, a third-party callback, or a downstream completion), then resume it exactly where it left off.

from datetime import timedelta
from oban import Cancel, worker
from oban_pro import SignalTimeout, await_signal
@worker()
class ApprovalWorker:
async def process(self, job):
try:
payload = await await_signal(wait_for=timedelta(days=1))
except SignalTimeout:
return Cancel("no decision")
if payload["decision"] == "approved":
await charge_card(job.args["user_id"])
else:
return Cancel(f"declined: {payload['decision']}")

Waiting jobs block for a configurable timeout so it can resume instantly, then snoozes and frees the worker until a signal arrives. To deliver a signal from anywhere with the job id:

from oban_pro import signal
await signal(job.id, {"decision": "approved"})

Signals are durable, a payload delivered before the job reaches await_signal is persisted and consumed on the next call. You can target a single job, a list of ids, or workflow steps by name with workflow.signal("approval", payload).

Workflow Status

Track progress across an entire workflow and all of its sub-workflows without scanning individual jobs. Per-state counts are maintained automatically as jobs transition, so a status check is a single lookup.

status = await workflow.status()
print(status.state) # "executing"
print(status.total) # 12
print(status.counts["completed"]) # 9
print(status.duration) # timedelta(seconds=42)

Nested sub-workflows are available under status.subs keyed by name:

print(status.subs["import"].state) # "completed"

In addition to manual status checks, workflow aggregates are used by Oban Web for highly efficient progress updates, filtering, and searching. Finally, completed workflow records are pruned automatically alongside their jobs, to keep tracking accurate without leaving stray rows.

Unique Workflows

Guarantee that only one instance of a named workflow runs at a time by declaring a workflow as unique. Concurrent enqueues with the same workflow name are rejected before they're inserted.

from oban_pro import Workflow
workflow = (
Workflow(name="nightly-import", unique=True)
.add("extract", ExtractWorker.new({}))
.add("load", LoadWorker.new({}), deps="extract")
)
await oban.enqueue_many(workflow)

Uniqueness is keyed by workflow name and enforced at the database level, so it holds even across nodes inserting simultaneously. When a matching workflow is still running, the new jobs come back flagged rather than persisted:

if all(job.extra.get("conflicted") for job in inserted):
print("nightly-import already running")

The name frees up for reuse as soon as the original workflow completes, and appending to an existing workflow by id always succeeds.

v0.6.0

Enhancements

  • [Workflow] Add signal and await_signal for external events

    Pause a job mid-execution with await_signal and resume it from anywhere with signal, turning long-running workers into durable state machines that wait for human approval, third-party callbacks, or any other out-of-band event without holding a worker process or DB connection.

  • [Workflow] Track aggregated workflow progress

    A new oban_workflows table is populated at insert time and maintained by triggers. Each row tracks per-state job counts, timestamps, and a derived state from counts and option flags.

  • [Workflow] Support unique workflows with conflict detection

    A new unique option ensures entire workflows can be unique, which prevents partial inserts of unique jobs within a workflow.

  • [Workflow] Prune workflow rows alongside jobs

    The pruner now removes oban_workflows rows once a workflow has finished and all of its jobs have been pruned, preventing tracking rows from accumulating forever. Active workflows and finished workflows whose jobs are still around are left untouched.

  • [Lifeline] Rescue stuck workflows using aggregate table

    Stuck-workflow rescue now reads from the new table instead of scanning oban_jobs, and repair_workflows accepts a max_age argument (default 60s) so callers can tune how long a suspended workflow sits before it gets repaired.

Bug Fixes

  • [Query] Isolate workflow flushing transaction from acking

    A flush failure rolled back the surrounding ack transaction, leaving jobs stranded in executing while their producers stayed healthy and their pending_acks were lost.