Managing Jobs
Oban provides methods to manage jobs after they’ve been inserted. You can cancel jobs to prevent execution, retry failed jobs, delete jobs from the database, or update job fields.
Cancelling Jobs
Cancel a job to prevent it from running and potentially stop it while it is executing:
# Cancel with the job's id
await oban.cancel_job(123)
# Or with a job instance
await oban.cancel_job(job)
Only incomplete jobs can be cancelled, meaning jobs that aren’t completed, discarded, or
already cancelled. Cancellation changes the job’s status to cancelled and records a timestamp
in the database.
For jobs that are already executing, the database state is updated immediately, but the running task isn’t forcefully terminated. Workers should check for cancellation at safe points and stop gracefully:
class MyWorker(Worker):
    async def process(self, job):
        for item in dataset:
            # Check for cancellation between items, where stopping is safe
            if job.cancelled():
                return Cancel("Job was cancelled")
            await process_item(item)
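The check-between-items pattern can be tried without a running Oban instance. The following is a minimal sketch using plain asyncio; `FakeJob` and the module-level `process` function are hypothetical stand-ins that only mimic the `cancelled()` check and are not part of Oban's API.

```python
import asyncio


class FakeJob:
    """Hypothetical stand-in for a job; only mimics the cancelled() check."""

    def __init__(self):
        self._cancelled = False

    def cancel(self):
        self._cancelled = True

    def cancelled(self):
        return self._cancelled


async def process(job, items, handled):
    """Handle items one at a time, checking for cancellation between them."""
    for item in items:
        if job.cancelled():
            return "cancelled"
        handled.append(item)
        await asyncio.sleep(0)  # yield control, as real I/O would
    return "done"


async def main():
    job = FakeJob()
    handled = []
    task = asyncio.create_task(process(job, range(100), handled))
    await asyncio.sleep(0)  # give the worker a chance to start
    job.cancel()            # simulate the cancellation becoming visible
    return await task, handled


result, handled = asyncio.run(main())
```

Because the worker only looks at the flag between items, an item that has already started always finishes. That is the reason to place the check at points where stopping leaves no half-finished work behind.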
To cancel multiple jobs at once:
count = await oban.cancel_many_jobs([123, 456, 789])
print(f"Cancelled {count} jobs")
Retrying Jobs
Retrying changes the job’s status to available, making it ready for immediate execution:
# Retry with the job's id
await oban.retry_job(123)
# Or with a job instance
await oban.retry_job(job)
Jobs that are already available or executing are ignored. If the job has already exhausted
its max_attempts, the limit is automatically increased to allow the retry.
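The retry rules above can be modelled as a small pure function. This is only a sketch: `JobRecord` is a hypothetical stand-in, and the exact amount by which the limit is raised is an assumption (here, just enough to allow one more attempt).

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class JobRecord:
    """Hypothetical stand-in holding only the fields the retry rules touch."""
    state: str
    attempt: int
    max_attempts: int


def retry(job):
    """Return the job as it would look after a retry request."""
    # Jobs that are already available or executing are left untouched.
    if job.state in ("available", "executing"):
        return job
    # Assumption: the limit is raised just enough to allow one more attempt.
    limit = max(job.max_attempts, job.attempt + 1)
    return replace(job, state="available", max_attempts=limit)


exhausted = retry(JobRecord(state="discarded", attempt=3, max_attempts=3))
running = retry(JobRecord(state="executing", attempt=1, max_attempts=3))
```

In this model the exhausted job comes back as available with room for a fourth attempt, while the executing job is returned unchanged.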
To retry multiple jobs at once:
count = await oban.retry_many_jobs([123, 456, 789])
print(f"Retried {count} jobs")
Tip
The term “retry” is a slight misnomer because any non-executing job can be retried, not just
failed ones. For example, you can retry a scheduled job to make it available immediately,
whether or not it has ever run before.
Deleting Jobs
Deletion permanently removes a job from the database. This is useful for cleaning up jobs that were inserted by mistake, contain sensitive data that shouldn’t be retained, or are no longer relevant and you don’t want them to count toward historical metrics.
To delete a job:
# Delete with the job's id
await oban.delete_job(123)
# Or with a job instance
await oban.delete_job(job)
Jobs in the executing state cannot be deleted. If you need to stop an executing job, cancel it
first, then delete it. When you only want to stop a job, prefer cancelling over deleting,
because cancellation retains a record that the job was purposefully stopped.
To delete multiple jobs at once:
count = await oban.delete_many_jobs([123, 456, 789])
print(f"Deleted {count} jobs")
Updating Jobs
Updating lets you modify a job’s fields after insertion. Common use cases include:
Reprioritizing: Bump a job’s priority in response to user action or business rules
Rescheduling: Delay a job that’s waiting on an external dependency
Reassigning: Move a job to a different queue based on its content or current load
Annotating: Add metadata for tracking, debugging, or coordination between jobs
To update a job:
await oban.update_job(job, {"priority": 0, "tags": ["urgent"]})
# Or by job ID
await oban.update_job(123, {"queue": "critical"})
The following fields can be updated:
| Field | Description |
|---|---|
| args | Job arguments |
| max_attempts | Maximum retry attempts |
| meta | Arbitrary metadata |
| priority | Job priority (0-9, lower is higher priority) |
| queue | Target queue name |
| scheduled_at | When the job should run |
| tags | List of string tags |
| worker | Worker class name |
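The constraints on updatable fields can be checked before an update is sent. The sketch below is a hypothetical pre-flight helper, not something Oban provides; `check_changes` and `UPDATABLE_FIELDS` are illustrative names, and how Oban itself reacts to invalid changes is not covered here.

```python
# Field names as documented for updates.
UPDATABLE_FIELDS = {
    "args", "max_attempts", "meta", "priority",
    "queue", "scheduled_at", "tags", "worker",
}
# schedule_in is accepted too, as a convenience alias for scheduled_at.
ACCEPTED_KEYS = UPDATABLE_FIELDS | {"schedule_in"}


def check_changes(changes):
    """Hypothetical pre-flight check; returns a list of problems found."""
    problems = []
    for name in sorted(set(changes) - ACCEPTED_KEYS):
        problems.append(f"{name} cannot be updated")
    priority = changes.get("priority")
    if priority is not None and priority not in range(10):
        problems.append("priority must be between 0 and 9")
    tags = changes.get("tags")
    if tags is not None and not all(isinstance(tag, str) for tag in tags):
        problems.append("tags must be a list of strings")
    return problems


ok = check_changes({"priority": 0, "tags": ["urgent"]})
bad = check_changes({"priority": 12, "state": "available"})
```

Returning a list of problems rather than raising on the first one makes it easy to report everything wrong with a set of changes at once.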
For scheduling convenience, you can use schedule_in instead of scheduled_at. For example, to
make a scheduled job run right away:
await oban.update_job(job, {"schedule_in": 0})
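To make the relationship between the two keys concrete, here is a minimal sketch of how a relative schedule_in could be turned into an absolute scheduled_at. `resolve_schedule` is a hypothetical helper, and treating a plain number as seconds is an assumption rather than documented behavior.

```python
from datetime import datetime, timedelta, timezone


def resolve_schedule(changes, now=None):
    """Replace a schedule_in offset with an absolute scheduled_at value."""
    if "schedule_in" not in changes:
        return dict(changes)
    now = now or datetime.now(timezone.utc)
    resolved = {k: v for k, v in changes.items() if k != "schedule_in"}
    # Assumption: a plain number means seconds; a timedelta is used as-is.
    offset = changes["schedule_in"]
    if not isinstance(offset, timedelta):
        offset = timedelta(seconds=offset)
    resolved["scheduled_at"] = now + offset
    return resolved


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
immediate = resolve_schedule({"schedule_in": 0}, now)
delayed = resolve_schedule({"schedule_in": 90, "priority": 1}, now)
```

With an offset of 0 the resolved scheduled_at equals the current time, which is why the update above makes a scheduled job eligible to run right away.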
Dynamic Updates with Callables
For updates that depend on the current job state, pass a callable that receives the job and returns a dict of changes:
from datetime import datetime

# Append to existing tags
await oban.update_job(job, lambda job: {"tags": job.tags + ["processed"]})

# Merge into existing meta
await oban.update_job(job, lambda job: {
    "meta": {**job.meta, "reviewed_at": datetime.now().isoformat()}
})
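The difference between a plain dict and a callable can be sketched in isolation. `JobSnapshot` and `resolve_changes` below are hypothetical names used for illustration only; they show the general pattern of deriving changes from current state, not Oban's internals.

```python
from dataclasses import dataclass, field


@dataclass
class JobSnapshot:
    """Hypothetical stand-in exposing only the fields used below."""
    tags: list = field(default_factory=list)
    meta: dict = field(default_factory=dict)


def resolve_changes(job, changes):
    """Return a dict of changes, calling `changes` with the job if needed."""
    return changes(job) if callable(changes) else dict(changes)


job = JobSnapshot(tags=["import"], meta={"source": "api"})
static = resolve_changes(job, {"priority": 0})
appended = resolve_changes(job, lambda j: {"tags": j.tags + ["processed"]})
merged = resolve_changes(job, lambda j: {"meta": {**j.meta, "reviewed": True}})
```

Both lambdas build new values (`j.tags + [...]` and `{**j.meta, ...}`) instead of mutating the job in place, so the snapshot they read from is left unchanged.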
To update multiple jobs at once:
jobs = await oban.update_many_jobs([123, 456, 789], {"priority": 0})
With a callable, each job is updated individually based on its current state:
jobs = await oban.update_many_jobs(
    job_ids,
    lambda job: {"meta": {**job.meta, "batch_id": batch_id}}
)
Warning
Use caution when updating jobs that are currently executing. Modifying fields like args,
queue, or worker while a job is running may lead to unexpected behavior. Consider cancelling
the job first, or deferring the update until after execution completes.