Changelog#
This release adds durable signals for pausing jobs on external events, aggregate progress tracking across workflows and their sub-workflows, and unique workflows that prevent duplicate runs, alongside various usability improvements.
Signals#
Pause a job to wait for an external event (e.g. human approval, a third-party callback, or a downstream completion), then resume it exactly where it left off.
from datetime import timedelta

from oban import Cancel, worker
from oban_pro import SignalTimeout, await_signal


@worker()
class ApprovalWorker:
    async def process(self, job):
        try:
            payload = await await_signal(wait_for=timedelta(days=1))
        except SignalTimeout:
            return Cancel("no decision")

        if payload["decision"] == "approved":
            await charge_card(job.args["user_id"])
        else:
            return Cancel(f"declined: {payload['decision']}")
A waiting job blocks for a configurable timeout so it can resume instantly, then snoozes and frees the worker until a signal arrives. Deliver a signal from anywhere using the job id:
from oban_pro import signal
await signal(job.id, {"decision": "approved"})
Signals are durable: a payload delivered before the job reaches await_signal is persisted and
consumed on the next call. You can target a single job, a list of ids, or workflow steps by name
with workflow.signal("approval", payload).
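The "store first, consume later" behaviour can be pictured with a small in-memory model. This is only a sketch of the semantics described above, not Oban Pro's implementation: the `SignalBox` class and its methods are hypothetical, and a dict stands in for the database that makes real signals durable.

```python
import asyncio


class SignalBox:
    """Toy in-memory stand-in for a persisted signal store (hypothetical)."""

    def __init__(self):
        self._payloads = {}  # job_id -> list of payloads not yet consumed
        self._events = {}    # job_id -> asyncio.Event used to wake a waiter

    def signal(self, job_id, payload):
        # Store first, then wake: an early signal is kept until it is consumed.
        self._payloads.setdefault(job_id, []).append(payload)
        self._events.setdefault(job_id, asyncio.Event()).set()

    async def await_signal(self, job_id, timeout):
        event = self._events.setdefault(job_id, asyncio.Event())
        if not self._payloads.get(job_id):
            # Nothing stored yet: wait, raising asyncio.TimeoutError on expiry.
            await asyncio.wait_for(event.wait(), timeout)
        payload = self._payloads[job_id].pop(0)
        if not self._payloads[job_id]:
            event.clear()
        return payload


async def demo():
    box = SignalBox()

    # Case 1: the signal arrives before the job starts waiting.
    box.signal(1, {"decision": "approved"})
    early = await box.await_signal(1, timeout=0.1)

    # Case 2: the signal arrives while the job is already waiting.
    async def late_sender():
        await asyncio.sleep(0.01)
        box.signal(2, {"decision": "declined"})

    sender = asyncio.create_task(late_sender())
    late = await box.await_signal(2, timeout=1)
    await sender
    return early, late
```

Running `asyncio.run(demo())` returns both payloads. The first case is the one that durability protects: the payload is not lost just because no job was waiting when it was sent.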
Workflow Status#
Track progress across an entire workflow and all of its sub-workflows without scanning individual jobs. Per-state counts are maintained automatically as jobs transition, so a status check is a single lookup.
status = await workflow.status()
print(status.state) # "executing"
print(status.total) # 12
print(status.counts["completed"]) # 9
print(status.duration) # timedelta(seconds=42)
Nested sub-workflows are available under status.subs keyed by name:
print(status.subs["import"].state) # "completed"
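To see why a status check can be a single lookup, consider a minimal sketch of counts that are updated on every transition instead of being recomputed from the jobs. The `WorkflowAggregate` class and the state names used here are assumptions for illustration; they are not the library's internals.

```python
from collections import Counter


class WorkflowAggregate:
    """Hypothetical per-workflow record holding per-state job counts."""

    def __init__(self):
        self.counts = Counter()

    def insert(self, state="available"):
        # Called once per job when the workflow is enqueued.
        self.counts[state] += 1

    def transition(self, old, new):
        # Move one job between buckets; the total stays constant.
        if self.counts[old] <= 0:
            raise ValueError(f"no job in state {old!r}")
        self.counts[old] -= 1
        self.counts[new] += 1

    @property
    def total(self):
        return sum(self.counts.values())


agg = WorkflowAggregate()
for _ in range(12):
    agg.insert()
for _ in range(9):
    agg.transition("available", "executing")
    agg.transition("executing", "completed")

print(agg.total)                # 12
print(agg.counts["completed"])  # 9
```

Reading progress costs the same whether the workflow has twelve jobs or twelve thousand, because the work is paid incrementally at each transition.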
In addition to manual status checks, workflow aggregates are used by Oban Web for highly efficient progress updates, filtering, and searching.
Finally, completed workflow records are pruned automatically alongside their jobs, to keep tracking accurate without leaving stray rows.
Unique Workflows#
Guarantee that only one instance of a named workflow runs at a time by declaring a workflow as
unique. Concurrent enqueues with the same workflow name are rejected before they’re inserted.
from oban_pro import Workflow
workflow = (
    Workflow(name="nightly-import", unique=True)
    .add("extract", ExtractWorker.new({}))
    .add("load", LoadWorker.new({}), deps="extract")
)
inserted = await oban.enqueue_many(workflow)
Uniqueness is keyed by workflow name and enforced at the database level, so it holds even across nodes inserting simultaneously. When a matching workflow is still running, the new jobs come back flagged rather than persisted:
if all(job.extra.get("conflicted") for job in inserted):
    print("nightly-import already running")
The name frees up for reuse as soon as the original workflow completes, and appending to an existing workflow by id always succeeds.
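One common way to get database-level uniqueness that releases a name on completion is a partial unique index. The sketch below uses SQLite from the standard library purely for illustration; the `workflows` table, its columns, and the index name are hypothetical, and the actual schema and mechanism used by the library may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE workflows (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        state TEXT NOT NULL DEFAULT 'executing'
    );
    -- Only unfinished workflows take part in the uniqueness check.
    CREATE UNIQUE INDEX workflows_active_name
        ON workflows (name) WHERE state != 'completed';
    """
)


def try_insert(name):
    """Return True when inserted, False when an active workflow holds the name."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO workflows (name) VALUES (?)", (name,))
        return True
    except sqlite3.IntegrityError:
        return False


first = try_insert("nightly-import")   # inserted
second = try_insert("nightly-import")  # rejected: the first is still running
with conn:
    conn.execute(
        "UPDATE workflows SET state = 'completed' WHERE name = ?",
        ("nightly-import",),
    )
third = try_insert("nightly-import")   # the name is free again
```

Because the constraint lives in the database, two nodes racing to insert the same name cannot both succeed, which is the property an application-level "check then insert" cannot guarantee.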
v0.6.1 — 2026-06-11#
Bug Fixes#
[CLI] Open a dedicated pool in each worker process
Multi-process execution shared the connection pool created in the main process with forked workers. Database connections aren’t fork-safe, so workers that queried the database through an Oban instance drove a socket owned by another process and crashed with “invalid socket”.
Each engine now opens its own client-mode Oban (a fresh pool and query, no producers or supervision loops) after starting, so worker queries resolve to connections owned by that process. Worker pools default to a maximum size of 5 to keep the total connection count in check, since every process adds its own pool on top of the main one.
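The underlying idea, one pool per process and never a pool inherited from the parent, can be sketched with a process-id guard. This is an illustration of the principle only: `PerProcessPool`, `max_connections`, and the main pool size of 10 are assumptions, and the fix itself works by opening a client-mode Oban in each worker rather than by checking pids.

```python
import os


class PerProcessPool:
    """Lazily builds one pool per process and never reuses a parent's pool."""

    def __init__(self, factory, max_size=5, getpid=os.getpid):
        self._factory = factory    # callable taking max_size, returning a pool
        self._max_size = max_size
        self._getpid = getpid      # injectable so the guard can be exercised
        self._owner_pid = None
        self._pool = None

    def get(self):
        pid = self._getpid()
        if self._pool is None or self._owner_pid != pid:
            # First use in this process (or we were forked): open a fresh pool.
            self._pool = self._factory(self._max_size)
            self._owner_pid = pid
        return self._pool


def max_connections(main_pool_size, processes, worker_pool_size=5):
    # Every worker process adds its own pool on top of the main one.
    return main_pool_size + processes * worker_pool_size


fake_pid = [100]
holder = PerProcessPool(
    lambda size: {"max_size": size, "conns": []},  # stand-in for a real pool
    getpid=lambda: fake_pid[0],
)
parent_pool = holder.get()
reused = holder.get() is parent_pool  # same process: same pool
fake_pid[0] = 200                     # pretend we are now a forked worker
child_pool = holder.get()             # different process: fresh pool
```

With an assumed main pool of 10 and four worker processes, `max_connections(10, 4)` gives 30, which shows why a small per-worker default matters as the process count grows.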
[Schema] Expose public oban_pro.schema module
The installation docs instruct Alembic and Django users to import from oban_pro.schema, but the helpers lived in a private _schema module only reachable through the encrypted _core package, so the documented import failed.
Add a top-level shim in the packaged distribution so install_sql, uninstall_sql, install, and uninstall resolve via the documented path.
v0.6.0 — 2026-06-02#
Enhancements#
[Workflow] Add signal and await_signal for external events
Pause a job mid-execution with await_signal and resume it from anywhere with signal, turning long-running workers into durable state machines that wait for human approval, third-party callbacks, or any other out-of-band event without holding a worker process or DB connection.
[Workflow] Track aggregated workflow progress
A new oban_workflows table is populated at insert time and maintained by triggers. Each row tracks per-state job counts, timestamps, and a derived state from counts and option flags.
[Workflow] Support unique workflows with conflict detection
A new unique option ensures entire workflows can be unique, which prevents partial inserts of unique jobs within a workflow.
[Workflow] Prune workflow rows alongside jobs
The pruner now removes oban_workflows rows once a workflow has finished and all of its jobs have been pruned, preventing tracking rows from accumulating forever. Active workflows and finished workflows whose jobs are still around are left untouched.
[Lifeline] Rescue stuck workflows using aggregate table
Stuck-workflow rescue now reads from the new table instead of scanning oban_jobs, and repair_workflows accepts a max_age argument (default 60s) so callers can tune how long a suspended workflow sits before it gets repaired.
Bug Fixes#
[Query] Isolate workflow flushing transaction from acking
A flush failure rolled back the surrounding ack transaction, leaving jobs stranded in executing while their producers stayed healthy and their pending_acks were lost.