Managing Queues#
Once queues are defined and running, you’re able to inspect and control them at runtime. This is essential for dynamic processing, handling traffic bursts, and troubleshooting unexpected conditions.
Pausing and Resuming Queues#
Pause a queue to stop it from starting new jobs.
# Pause across all nodes
await oban.pause_queue("media")
By default, that queue is paused across all running instances (thanks to pubsub notifications). It’s also possible to pause on a specific node:
# Pause only on a specific node
await oban.pause_queue("media", node="worker.1")
Jobs that are already running will continue until they complete. Resume a paused queue to allow it to process jobs again:
# Resume across all nodes
await oban.resume_queue("media")
# Resume only on a specific node
await oban.resume_queue("media", node="worker.1")
Some use cases for pausing queues:
Maintenance windows: Pause queues before deploying or running migrations
Incident response: Stop processing a problematic queue while investigating
Resource management: Pause low-priority queues during high-load periods
Scaling Queues#
A queue’s concurrency limit may be changed across all nodes at runtime without restarting:
# Scale up to handle increased load
await oban.scale_queue(queue="default", limit=50)
# Scale down to reduce resource usage
await oban.scale_queue(queue="default", limit=5)
Like pausing and resuming, you can target a single node for scaling:
# Scale only on a specific node
await oban.scale_queue(queue="default", limit=20, node="worker.1")
Scaling up triggers immediate job fetching, so queued jobs will start executing right away. Scaling down doesn’t interrupt running jobs—it only affects how many new jobs can start.
Some use cases for scaling:
Traffic bursts: Scale up queues during peak hours, scale down during quiet periods
Gradual rollouts: Start with low concurrency to validate changes, then scale up
Resource balancing: Shift capacity between queues based on current demand
Starting and Stopping Queues#
Queues can be started and stopped dynamically across all nodes at runtime:
# Start a queue across all nodes
await oban.start_queue(queue="priority", limit=10)
# Start a queue in a paused state
await oban.start_queue(queue="batch", limit=20, paused=True)
Naturally, you can scope starting and stopping to a single node as well:
# Start only on a specific node
await oban.start_queue(queue="local_only", limit=5, node="worker.1")
To stop a running queue:
# Stop across all nodes
await oban.stop_queue("priority")
# Stop only on a specific node
await oban.stop_queue("priority", node="worker.1")
Stopping a queue allows running jobs to complete gracefully—it doesn’t terminate them immediately.
Some use cases for dynamic queues:
Feature flags: Start queues only when certain features are enabled
Tenant isolation: Create dedicated queues for specific customers
Batch processing: Start temporary queues for large batches, stop when done
Node-Specific Operations#
All queue management methods accept an optional node parameter to target specific nodes in a
cluster. This is useful when:
Different nodes have different resource capacities
You want to drain a node before maintenance
Testing changes on a single node before rolling out
# Pause only on the node being maintained
await oban.pause_queue("default", node="worker.3")
# Scale up a specific high-capacity node
await oban.scale_queue(queue="media", limit=100, node="gpu-worker.1")
If no node is specified, the operation applies to all nodes running that queue.
Tip
The term node comes from Oban’s Elixir heritage, where it refers to a running instance of the virtual machine. In Python, a node is simply a single running instance of your application.
When you run oban start or start Oban embedded in your app, that process becomes a node with its
own unique identifier. In a typical deployment, you might have multiple nodes (containers,
servers, or processes) all running Oban and processing jobs from the same database.
Checking Queue State#
When running Oban in embedded mode, you can use check_queue and check_all_queues to inspect
the state of queues on the current instance. These methods only have visibility into queues
running in the same process—they cannot see queues on other nodes.
# Check a specific queue
info = oban.check_queue("default")
if info:
print(f"Queue: {info.queue}")
print(f"Limit: {info.limit}")
print(f"Running: {len(info.running)} jobs")
print(f"Paused: {info.paused}")
# Check all queues on this node
for info in oban.check_all_queues():
print(f"{info.queue}: {len(info.running)}/{info.limit} running, paused={info.paused}")
The returned QueueInfo includes:
Field |
Description |
|---|---|
|
The queue name |
|
Current concurrency limit |
|
List of currently executing job IDs |
|
Whether the queue is paused |
|
The node where this queue is running |
|
When the queue was started |
|
Queue metadata including extension options |
Note that check_queue returns None if the queue isn’t running on the current node, while
check_all_queues returns an empty list if no queues are running locally.