API Reference#

This page contains auto-generated API documentation for Oban.

Oban#

class oban.Oban[source]#

Bases: object

__init__(*, pool, dispatcher=None, leadership=None, lifeline={}, name=None, node=None, notifier=None, prefix=None, pruner={}, queues=None, refresher={}, scheduler={}, stager={})[source]#

Initialize an Oban instance.

Oban can run in two modes:

  • Server mode: When queues are configured, this instance processes jobs. Leadership is enabled by default to coordinate cluster-wide operations.

  • Client mode: When no queues are configured, this instance only enqueues jobs. Leadership is disabled by default.

Parameters:
  • pool (Any) – Database connection pool (e.g., AsyncConnectionPool)

  • leadership (bool | None) – Enable leadership election (default: True if queues configured, False otherwise)

  • lifeline (dict[str, Any]) – Lifeline config options: interval (default: 60.0)

  • name (str | None) – Name for this instance in the registry (default: “oban”)

  • node (str | None) – Node identifier for this instance (default: socket.gethostname())

  • notifier (Notifier | None) – Notifier instance for pub/sub (default: PostgresNotifier with default config)

  • prefix (str | None) – PostgreSQL schema where Oban tables are located (default: “public”)

  • pruner (dict[str, Any]) – Pruning config options: max_age in seconds (default: 86_400.0, 1 day), interval (default: 60.0), limit (default: 20_000).

  • queues (dict[str, int | dict[str, Any]] | None) – Queue names mapped to worker limits (default: {})

  • refresher (dict[str, Any]) – Refresher config options: interval (default: 15.0), max_age (default: 60.0)

  • scheduler (dict[str, Any]) – Scheduler config options: timezone (default: “UTC”)

  • stager (dict[str, Any]) – Stager config options: interval (default: 1.0), limit (default: 20_000)

  • dispatcher (Any)

Return type:

None
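The defaulting rule for the two modes described above can be sketched as follows. The helper name resolve_leadership is hypothetical and not part of Oban; it only models the documented default (True if queues are configured, False otherwise) under the assumption that an explicit value always wins.

```python
from __future__ import annotations

from typing import Any


def resolve_leadership(
    queues: dict[str, Any] | None = None,
    leadership: bool | None = None,
) -> bool:
    """Model the documented default: leader election follows queue config."""
    if leadership is not None:
        # Assumption: an explicit value overrides the mode-based default.
        return leadership
    # Server mode (queues configured) defaults to True, client mode to False.
    return bool(queues)


print(resolve_leadership(queues={"default": 10}))  # server mode
print(resolve_leadership())                        # client mode
```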

async static create_pool(dsn=None, *, min_size=None, max_size=None, timeout=None)[source]#

Create a connection pool for use with Oban.

This is a convenience method that creates and opens an AsyncConnectionPool. Configuration is loaded from multiple sources in order of precedence (lowest to highest):

  1. oban.toml in the current directory

  2. Environment variables (OBAN_DSN, OBAN_POOL_MIN_SIZE, etc.)

  3. Explicit arguments passed to this method

The caller is responsible for closing the pool when done.

Parameters:
  • dsn (str | None) – PostgreSQL connection string (e.g., “postgresql://localhost/mydb”)

  • min_size (int | None) – Minimum number of connections to keep open (default: 1)

  • max_size (int | None) – Maximum number of connections in the pool (default: 10)

  • timeout (float | None) – Timeout in seconds for acquiring a connection (default: 30.0)

Return type:

AsyncConnectionPool

Returns:

An open AsyncConnectionPool ready for use

Example

Using configuration from oban.toml or environment:

>>> pool = await Oban.create_pool()
>>> async with Oban(pool=pool) as oban:
...     await oban.enqueue(SomeWorker.new({"key": "value"}))
>>> await pool.close()

Overriding the dsn:

>>> pool = await Oban.create_pool("postgresql://localhost/mydb")
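To make the precedence order concrete, here is a minimal sketch of a layered merge. The function resolve_pool_config and the file key names are hypothetical, and the file is assumed to be already parsed into a dict; only the two environment variables named above are used. It is not how Oban necessarily implements the lookup.

```python
from __future__ import annotations

from typing import Any


def resolve_pool_config(
    file_config: dict[str, Any],
    environ: dict[str, str],
    **explicit: Any,
) -> dict[str, Any]:
    """Merge settings from lowest to highest precedence."""
    # Documented defaults for min_size, max_size, and timeout.
    config: dict[str, Any] = {
        "dsn": None,
        "min_size": 1,
        "max_size": 10,
        "timeout": 30.0,
    }

    # 1. Values from oban.toml (key names here are assumptions).
    config.update(file_config)

    # 2. Environment variables override the file.
    if "OBAN_DSN" in environ:
        config["dsn"] = environ["OBAN_DSN"]
    if "OBAN_POOL_MIN_SIZE" in environ:
        config["min_size"] = int(environ["OBAN_POOL_MIN_SIZE"])

    # 3. Explicit arguments win; None is treated as "not provided".
    config.update({k: v for k, v in explicit.items() if v is not None})
    return config
```

Treating None as "not provided" mirrors the None defaults in the create_pool signature.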
property is_leader: bool#

Check if this node is currently the leader.

Returns False if leadership is not enabled for this instance. Otherwise, it indicates whether this instance is acting as leader.

Example

>>> async with Oban(pool=pool, leadership=True) as oban:
...     if oban.is_leader:
...         # Perform leader-only operation
...         pass
async start()[source]#

Start the Oban instance and begin processing jobs.

This starts all internal processes including the notifier, leader election, job staging, scheduling, and queue producers. When queues are configured, it also verifies that the required database tables exist.

Return type:

Oban

Returns:

The started Oban instance for method chaining

Raises:

RuntimeError – If required database tables are missing (run migrations first)

Example

For most use cases, prefer using Oban as an async context manager:

>>> async with Oban(pool=pool, queues={"default": 10}) as oban:
...     await oban.enqueue(MyWorker.new({"id": 1}))

Use explicit start/stop for more control over the lifecycle:

>>> oban = Oban(pool=pool, queues={"default": 10})
>>> await oban.start()
>>> # ... application runs ...
>>> await oban.stop()
async stop()[source]#

Stop the Oban instance and gracefully shut down all processes.

This stops all internal processes including queue producers, the notifier, leader election, and background tasks. Running jobs are allowed to complete before producers fully stop.

Calling stop on an instance that was never started is safe and returns immediately.

Example

For most use cases, prefer using Oban as an async context manager:

>>> async with Oban(pool=pool, queues={"default": 10}) as oban:
...     await oban.enqueue(MyWorker.new({"id": 1}))

Use explicit start/stop for more control over the lifecycle:

>>> oban = Oban(pool=pool, queues={"default": 10})
>>> await oban.start()
>>> # ... application runs ...
>>> await oban.stop()
Return type:

None

async enqueue(job)[source]#

Enqueue a job in the database for processing.

Parameters:

job (Job) – A Job instance created via Worker.new()

Return type:

Job

Returns:

The inserted job with database-assigned values (id, timestamps, state)

Example

>>> job = EmailWorker.new({"to": "[email protected]", "subject": "Welcome"})
>>> await oban.enqueue(job)

For convenience, you can also use Worker.enqueue() directly:

>>> await EmailWorker.enqueue({"to": "[email protected]", "subject": "Welcome"})
async enqueue_many(jobs_or_first, /, *rest)[source]#

Insert multiple jobs into the database in a single operation.

This is more efficient than calling enqueue() multiple times as it uses a single database query to insert all jobs.

Parameters:
  • jobs_or_first (Iterable[Job] | Job) – Either an iterable of jobs, or the first job when using variadic arguments

  • *rest (Job) – Additional jobs when using variadic arguments

Return type:

list[Job]

Returns:

The inserted jobs with database-assigned values (id, timestamps, state)

Example

>>> job1 = EmailWorker.new({"to": "[email protected]"})
>>> job2 = EmailWorker.new({"to": "[email protected]"})
>>> job3 = EmailWorker.new({"to": "[email protected]"})
>>> await oban.enqueue_many(job1, job2, job3)
>>> # Or with an iterable:
>>> await oban.enqueue_many([job1, job2, job3])
>>> await oban.enqueue_many(Worker.new({"id": id}) for id in range(10))
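The two calling conventions accepted by the jobs_or_first, /, *rest signature could be normalized along these lines. FakeJob and normalize_jobs are stand-ins invented for this sketch; the real method may handle the arguments differently.

```python
from __future__ import annotations

from collections.abc import Iterable
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeJob:
    """Stand-in for oban.Job; only used to illustrate argument handling."""

    worker: str
    args: dict[str, Any] = field(default_factory=dict)


def normalize_jobs(
    jobs_or_first: Iterable[FakeJob] | FakeJob, /, *rest: FakeJob
) -> list[FakeJob]:
    """Flatten both calling conventions into one list of jobs."""
    if isinstance(jobs_or_first, FakeJob):
        # Variadic form: enqueue_many(job1, job2, job3)
        return [jobs_or_first, *rest]
    # Iterable form: a list, tuple, or generator expression of jobs
    return list(jobs_or_first)
```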
async get_job(job_id)[source]#

Fetch a job by its ID.

Parameters:

job_id (int) – The ID of the job to fetch

Return type:

Job | None

Returns:

The Job if found, None otherwise

Example

>>> job = await oban.get_job(123)
>>> if job:
...     print(f"Job state: {job.state}")
async retry_job(job)[source]#

Retry a job by setting it as available for execution.

Jobs that are currently available or executing are ignored. Otherwise the job is scheduled for immediate execution, and max_attempts is increased if the job has already used all of its attempts.

Parameters:

job (Job | int) – A Job instance or job ID to retry

Return type:

None

Example

Retry a job by ID:

>>> await oban.retry_job(123)

Retry a job instance:

>>> await oban.retry_job(job)
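The retry rules described above can be modeled in memory as a rough sketch. FakeJob and retried are hypothetical names, and the amount by which max_attempts grows (attempt + 1 here) is an assumption; this reference does not state the exact increment.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class FakeJob:
    """Stand-in for oban.Job with only the fields this sketch needs."""

    state: str
    attempt: int = 0
    max_attempts: int = 20


def retried(job: FakeJob) -> FakeJob:
    """Return the job as it might look after a retry request."""
    if job.state in ("available", "executing"):
        # Documented behavior: these jobs are ignored.
        return job
    max_attempts = job.max_attempts
    if job.attempt >= max_attempts:
        # Assumption: leave room for exactly one more attempt.
        max_attempts = job.attempt + 1
    return replace(job, state="available", max_attempts=max_attempts)
```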
async retry_many_jobs(jobs)[source]#

Retry multiple jobs by setting them as available for execution.

Jobs that are currently available or executing are ignored. All other jobs are scheduled for immediate execution, and max_attempts is increased for any job that has already used all of its attempts.

Parameters:

jobs (list[Job | int]) – List of Job instances or job IDs to retry

Return type:

int

Returns:

The number of jobs updated

Example

Retry multiple jobs by ID:

>>> count = await oban.retry_many_jobs([123, 456, 789])

Retry multiple job instances:

>>> count = await oban.retry_many_jobs([job_1, job_2, job_3])
async delete_job(job)[source]#

Delete a job from the database.

Jobs in the executing state cannot be deleted and are ignored.

Parameters:

job (Job | int) – A Job instance or job ID to delete

Return type:

None

Example

Delete a job by ID:

>>> await oban.delete_job(123)

Delete a job instance:

>>> await oban.delete_job(job)
async delete_many_jobs(jobs)[source]#

Delete multiple jobs from the database.

Jobs in the executing state cannot be deleted and are ignored.

Parameters:

jobs (list[Job | int]) – List of Job instances or job IDs to delete

Return type:

int

Returns:

The number of jobs deleted

Example

Delete multiple jobs by ID:

>>> count = await oban.delete_many_jobs([123, 456, 789])

Delete multiple job instances:

>>> count = await oban.delete_many_jobs([job_1, job_2, job_3])
async cancel_job(job)[source]#

Cancel a job to prevent it from running.

Jobs are marked as cancelled. Only jobs in the executing, available, scheduled, or retryable state can be cancelled.

For executing jobs, the database state is updated immediately, but the running task is not forcefully terminated. Workers should check for cancellation at safe points and stop gracefully by calling job.cancelled():

>>> async def process(self, job):
...     for item in dataset:
...         if job.cancelled():
...             return Cancel("Job was cancelled")
...         await process_item(item)
Parameters:

job (Job | int) – A Job instance or job ID to cancel

Return type:

None

Example

Cancel a job by ID:

>>> await oban.cancel_job(123)

Cancel a job instance:

>>> await oban.cancel_job(job)
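The cooperative cancellation pattern described above can be tried without a database. In this sketch FakeJob and its request_cancel method are hypothetical stand-ins; in Oban the flag would be driven by cancel_job, and a real worker would return Cancel(...) instead of breaking out of the loop.

```python
import asyncio


class FakeJob:
    """Stand-in for oban.Job exposing only a cancelled() check."""

    def __init__(self) -> None:
        self._cancel_requested = False

    def request_cancel(self) -> None:
        # Hypothetical hook used to simulate an external cancel_job() call.
        self._cancel_requested = True

    def cancelled(self) -> bool:
        return self._cancel_requested


async def process(job: FakeJob, items: list[int]) -> list[int]:
    """Process items until finished or until cancellation is observed."""
    done: list[int] = []
    for item in items:
        if job.cancelled():
            break  # safe point: stop before starting the next item
        await asyncio.sleep(0)  # yield control, as real I/O would
        done.append(item)
        if item == 2:
            job.request_cancel()  # simulate a cancel arriving mid-run
    return done


print(asyncio.run(process(FakeJob(), [1, 2, 3, 4])))  # prints [1, 2]
```

Items already processed are kept; only work that has not started is skipped.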
async cancel_many_jobs(jobs)[source]#

Cancel multiple jobs to prevent them from running.

Jobs are marked as cancelled. Only jobs in the executing, available, scheduled, or retryable state can be cancelled.

For executing jobs, the database state is updated immediately, but running tasks are not forcefully terminated. Workers should check for cancellation at safe points and stop gracefully by calling job.cancelled():

>>> async def process(self, job):
...     for item in dataset:
...         if job.cancelled():
...             return Cancel("Job was cancelled")
...         await process_item(item)
Parameters:

jobs (list[Job | int]) – List of Job instances or job IDs to cancel

Return type:

int

Returns:

The number of jobs cancelled

Example

Cancel multiple jobs by ID:

>>> count = await oban.cancel_many_jobs([123, 456, 789])

Cancel multiple job instances:

>>> count = await oban.cancel_many_jobs([job_1, job_2, job_3])
async update_job(job, changes)[source]#

Update a job with the given changes.

This function accepts either a job instance or id, along with either a dict of changes or a callable that receives the job and returns a dict of changes.

The update operation is wrapped in a transaction with a locking clause to prevent concurrent modifications.

Fields and Validations:

All changes are validated using the same validations as Job.new(). Only the following subset of fields can be updated:

  • args

  • max_attempts

  • meta

  • priority

  • queue

  • scheduled_at

  • tags

  • worker

Warning

Use caution when updating jobs that are currently executing. Modifying fields like args, queue, or worker while a job is running may lead to unexpected behavior or inconsistent state. Consider whether the job should be cancelled first, or if the update should be deferred until after execution completes.

Parameters:
  • job (Job | int) – A Job instance or job ID to update

  • changes (dict[str, Any] | Callable[[Job], dict[str, Any]]) – Either a dict of field changes, or a callable that takes the job and returns a dict of changes

Return type:

Job

Returns:

The updated job with all current field values

Example

Update a job with a dict of changes:

>>> await oban.update_job(job, {"tags": ["urgent"], "priority": 0})

Update a job by id:

>>> await oban.update_job(123, {"tags": ["processed"], "meta": {"batch_id": 456}})

Update a job using a callable:

>>> await oban.update_job(job, lambda j: {"tags": ["retry"] + j.tags})
>>> from datetime import datetime
>>> await oban.update_job(job, lambda j: {
...     "meta": {**j.meta, "processed_at": datetime.now()}
... })

The schedule_in shorthand is also accepted as an alternative to scheduled_at:

>>> await oban.update_job(job, {"schedule_in": 300})  # 5 minutes from now
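How a dict or a callable might be resolved into a validated set of changes can be sketched as below. FakeJob, UPDATABLE, and resolve_changes are hypothetical, and raising ValueError for unknown fields is an assumption; the actual validation errors come from the same checks as Job.new().

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any

# Fields listed above as updatable, plus the schedule_in shorthand.
UPDATABLE = {
    "args", "max_attempts", "meta", "priority", "queue",
    "scheduled_at", "schedule_in", "tags", "worker",
}


@dataclass
class FakeJob:
    """Stand-in for oban.Job with two illustrative fields."""

    tags: list[str] = field(default_factory=list)
    priority: int = 0


def resolve_changes(
    job: FakeJob,
    changes: dict[str, Any] | Callable[[FakeJob], dict[str, Any]],
) -> dict[str, Any]:
    """Turn either form of changes into a checked dict."""
    # A callable receives the current job and returns the changes.
    resolved = changes(job) if callable(changes) else dict(changes)
    unknown = set(resolved) - UPDATABLE
    if unknown:
        # Assumption: reject fields outside the updatable subset.
        raise ValueError(f"cannot update fields: {sorted(unknown)}")
    return resolved
```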
async update_many_jobs(jobs, changes)[source]#

Update multiple jobs with the given changes.

This function accepts a list of job instances or ids, along with either a dict of changes or a callable that receives each job and returns a dict of changes.

The update operation is wrapped in a transaction with a locking clause to prevent concurrent modifications.

Fields and Validations:

All changes are validated using the same validations as Job.new(). Only the following subset of fields can be updated:

  • args

  • max_attempts

  • meta

  • priority

  • queue

  • scheduled_at

  • tags

  • worker

Warning

Use caution when updating jobs that are currently executing. Modifying fields like args, queue, or worker while a job is running may lead to unexpected behavior or inconsistent state. Consider whether the job should be cancelled first, or if the update should be deferred until after execution completes.

Parameters:
  • jobs (list[Job | int]) – List of Job instances or job IDs to update

  • changes (dict[str, Any] | Callable[[Job], dict[str, Any]]) – Either a dict of field changes, or a callable that takes a job and returns a dict of changes

Return type:

list[Job]

Returns:

The updated jobs with all current field values

Example

Update multiple jobs with a dict of changes:

>>> await oban.update_many_jobs([job1, job2], {"priority": 0})

Update multiple jobs by ID:

>>> await oban.update_many_jobs([123, 456], {"tags": ["processed"]})

Update multiple jobs using a callable:

>>> await oban.update_many_jobs(
...     [job1, job2],
...     lambda job: {"tags": ["retry"] + job.tags}
... )
async pause_queue(queue, *, node=None)[source]#

Pause a queue, preventing it from executing new jobs.

All running jobs will remain running until they are finished.

Parameters:
  • queue (str) – The name of the queue to pause

  • node (str | None) – Specific node name to pause. If not provided, pauses across all nodes.

Return type:

None

Example

Pause the default queue across all nodes:

>>> await oban.pause_queue("default")

Pause the default queue only on a particular node:

>>> await oban.pause_queue("default", node="worker.1")
async resume_queue(queue, *, node=None)[source]#

Resume a paused queue, allowing it to execute jobs again.

Parameters:
  • queue (str) – The name of the queue to resume

  • node (str | None) – Specific node name to resume. If not provided, resumes across all nodes.

Return type:

None

Example

Resume the default queue across all nodes:

>>> await oban.resume_queue("default")

Resume the default queue only on a particular node:

>>> await oban.resume_queue("default", node="worker.1")
async pause_all_queues(*, node=None)[source]#

Pause all queues, preventing them from executing new jobs.

All running jobs will remain running until they are finished.

Parameters:

node (str | None) – Specific node name to pause. If not provided, pauses across all nodes.

Return type:

None

Example

Pause all queues across all nodes:

>>> await oban.pause_all_queues()

Pause all queues only on a particular node:

>>> await oban.pause_all_queues(node="worker.1")
async resume_all_queues(*, node=None)[source]#

Resume all paused queues, allowing them to execute jobs again.

Parameters:

node (str | None) – Specific node name to resume. If not provided, resumes across all nodes.

Return type:

None

Example

Resume all queues across all nodes:

>>> await oban.resume_all_queues()

Resume all queues only on a particular node:

>>> await oban.resume_all_queues(node="worker.1")
check_queue(queue)[source]#

Check the current state of a queue.

This lets you inspect a queue’s health by retrieving key attributes of the producer’s state, such as the current limit, running job IDs, and when the producer was started.

Parameters:

queue (str) – The name of the queue to check

Return type:

QueueInfo | None

Returns:

A QueueInfo instance with the producer’s state, or None if the queue isn’t running on this node.

Example

Get details about the default queue:

>>> state = oban.check_queue("default")
>>> print(f"Queue {state.queue} has {len(state.running)} jobs running")

Attempt to check a queue that isn’t running locally:

>>> state = oban.check_queue("not_running")
>>> print(state)  # None
check_all_queues()[source]#

Check the current state of all queues running on this node.

Return type:

list[QueueInfo]

Returns:

A list of QueueInfo instances, one for each queue running locally. Returns an empty list if no queues are running on this node.

Example

Get details about all local queues:

>>> states = oban.check_all_queues()
>>> for state in states:
...     print(f"{state.queue}: {len(state.running)} running, paused={state.paused}")
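The returned list lends itself to simple health checks. The sketch below uses a FakeQueueInfo stand-in with the queue, running, and paused attributes shown above; the limit attribute name is an assumption based on the "current limit" wording in check_queue, and saturated is a hypothetical helper.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeQueueInfo:
    """Stand-in for QueueInfo; attribute names partly assumed."""

    queue: str
    limit: int  # assumed attribute name for the concurrency limit
    paused: bool = False
    running: list[int] = field(default_factory=list)


def saturated(infos: list[FakeQueueInfo]) -> list[str]:
    """Names of unpaused queues whose running count has reached the limit."""
    return [
        info.queue
        for info in infos
        if not info.paused and len(info.running) >= info.limit
    ]
```

With a real instance, the list passed in would be the result of oban.check_all_queues().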
async start_queue(*, queue, limit, paused=False, node=None)[source]#

Start a new supervised queue.

By default, this starts a new supervised queue across all nodes running Oban on the same database and prefix.

Parameters:
  • queue (str) – The name of the queue to start

  • limit (int) – The concurrency limit for the queue

  • paused (bool) – Whether the queue starts in a paused state (default: False)

  • node (str | None) – Specific node name to start the queue on. If not provided, starts across all nodes.

Return type:

None

Example

Start the priority queue with a concurrency limit of 10 across all nodes:

>>> await oban.start_queue(queue="priority", limit=10)

Start the media queue on a particular node:

>>> await oban.start_queue(queue="media", limit=5, node="worker.1")

Start the media queue in a paused state:

>>> await oban.start_queue(queue="media", limit=5, paused=True)
async stop_queue(queue, *, node=None)[source]#

Stop a supervised queue.

By default, this stops the queue across all connected nodes.

Parameters:
  • queue (str) – The name of the queue to stop

  • node (str | None) – Specific node name to stop the queue on. If not provided, stops across all nodes.

Return type:

None

Example

Stop the priority queue across all nodes:

>>> await oban.stop_queue("priority")

Stop the media queue on a particular node:

>>> await oban.stop_queue("media", node="worker.1")
async scale_queue(*, queue, node=None, **kwargs)[source]#

Scale the concurrency for a queue.

By default, this scales the queue across all connected nodes.

Parameters:
  • queue (str) – The name of the queue to scale

  • node (str | None) – Specific node name to scale the queue on. If not provided, scales across all nodes.

  • **kwargs (Any) – Options passed through transparently, including limit (the new concurrency limit)

Return type:

None

Example

Scale a queue up, triggering immediate execution of queued jobs:

>>> await oban.scale_queue(queue="default", limit=50)

Scale the queue back down, allowing executing jobs to finish:

>>> await oban.scale_queue(queue="default", limit=5)

Scale the queue on a particular node:

>>> await oban.scale_queue(queue="default", limit=10, node="worker.1")

Job#

class oban.Job[source]#

Bases: object

Job(worker: ‘str’, id: ‘int | None’ = None, state: ‘JobState’ = <JobState.AVAILABLE: ‘available’>, queue: ‘str’ = ‘default’, attempt: ‘int’ = 0, max_attempts: ‘int’ = 20, priority: ‘int’ = 0, args: ‘dict[str, Any]’ = <factory>, meta: ‘dict[str, Any]’ = <factory>, errors: ‘list[str]’ = <factory>, tags: ‘list[str]’ = <factory>, attempted_by: ‘list[str]’ = <factory>, inserted_at: ‘datetime | None’ = None, attempted_at: ‘datetime | None’ = None, cancelled_at: ‘datetime | None’ = None, completed_at: ‘datetime | None’ = None, discarded_at: ‘datetime | None’ = None, scheduled_at: ‘datetime | None’ = None, extra: ‘dict[str, Any]’ = <factory>)

worker: str#
id: int | None#
state: JobState#
queue: str#
attempt: int#
max_attempts: int#
priority: int#
args: dict[str, Any]#
meta: dict[str, Any]#
errors: list[str]#
tags: list[str]#
attempted_by: list[str]#
inserted_at: datetime | None#
attempted_at: datetime | None#
cancelled_at: datetime | None#
completed_at: datetime | None#
discarded_at: datetime | None#
scheduled_at: datetime | None#
extra: dict[str, Any]#
classmethod new(**params)[source]#

Create a new job with validation and normalization.

This is a low-level method for manually constructing jobs. In most cases, you should use the @worker or @job decorators instead, which provide a more convenient API via Worker.new() and Worker.enqueue().

Jobs returned from the database are constructed directly and skip validation/normalization.

Parameters:

**params – Job field values including:

  • worker: Required. Fully qualified worker class path

  • args: Job arguments (default: {})

  • queue: Queue name (default: “default”)

  • priority: Priority 0-9 (default: 0)

  • max_attempts: Maximum retry attempts (default: 20)

  • scheduled_at: When to run the job (default: now)

  • schedule_in: Alternative to scheduled_at. Timedelta or seconds from now

  • tags: List of tags for grouping

  • meta: Arbitrary metadata dictionary

Return type:

Job

Returns:

A validated and normalized Job instance

Example

Manual job creation (not recommended for typical use):

>>> job = Job.new(
...     worker="myapp.workers.EmailWorker",
...     args={"to": "[email protected]"},
...     queue="mailers",
...     schedule_in=60  # Run in 60 seconds
... )

Preferred approach using decorators:

>>> from oban import worker
>>>
>>> @worker(queue="mailers")
... class EmailWorker:
...     async def process(self, job):
...         pass
>>>
>>> job = EmailWorker.new({"to": "[email protected]"}, schedule_in=60)
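The relationship between schedule_in and scheduled_at can be illustrated with a small normalization helper. resolve_scheduled_at is a hypothetical name, and the choice to let schedule_in take priority when both are given is an assumption rather than documented behavior.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def resolve_scheduled_at(
    schedule_in: timedelta | float | None = None,
    scheduled_at: datetime | None = None,
    *,
    now: datetime | None = None,
) -> datetime:
    """Convert a relative delay into an absolute run time."""
    now = now or datetime.now(timezone.utc)
    if schedule_in is not None:
        # Accept either a timedelta or a number of seconds.
        if not isinstance(schedule_in, timedelta):
            schedule_in = timedelta(seconds=schedule_in)
        return now + schedule_in
    # Without a delay, fall back to the explicit time or to "now".
    return scheduled_at or now
```

Passing now explicitly keeps the helper deterministic in tests.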
update(changes)[source]#

Update this job with the given changes, applying validation and normalization.

This method creates a new Job instance with the changes applied, then validates and normalizes the result. It’s used internally by Oban’s update_job methods.

Parameters:

changes (dict[str, Any]) – Dictionary of field changes. Supports:

  • args: Job arguments

  • max_attempts: Maximum retry attempts

  • meta: Arbitrary metadata dictionary

  • priority: Priority 0-9

  • queue: Queue name

  • scheduled_at: When to run the job

  • schedule_in: Alternative to scheduled_at. Timedelta or seconds from now

  • tags: List of tags for filtering/grouping

  • worker: Fully qualified worker class path

Return type:

Job

Returns:

A new Job instance with changes applied and validated

Example

>>> job.update({"priority": 0, "tags": ["urgent"]})
cancelled()[source]#

Check if cancellation has been requested for this job.

Workers can call this method at safe points during execution to check if the job should stop processing and return early.

Return type:

bool

Returns:

True if cancellation has been requested, False otherwise

Example

>>> async def process(self, job):
...     for item in large_dataset:
...         if job.cancelled():
...             return Cancel("Job was cancelled")
...         await process_item(item)
__init__(worker, id=None, state=JobState.AVAILABLE, queue='default', attempt=0, max_attempts=20, priority=0, args=<factory>, meta=<factory>, errors=<factory>, tags=<factory>, attempted_by=<factory>, inserted_at=None, attempted_at=None, cancelled_at=None, completed_at=None, discarded_at=None, scheduled_at=None, extra=<factory>)#
Return type:

None

Decorators#

Decorators for creating Oban workers and jobs.

This module provides two decorators for making your code enqueueable:

  • @worker: for classes with a process method

  • @job: for wrapping functions as jobs

oban.decorators.worker(*, oban='oban', cron=None, **overrides)[source]#

Decorate a class to make it a viable worker.

The decorator adds worker functionality to a class, including job creation and enqueueing methods. The decorated class must implement a process method.

For simpler function-based jobs, consider using @job instead.

Parameters:
  • oban (str) – Name of the Oban instance to use (default: “oban”)

  • cron (str | dict | None) – Optional cron configuration for periodic execution. Can be:

      - A string expression (e.g., “0 0 * * *” or “@daily”)

      - A dict with “expr” and optional “timezone” keys (timezone as string)

  • **overrides – Configuration options for the worker (queue, priority, etc.)

Returns:

A decorator function that can be applied to worker classes

Example

>>> from oban import Oban, worker
>>>
>>> # Create an Oban instance with a specific name
>>> oban_instance = Oban(pool=pool, name="oban", queues={"default": 10, "mailers": 5})
>>>
>>> @worker(queue="mailers", priority=1)
... class EmailWorker:
...     async def process(self, job):
...         print(f"Sending email: {job.args}")
...         return None
>>>
>>> # Create a job without enqueueing
>>> job = EmailWorker.new({"to": "[email protected]", "subject": "Hello"})
>>> print(job.queue)  # "mailers"
>>> print(job.priority)  # 1
>>>
>>> # Create and enqueue a job
>>> job = await EmailWorker.enqueue(
...     {"to": "[email protected]", "subject": "Alert"},
...     priority=5  # Override default priority
... )
>>> print(job.priority)  # 5
>>>
>>> # Schedule a job to run in 5 minutes using a timedelta
>>> from datetime import timedelta
>>> job = await EmailWorker.enqueue(..., schedule_in=timedelta(minutes=5))
>>>
>>> # Schedule a job to run in 60 seconds
>>> job = await EmailWorker.enqueue(..., schedule_in=60)
>>>
>>> # Periodic worker that runs daily at midnight
>>> @worker(queue="cleanup", cron="@daily")
... class DailyCleanup:
...     async def process(self, job):
...         print("Running daily cleanup")
...         return None
>>>
>>> # Periodic worker with timezone
>>> @worker(queue="reports", cron={"expr": "0 9 * * MON-FRI", "timezone": "America/New_York"})
... class BusinessHoursReport:
...     async def process(self, job):
...         print("Running during NY business hours")
...         return None
>>>
>>> # Workers can also be created without args
>>> job = DailyCleanup.new()  # args defaults to {}
>>>
>>> # Custom backoff for retries
>>> @worker(queue="default")
... class CustomBackoffWorker:
...     async def process(self, job):
...         return None
...
...     def backoff(self, job):
...         # Simple linear backoff at 2x the attempt number
...         return 2 * job.attempt

Note

The worker class must implement a process(self, job: Job) -> Result[Any] method. If not implemented, a NotImplementedError will be raised when called.

Optionally implement a backoff(self, job: Job) -> int method to customize retry delays. If not provided, uses Oban’s default jittery clamped backoff.
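As one possible shape for the optional backoff hook, the sketch below computes a capped exponential delay with a little jitter. The class name, BASE, and CAP are hypothetical, and this is not Oban's default backoff formula; the job is represented by any object with an attempt attribute.

```python
from __future__ import annotations

import random
from types import SimpleNamespace


class CappedBackoffWorker:
    """Plain class showing only the optional backoff hook."""

    BASE = 2    # seconds; hypothetical tuning value
    CAP = 300   # upper bound before jitter; hypothetical tuning value

    def backoff(self, job) -> int:
        # Double the delay on every attempt, clamped to CAP.
        delay = min(self.CAP, self.BASE * 2 ** job.attempt)
        # Add up to 10% jitter so retries do not all fire at once.
        jitter = random.randint(0, delay // 10)
        return delay + jitter


# Any object with an `attempt` attribute works for a quick check.
print(CappedBackoffWorker().backoff(SimpleNamespace(attempt=1)))  # prints 4
```

Because jitter is added after clamping, the returned delay can exceed CAP by up to 10%.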

oban.decorators.job(*, oban='oban', cron=None, **overrides)[source]#

Decorate a function to make it an Oban job.

The decorated function’s signature is preserved for new() and enqueue().

Use @job for simple function-based tasks where you don’t need access to job metadata such as the attempt number or past errors.

Parameters:
  • oban (str) – Name of the Oban instance to use (default: “oban”)

  • cron (str | dict | None) – Optional cron configuration for periodic execution. Can be:

      - A string expression (e.g., “0 0 * * *” or “@daily”)

      - A dict with “expr” and optional “timezone” keys (timezone as string)

  • **overrides – Configuration options (queue, priority, etc.)

Example

>>> from oban import job
>>>
>>> @job(queue="mailers", priority=1)
... def send_email(to: str, subject: str, body: str):
...     print(f"Sending to {to}: {subject}")
>>>
>>> await send_email.enqueue("[email protected]", "Hello", "World")
>>>
>>> # Periodic job that runs weekly at midnight
>>> @job(queue="reports", cron="@weekly")
... def generate_weekly_report():
...     print("Generating weekly report")
...     return {"status": "complete"}
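One way a preserved signature could map call arguments to job args is through inspect.signature, as in this sketch. The helper args_from_call is hypothetical and the approach is an assumption about the mechanism, not a description of Oban's internals.

```python
import inspect
from typing import Any, Callable


def args_from_call(func: Callable[..., Any], *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Bind call arguments to parameter names, as a job's args dict."""
    # bind() raises TypeError for missing or unexpected arguments,
    # just like calling the function directly would.
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return dict(bound.arguments)


def send_email(to: str, subject: str, body: str = "") -> None:
    """Example function whose signature is being preserved."""


print(args_from_call(send_email, "ops@example.com", "Hello"))
```

Binding by name keeps positional and keyword calls equivalent once stored as args.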

Testing#

Testing helpers for Oban workers and queues.

This module provides utilities for unit testing workers without database interaction.

oban.testing.mode(testing_mode)[source]#

Temporarily set the testing mode for Oban instances.

This context manager allows you to override the testing mode for all Oban instances within a specific context. Useful for switching modes in individual tests without affecting the entire test suite.

Parameters:

testing_mode (str) – The mode to set (“inline” or “manual”)

Yields:

None

Example

>>> import oban.testing
>>>
>>> oban.testing.set_mode("manual")
>>>
>>> async def test_inline_execution():
...     with oban.testing.mode("inline"):
...         # Jobs execute immediately in this context
...         await EmailWorker.enqueue({"to": "[email protected]"})
async oban.testing.reset_oban(oban='oban')[source]#

Reset Oban tables between tests.

Truncates all Oban-related tables with CASCADE and RESTART IDENTITY. Useful for cleaning up between tests when using manual testing mode.

Parameters:

oban (str | Oban) – Oban instance name (default: “oban”) or Oban instance

Example

>>> from oban.testing import reset_oban
>>> import pytest
>>>
>>> # In your conftest.py
>>> @pytest.fixture(autouse=True)
... async def _reset_oban_after_test():
...     yield
...     await reset_oban()
>>>
>>> # Or call directly in tests
>>> async def test_something(oban):
...     await oban.enqueue(SomeWorker.new({}))
...     # ... test assertions ...
...     await reset_oban()
async oban.testing.all_enqueued(*, oban='oban', **filters)[source]#

Retrieve all currently enqueued jobs matching a set of filters.

Only jobs matching all of the provided filters are returned. Jobs are returned in descending order by ID, so the most recently enqueued job is listed first.

Parameters:
  • oban (str | Oban) – Oban instance name (default: “oban”) or Oban instance

  • **filters – Job fields to match (e.g., worker=EmailWorker, args={“to”: “…”}, queue=”mailers”, priority=5). Args supports partial matching.

Return type:

list[Job]

Returns:

List of Job instances matching the filters, in descending order by ID

Example

>>> from oban.testing import all_enqueued
>>> from app.workers import EmailWorker
>>>
>>> # Assert based on only some of a job's args
>>> jobs = await all_enqueued(worker=EmailWorker)
>>> assert len(jobs) == 1
>>> assert jobs[0].args["id"] == 1
>>>
>>> # Assert that exactly one job was inserted for a queue
>>> jobs = await all_enqueued(queue="alpha")
>>> assert len(jobs) == 1
>>>
>>> # Assert that there aren't any jobs enqueued
>>> assert await all_enqueued() == []
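Partial matching on args can be pictured as a subset check: every expected key must be present with an equal value, and extra keys on the job are ignored. The helper args_match is hypothetical, and comparing only top-level keys is an assumption; the real filter may treat nested values differently.

```python
from typing import Any


def args_match(actual: dict[str, Any], expected: dict[str, Any]) -> bool:
    """True when every expected key/value pair appears in the job's args."""
    return all(
        key in actual and actual[key] == value
        for key, value in expected.items()
    )


job_args = {"id": 1, "to": "ops@example.com", "subject": "Hello"}
print(args_match(job_args, {"id": 1}))           # prints True
print(args_match(job_args, {"id": 2}))           # prints False
```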
async oban.testing.assert_enqueued(*, oban='oban', timeout=0, **filters)[source]#

Assert that a job matching the given criteria was enqueued.

This helper queries the database for jobs in ‘available’ or ‘scheduled’ state that match the provided filters. With a timeout, it will poll repeatedly until a matching job is found or the timeout expires.

Parameters:
  • oban (str | Oban) – Oban instance name (default: “oban”) or Oban instance

  • timeout (float) – Maximum time to wait for a matching job (in seconds). Default: 0 (no wait)

  • **filters – Job fields to match (e.g., worker=EmailWorker, args={“to”: “…”}, queue=”mailers”, priority=5). Args supports partial matching.

Raises:

AssertionError – If no matching job is found within the timeout

Example

>>> from oban.testing import assert_enqueued
>>> from app.workers import EmailWorker
>>>
>>> # Assert job was enqueued with specific worker and args
>>> async def test_signup_sends_email(app):
...     await app.post("/signup", json={"email": "[email protected]"})
...     await assert_enqueued(worker=EmailWorker, args={"to": "[email protected]"})
>>>
>>> # Wait up to 0.2 seconds for an async job to be enqueued
>>> await assert_enqueued(worker=EmailWorker, timeout=0.2)
>>>
>>> # Match on queue alone
>>> await assert_enqueued(queue="mailers")
>>>
>>> # Partial args matching
>>> await assert_enqueued(worker=EmailWorker, args={"to": "[email protected]"})
>>>
>>> # Filter by queue and priority
>>> await assert_enqueued(worker=EmailWorker, queue="mailers", priority=5)
>>>
>>> # Use an alternate oban instance
>>> await assert_enqueued(worker=BatchWorker, oban="batch")
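The timeout behavior (check once when timeout is 0, otherwise poll until a match or the deadline) can be sketched with asyncio. poll_until and its interval parameter are hypothetical; the polling interval used by the real helper is not stated in this reference.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def poll_until(
    check: Callable[[], Awaitable[bool]],
    timeout: float = 0,
    interval: float = 0.01,
) -> bool:
    """Call check() until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if await check():
            return True
        if time.monotonic() >= deadline:
            # With timeout=0 this is reached after a single check.
            return False
        await asyncio.sleep(interval)
```

An assertion helper would raise AssertionError when this returns False; a refute-style helper would instead fail as soon as check() returns True.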
async oban.testing.refute_enqueued(*, oban='oban', timeout=0, **filters)[source]#

Assert that no job matching the given criteria was enqueued.

This helper queries the database for jobs in ‘available’ or ‘scheduled’ state that match the provided filters and asserts that none are found. With a timeout, it will poll repeatedly during the timeout period to ensure no matching job appears.

Parameters:
  • oban (str | Oban) – Oban instance name (default: “oban”) or Oban instance

  • timeout (float) – Time to monitor for matching jobs (in seconds). Default: 0 (check once)

  • **filters – Job fields to match (e.g., worker=EmailWorker, args={"to": "…"}, queue="mailers", priority=5). The args filter supports partial matching.

Raises:

AssertionError – If any matching jobs are found

Example

>>> from oban.testing import refute_enqueued
>>> from app.workers import EmailWorker
>>>
>>> # Assert no email jobs were enqueued
>>> async def test_no_email_on_invalid_signup(app):
...     await app.post("/signup", json={"email": "invalid"})
...     await refute_enqueued(worker=EmailWorker)
>>>
>>> # Monitor for 0.2 seconds to ensure no async job is enqueued
>>> await refute_enqueued(worker=EmailWorker, timeout=0.2)
>>>
>>> # Refute specific args
>>> await refute_enqueued(worker=EmailWorker, args={"to": "[email protected]"})
>>>
>>> # Refute on queue
>>> await refute_enqueued(queue="mailers")
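The monitoring behaviour differs from a positive assertion: a refutation has to keep watching for the whole window and fail as soon as a match appears. A minimal sketch of that idea follows, using a hypothetical `refute_match` helper over an in-memory list rather than Oban's database query; the poll interval is an assumption.

```python
import asyncio
import time


async def refute_match(jobs: list[dict], queue: str, timeout: float = 0) -> None:
    """Fail as soon as a job for `queue` shows up; pass once the window has elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        if any(job["queue"] == queue for job in jobs):
            raise AssertionError(f"unexpected job in queue {queue!r}")
        if time.monotonic() >= deadline:
            return
        await asyncio.sleep(0.01)  # assumed poll interval


async def demo() -> tuple[bool, bool]:
    quiet: list[dict] = []
    await refute_match(quiet, "mailers", timeout=0.05)  # passes: nothing appears
    passed_when_empty = True

    busy: list[dict] = []

    async def enqueue_later() -> None:
        # A job that arrives after the first check but inside the monitoring window.
        await asyncio.sleep(0.02)
        busy.append({"queue": "mailers"})

    task = asyncio.create_task(enqueue_later())
    try:
        await refute_match(busy, "mailers", timeout=0.5)
        failed_on_late_job = False
    except AssertionError:
        failed_on_late_job = True
    await task
    return passed_when_empty, failed_on_late_job


passed_when_empty, failed_on_late_job = asyncio.run(demo())
```

A single check (`timeout=0`) would miss the late job in the second case, which is why a non-zero timeout is useful when jobs are enqueued asynchronously.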
async oban.testing.drain_queue(queue='default', oban='oban', with_recursion=True, with_safety=False, with_scheduled=True)

Synchronously execute all available jobs in a queue.

All execution happens within the current process. Draining a queue from within the current process is especially useful for testing, where jobs enqueued by a process in sandbox mode are only visible to that process.

Parameters:
  • queue (str) – Name of the queue to drain

  • oban (str | Oban) – Oban instance name (default: “oban”) or Oban instance

  • with_recursion (bool) – Whether to drain jobs recursively, or all in a single pass. Either way, jobs are processed sequentially, one at a time. Recursion is required when jobs insert other jobs or depend on the execution of other jobs. Defaults to True.

  • with_scheduled (bool) – Whether to include scheduled or retryable jobs when draining. In recursive mode, which is the default, this will include snoozed jobs, and may lead to an infinite loop if the job snoozes repeatedly. Defaults to True.

  • with_safety (bool) – Whether to silently catch and record errors when draining. When False, raised exceptions are immediately propagated to the caller. Defaults to False.

Return type:

dict[str, int]

Returns:

Dict with counts for each resulting job state (completed, discarded, cancelled, scheduled, retryable)

Example

>>> from oban.testing import drain_queue
>>>
>>> # Drain a queue with jobs
>>> result = await drain_queue(queue="default")
>>> # {'completed': 2, 'discarded': 1, 'cancelled': 0, ...}
>>>
>>> # Drain without scheduled jobs
>>> await drain_queue(queue="default", with_scheduled=False)
>>>
>>> # Drain without safety and assert an error is raised
>>> import pytest
>>> with pytest.raises(RuntimeError):
...     await drain_queue(queue="risky", with_safety=False)
>>>
>>> # Drain in a single pass, without recursion (jobs enqueued while draining are left in the queue)
>>> await drain_queue(queue="default", with_recursion=False)
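The difference between recursive and single-pass draining can be illustrated without a database. The sketch below is an assumption-laden model, not Oban's code: `drain`, `parent`, and `child` are hypothetical names, jobs are plain callables, and only the `completed` count is tracked.

```python
from collections import Counter, deque


def drain(queue: deque, with_recursion: bool = True) -> dict[str, int]:
    """Run jobs one at a time; each job is a callable that may enqueue more jobs."""
    counts: Counter = Counter()
    while queue:
        # Take only the jobs that are present right now.
        batch = [queue.popleft() for _ in range(len(queue))]
        for job in batch:
            job(queue)
            counts["completed"] += 1
        if not with_recursion:
            break  # single pass: jobs enqueued during the pass stay in the queue
    return dict(counts)


def child(queue: deque) -> None:
    pass  # a job that does nothing


def parent(queue: deque) -> None:
    queue.append(child)  # a job that enqueues another job


recursive = drain(deque([parent]), with_recursion=True)

leftover = deque([parent])
single_pass = drain(leftover, with_recursion=False)
```

In the recursive run both `parent` and the `child` it enqueues are executed; in the single-pass run only `parent` runs and `child` remains in `leftover`.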
oban.testing.process_job(job)

Execute a worker’s process method with the given job.

This helper is designed for unit testing workers in isolation without requiring database interaction.

Parameters:

job (Job) – A Job instance to process

Returns:

The result from the worker’s process method (any value is accepted)

Raises:

Any exception raised by the worker if it fails

Example

>>> from oban import worker
>>> from oban.testing import process_job
>>>
>>> @worker()
... class EmailWorker:
...     async def process(self, job):
...         return {"sent": True, "to": job.args["to"]}
>>>
>>> def test_email_worker():
...     job = EmailWorker.new({"to": "[email protected]", "subject": "Hello"})
...     result = process_job(job)
...     assert result["sent"] is True
...     assert result["to"] == "[email protected]"

You can also test function-based workers using the @job decorator:

>>> from oban import job
>>> from oban.testing import process_job
>>>
>>> @job()
... def send_notification(user_id: int, message: str):
...     return f"Sent '{message}' to user {user_id}"
>>>
>>> def test_send_notification():
...     job = send_notification.new(123, "Hello World")
...     result = process_job(job)
...     assert result == "Sent 'Hello World' to user 123"

Telemetry

Lightweight telemetry tooling for agnostic instrumentation.

Provides event emission and handler attachment for instrumentation, similar to Elixir’s :telemetry library, but tailored to Oban’s needs.

class oban.telemetry.Collector

Bases: object

Collector for adding metadata during span execution.

Yielded by span() to allow adding additional metadata that will be included in the stop or exception event.

__init__()
add(metadata)

Add metadata to be included in the stop or exception event.

Parameters:

metadata (dict[str, Any]) – Dictionary of metadata to merge into the event

Return type:

None

get_all()
Return type:

dict[str, Any]

oban.telemetry.attach(id, events, handler)

Attach a handler function to one or more telemetry events.

Parameters:
  • id (str) – Unique identifier for this handler (used for detaching)

  • events (List[str]) – List of event names to handle (e.g., ["oban.job.execute.start"])

  • handler (Callable[[str, dict[str, Any]], None]) – Function called with (name, metadata) for each event

Return type:

None

Example

>>> from oban import telemetry
>>>
>>> def log_events(name, metadata):
...     print(f"{name}: {metadata}")
>>>
>>> telemetry.attach("my-logger", ["oban.job.execute.stop"], log_events)

oban.telemetry.detach(id)

Remove all handlers with the given ID.

Parameters:

id (str) – The handler ID to remove

Return type:

None

Example

>>> telemetry.detach("my-logger")

oban.telemetry.execute(name, metadata)

Execute all handlers registered for an event.

Handlers are called synchronously. Exceptions in handlers are caught and logged to prevent breaking instrumented code.

Parameters:
  • name (str) – Name of the event (e.g., “oban.job.execute.start”)

  • metadata (dict[str, Any]) – Event metadata (job_id, duration, etc.)

Return type:

None

Example

>>> telemetry.execute("oban.job.execute.start", {"job_id": 123, "queue": "default"})

oban.telemetry.span(prefix, start_metadata)

Context manager that emits start/stop/exception events.

Automatically measures duration and emits three possible events:

  • {prefix}.start – When entering the span

  • {prefix}.stop – When exiting normally

  • {prefix}.exception – When an exception is raised

Parameters:
  • prefix (str) – Event name prefix (e.g., “oban.job.execute”)

  • start_metadata (dict[str, Any]) – Metadata included in all events

Yields:

Collector – Object for adding additional metadata during execution

Example

>>> with telemetry.span("oban.job.execute", {"job_id": 123}) as collector:
...     result = process_job()
...     collector.add({"state": result.state})
>>>
>>> # Emits:
>>> # - "oban.job.execute.start" with job_id and system_time
>>> # - "oban.job.execute.stop" with job_id, state, duration, and system_time
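The start/stop/exception flow can be sketched as a small context manager. This is a stand-alone approximation rather than Oban's implementation: events are appended to a hypothetical `events` list instead of being dispatched to handlers, and the `error` metadata key and the use of seconds for `duration` are assumptions.

```python
import time
from contextlib import contextmanager
from typing import Any, Iterator

# Stand-in for dispatching to attached handlers: events are simply recorded.
events: list[tuple[str, dict[str, Any]]] = []


class Collector:
    """Accumulates extra metadata while the span body runs."""

    def __init__(self) -> None:
        self._metadata: dict[str, Any] = {}

    def add(self, metadata: dict[str, Any]) -> None:
        self._metadata.update(metadata)

    def get_all(self) -> dict[str, Any]:
        return dict(self._metadata)


@contextmanager
def span(prefix: str, start_metadata: dict[str, Any]) -> Iterator[Collector]:
    collector = Collector()
    started = time.monotonic()

    def finish(suffix: str, extra: dict[str, Any]) -> None:
        metadata = {
            **start_metadata,
            **collector.get_all(),
            **extra,
            "duration": time.monotonic() - started,  # seconds (assumed unit)
            "system_time": time.time(),
        }
        events.append((f"{prefix}.{suffix}", metadata))

    events.append((f"{prefix}.start", {**start_metadata, "system_time": time.time()}))
    try:
        yield collector
    except Exception as error:
        finish("exception", {"error": error})  # "error" key is an assumption
        raise
    else:
        finish("stop", {})


with span("oban.job.execute", {"job_id": 123}) as collector:
    collector.add({"state": "completed"})

try:
    with span("oban.job.execute", {"job_id": 456}):
        raise RuntimeError("job failed")
except RuntimeError:
    pass

names = [name for name, _ in events]
```

Metadata added through the collector only appears in the closing event, since the start event has already been emitted by the time the body runs.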

Schemas

Database schema installation for Oban.

oban.schema.install_sql(prefix='public')

Get the SQL for installing Oban.

Returns the raw SQL statements for creating Oban types, tables, and indexes. This is intended for integration with migration frameworks like Django or Alembic.

Parameters:

prefix (str) – PostgreSQL schema where Oban tables will be located (default: “public”)

Return type:

str

Returns:

SQL string for schema installation

Example (Alembic):
>>> from alembic import op
>>> from oban.schema import install_sql
>>>
>>> def upgrade():
...     op.execute(install_sql())
Example (Django):
>>> from django.db import migrations
>>> from oban.schema import install_sql
>>>
>>> class Migration(migrations.Migration):
...     operations = [
...         migrations.RunSQL(install_sql()),
...     ]
oban.schema.uninstall_sql(prefix='public')

Get the SQL for uninstalling Oban.

Returns the raw SQL statements for dropping Oban tables and types. Useful for integration with migration frameworks like Alembic or Django.

Parameters:

prefix (str) – PostgreSQL schema where Oban tables are located (default: “public”)

Return type:

str

Returns:

SQL string for schema uninstallation

Example (Alembic):
>>> from alembic import op
>>> from oban.schema import uninstall_sql
>>>
>>> def downgrade():
...     op.execute(uninstall_sql())
Example (Django):
>>> from django.db import migrations
>>> from oban.schema import uninstall_sql
>>>
>>> class Migration(migrations.Migration):
...     operations = [
...         migrations.RunSQL(uninstall_sql()),
...     ]
async oban.schema.install(pool, prefix='public')

Install Oban in the specified database.

Creates all necessary types, tables, and indexes for Oban to function. The installation is wrapped in a DDL transaction to ensure the operation is atomic.

Parameters:
  • pool (Any) – A database connection pool (e.g., AsyncConnectionPool)

  • prefix (str) – PostgreSQL schema where Oban tables will be located (default: “public”)

Return type:

None

Example

>>> from psycopg_pool import AsyncConnectionPool
>>> from oban.schema import install
>>>
>>> pool = AsyncConnectionPool(conninfo=DATABASE_URL, open=False)
>>> await pool.open()
>>> await install(pool)
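Wrapping DDL in a transaction means a failure partway through leaves no half-created objects behind. The sketch below demonstrates that property with the standard library's SQLite driver instead of PostgreSQL, purely so it runs without a server; `run_atomically`, `table_names`, and the `demo_jobs` table are hypothetical and unrelated to Oban's actual schema.

```python
import sqlite3

# isolation_level=None selects autocommit mode, so BEGIN/COMMIT/ROLLBACK are issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)


def run_atomically(conn: sqlite3.Connection, statements: list[str]) -> bool:
    """Apply every statement or none of them."""
    conn.execute("BEGIN")
    try:
        for statement in statements:
            conn.execute(statement)
    except sqlite3.Error:
        conn.execute("ROLLBACK")  # undo any DDL that already ran in this transaction
        return False
    conn.execute("COMMIT")
    return True


def table_names(conn: sqlite3.Connection) -> list[str]:
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")
    return [name for (name,) in rows]


# Hypothetical statements; the second one is invalid on purpose.
failed = run_atomically(
    conn,
    ["CREATE TABLE demo_jobs (id INTEGER PRIMARY KEY)", "CREATE TABLE"],
)
after_failure = table_names(conn)

succeeded = run_atomically(conn, ["CREATE TABLE demo_jobs (id INTEGER PRIMARY KEY)"])
after_success = table_names(conn)
```

After the failed run the table created by the first statement is gone again, so a retry starts from a clean state. PostgreSQL offers the same guarantee for most DDL statements, which is what makes an atomic install possible.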
async oban.schema.uninstall(pool, prefix='public')

Uninstall Oban from the specified database.

Drops all Oban tables and types. The uninstallation is wrapped in a DDL transaction to ensure the operation is atomic.

Parameters:
  • pool (Any) – A database connection pool (e.g., AsyncConnectionPool)

  • prefix (str) – PostgreSQL schema where Oban tables are located (default: “public”)

Return type:

None

Example

>>> from psycopg_pool import AsyncConnectionPool
>>> from oban.schema import uninstall
>>>
>>> pool = AsyncConnectionPool(conninfo=DATABASE_URL, open=False)
>>> await pool.open()
>>> await uninstall(pool)