This release adds durable signals for pausing jobs on external events, aggregate progress tracking across workflows and their sub-workflows, and unique workflows that prevent duplicate runs, alongside various usability improvements.
Signals
Pause a job to wait for an external event (e.g. human approval, a third-party callback, or a downstream completion), then resume it exactly where it left off.
from datetime import timedelta
from oban import Cancel, worker
from oban_pro import SignalTimeout, await_signal
@worker()
class ApprovalWorker:
async def process(self, job):
try:
payload = await await_signal(wait_for=timedelta(days=1))
except SignalTimeout:
return Cancel("no decision")
if payload["decision"] == "approved":
await charge_card(job.args["user_id"])
else:
return Cancel(f"declined: {payload['decision']}")
Waiting jobs block for a configurable timeout so it can resume instantly, then snoozes and frees the worker until a signal arrives. To deliver a signal from anywhere with the job id:
from oban_pro import signal
await signal(job.id, {"decision": "approved"})
Signals are durable, a payload delivered before the job reaches await_signal is persisted and
consumed on the next call. You can target a single job, a list of ids, or workflow steps by name
with workflow.signal("approval", payload).
Workflow Status
Track progress across an entire workflow and all of its sub-workflows without scanning individual jobs. Per-state counts are maintained automatically as jobs transition, so a status check is a single lookup.
status = await workflow.status()
print(status.state) # "executing"
print(status.total) # 12
print(status.counts["completed"]) # 9
print(status.duration) # timedelta(seconds=42)
Nested sub-workflows are available under status.subs keyed by name:
print(status.subs["import"].state) # "completed"
In addition to manual status checks, workflow aggregates are used by Oban Web for highly efficient progress updates, filtering, and searching. Finally, completed workflow records are pruned automatically alongside their jobs, to keep tracking accurate without leaving stray rows.
Unique Workflows
Guarantee that only one instance of a named workflow runs at a time by declaring a workflow as
unique. Concurrent enqueues with the same workflow name are rejected before they're inserted.
from oban_pro import Workflow
workflow = (
Workflow(name="nightly-import", unique=True)
.add("extract", ExtractWorker.new({}))
.add("load", LoadWorker.new({}), deps="extract")
)
await oban.enqueue_many(workflow)
Uniqueness is keyed by workflow name and enforced at the database level, so it holds even across nodes inserting simultaneously. When a matching workflow is still running, the new jobs come back flagged rather than persisted:
if all(job.extra.get("conflicted") for job in inserted):
print("nightly-import already running")
The name frees up for reuse as soon as the original workflow completes, and appending to an existing workflow by id always succeeds.