Awaiting Signals#
Jobs can pause mid-execution to wait for an external decision and resume when a signal arrives. This turns long-running workers into durable state machines that wait for human approval, third-party callbacks, or any other out-of-band event without holding a worker process or database connection open.
Basic Usage#
Pause a job with await_signal from inside process and resume it with signal from anywhere.
For example, wait for approval before charging a credit card:
from datetime import timedelta
from oban import Cancel, worker
from oban_pro import SignalTimeout, await_signal
@worker()
class ApprovalWorker:
async def process(self, job):
try:
payload = await await_signal(wait_for=timedelta(days=1))
if payload["decision"] == "approved":
await charge_card(job.args["user_id"])
else:
return Cancel(f"declined: {payload['decision']}")
except SignalTimeout:
return Cancel("no decision")
Then signal the job by id when the decision arrives:
from oban_pro import signal
await signal(job_id, {"decision": "approved"})
A list of job ids can be used to deliver the same signal to multiple jobs at once:
await signal([job_a.id, job_b.id], {"decision": "approved"})
How It Works#
A short live wait (wait_timeout, default 5s) blocks during execution so signals that arrive
immediately resume in place. Beyond that window the job snoozes until either a signal arrives or
the overall wait_for deadline elapses, freeing the worker process during the wait.
Signals are persisted, so one delivered before await_signal is reached is consumed on the next
call. Payloads may be any value serializable by erlpack—they’re encoded as Erlang terms before
storage and decoded on retrieval, so the wire format stays compact and round-trips intact.
Signals are fire-and-forget. Delivery to a job in a terminal state, or a job that doesn’t exist, is a no-op.
Options#
wait_for — Total time to wait for a signal, measured from the first snooze. Accepts seconds as a float, a
timedelta, orNonefor an indefinite wait. Defaults toNone. The deadline is persisted to the job’s meta on first snooze so subsequent retry attempts honor the same wall-clock deadline.wait_timeout — Seconds to block during execution before snoozing the job. Pass
0to skip the live wait and snooze immediately. Defaults to5.0.
from datetime import timedelta
# Wait up to 30 minutes, blocking for 10s before snoozing.
payload = await await_signal(wait_for=timedelta(minutes=30), wait_timeout=10.0)
# Or pass seconds directly.
payload = await await_signal(wait_for=1800)
# Wait indefinitely (default).
payload = await await_signal()
Workflow Signals#
Workflow jobs can be signaled by their workflow job name, which is convenient when the caller knows the workflow but not the underlying job id:
from oban_pro import Workflow
workflow = (
Workflow()
.add("approval", ApprovalWorker.new({"user_id": 123}))
.add("charge", ChargeWorker.new({"user_id": 123}), deps=["approval"])
)
await oban.enqueue_many(workflow)
# Resolve "approval" to its job id and deliver the signal.
await workflow.signal("approval", {"decision": "approved"})
Several names can be signaled at once:
await workflow.signal(["reviewer_a", "reviewer_b"], {"vote": "yes"})
This turns workflows into durable state machines that wait for human approval, third-party callbacks, or any other out-of-band event between steps.
Exception Handling#
When the overall wait_for deadline elapses without a signal, await_signal raises
SignalTimeout:
from datetime import timedelta
from oban_pro import SignalTimeout
try:
payload = await await_signal(wait_for=timedelta(minutes=1))
proceed(payload)
except SignalTimeout:
return Cancel("no decision within a minute")
await_signal also raises RuntimeError if called outside of an executing job.