This release enhances workflows with sub-workflows and context sharing, overhauls queue partitioning for better performance, improves dynamic plugins, and adds various usability improvements.
See the v1.7 Upgrade Guide for complete upgrade steps and migration caveats.
## 🗂️ Workflow Tracking
Workflows now use a dedicated `oban_workflows` table to track workflow metadata in real time.
Database triggers maintain accurate counts as jobs transition between states, replacing expensive
aggregation queries with simple lookups.
This enables unique workflows, accurate stuck-workflow rescuing, and highly efficient queries for Oban Web to display workflows.
### Suspended State
Jobs waiting on workflow or chain dependencies now use a proper `suspended` state instead of the
previous `on_hold` pseudo-state. This provides cleaner semantics, better query performance through
simplified indexes, and enables the database triggers to track workflow state counts accurately.
Note that any in-flight workflows will continue to run normally, without any backfilling or data modification.
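Because `suspended` is now a first-class job state, dependent jobs can be inspected with an ordinary query. A minimal sketch, assuming a conventional `MyApp.Repo` and the standard `Oban.Job` schema:

```elixir
import Ecto.Query

# Count jobs currently parked on unmet workflow or chain dependencies.
# "suspended" is the new state value replacing the on_hold pseudo-state.
suspended_count =
  Oban.Job
  |> where([j], j.state == "suspended")
  |> MyApp.Repo.aggregate(:count)
```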
### Unique Workflows
Workflows can now be created with `unique: true` to prevent multiple workflows with the same name
from running concurrently:
```elixir
Workflow.new(name: "daily-report", unique: true)
|> Workflow.add(:fetch, FetchWorker.new(%{}))
|> Workflow.add(:process, ProcessWorker.new(%{}), deps: [:fetch])
|> Oban.insert_all()
```
When a duplicate unique workflow is inserted, its jobs are marked with `conflict?: true` instead
of being inserted, similar to how unique jobs work.
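Since `Oban.insert_all/1` returns the resulting jobs, a duplicate can be detected by checking the `conflict?` flag. A sketch, with the duplicate handling being purely illustrative:

```elixir
jobs =
  Workflow.new(name: "daily-report", unique: true)
  |> Workflow.add(:fetch, FetchWorker.new(%{}))
  |> Workflow.add(:process, ProcessWorker.new(%{}), deps: [:fetch])
  |> Oban.insert_all()

# Jobs from a duplicate unique workflow come back flagged, not inserted
if Enum.any?(jobs, & &1.conflict?) do
  IO.puts("daily-report is already running; duplicate skipped")
end
```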
## ⚖️ Rate Limiting Overhaul
Rate limiting gains multiple algorithms, variable job weights, and a dedicated module for interacting with rate limits outside of job execution.
### Multiple Algorithms
Three algorithms are now available, each with different trade-offs:
- **Sliding Window**: uses weighted averaging across two time buckets for smooth rate limiting without bursts at window boundaries.
- **Fixed Window**: resets the count when each period expires. Simple and predictable, but allows bursting at boundaries (e.g., `allowed` jobs at 11:59, then `allowed` more at 12:00).
- **Token Bucket**: tokens refill continuously at `allowed / period` per second. Allows controlled bursting up to `allowed` while maintaining the overall rate. Ideal for APIs that permit short bursts but enforce sustained limits.
```elixir
queues: [
  sliding: [rate_limit: [allowed: 100, period: {1, :minute}, algorithm: :sliding_window]],
  fixed: [rate_limit: [allowed: 100, period: {1, :minute}, algorithm: :fixed_window]],
  bucket: [rate_limit: [allowed: 100, period: {1, :minute}, algorithm: :token_bucket]]
]
```
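To make the token bucket trade-off concrete, its refill arithmetic can be sketched in plain Elixir. The module below is purely illustrative and not part of the Pro API:

```elixir
defmodule TokenBucketMath do
  @moduledoc "Illustrative refill math for a token bucket rate limiter."

  # Tokens available after `elapsed` seconds, capped at `allowed`.
  # Refill happens continuously at allowed / period_seconds tokens per second.
  def tokens_after(current, elapsed, allowed, period_seconds) do
    min(allowed, current + allowed * elapsed / period_seconds)
  end
end

# With allowed: 100 and period: {1, :minute}, an empty bucket
# recovers 50 tokens after 30 seconds and is full again after 60.
TokenBucketMath.tokens_after(0, 30, 100, 60)
# => 50.0
```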
### Weighted Jobs
Jobs can now consume variable amounts of rate limit capacity, with three ways to assign weights.
The simplest is a worker default, where all jobs from a worker consume 10 units of quota:
```elixir
defmodule MyApp.HeavyWorker do
  use Oban.Pro.Worker, rate: [weight: 10]
end
```
Slightly more dynamic is a job option, where you can override the weight at insert time:
```elixir
MyApp.HeavyWorker.new(args, rate: [weight: 5])
```
Finally, the most flexible option is to calculate the weight dynamically at runtime with the new
`c:weight/1` callback:
```elixir
defmodule MyApp.BatchWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def weight(%{args: %{"records" => records}}), do: length(records)
end
```
### Rate Limit API
The new `Oban.Pro.RateLimit` module provides functions for interacting with rate limits outside of
job execution. There are functions to check availability, manually consume quota, or reset the
rate limit entirely. For example, to conditionally make a batch of API calls based on capacity:
```elixir
# `count` is the number of API calls the batch intends to make
case Oban.Pro.RateLimit.available(:my_queue) do
  {:ok, capacity} when capacity >= count ->
    :ok = Oban.Pro.RateLimit.consume(:my_queue, count)
    make_api_calls()

  {:ok, _capacity} ->
    {:error, :insufficient_capacity}
end
```
Even simpler, the `with_quota/4` helper executes a function after atomically reserving capacity,
with an optional timeout:
```elixir
case Oban.Pro.RateLimit.with_quota(:my_queue, 5, &make_api_calls/0, timeout: 10_000) do
  {:ok, result} -> handle_result(result)
  {:error, :timeout} -> handle_timeout()
end
```
All rate limit operations are globally distributed and operate at the queue or partition level, sharing quota with a running queue.
## 📦 Chunk Overhaul
Chunks now use a pre-computed `chunk_id` for grouping, enabling much faster chunk lookups. This
replaces expensive dynamic query construction based on partitioning fields with a simple
index-backed query.
Additionally, chunks use a single operation for acking all jobs in a chunk, reducing database round-trips when completing, cancelling, retrying, etc. The new acking operation also improves compatibility with non-Postgres databases like CockroachDB.
### Legacy Chunk Jobs
Jobs created before v1.7 won't have a `chunk_id` in their metadata. The `DynamicLifeline` plugin
automatically computes and sets the `chunk_id` for these jobs, so no manual backfilling is required.
### Snooze Support
Chunks can now selectively snooze jobs to retry them after a delay. This is useful when some items in a chunk need to wait before retrying while others complete normally:
```elixir
@impl Oban.Pro.Workers.Chunk
def process(jobs) do
  {ready, not_ready} = Enum.split_with(jobs, &ready_to_process?/1)

  process_jobs(ready)

  if Enum.any?(not_ready) do
    # Snooze jobs that aren't ready, complete the rest
    {:snooze, {30, :seconds}, not_ready}
  else
    :ok
  end
end
```
For mixed outcomes, snooze combines with other result types:
```elixir
[cancel: {"invalid", invalid_jobs}, snooze: {{1, :minute}, retry_later}]
```
## 🪝 Global Cancel/Discard Hooks
Two new worker callbacks fire when jobs are cancelled or discarded outside of execution, regardless of how the state transition happens:
- `on_cancelled/2`: called when a job is cancelled due to `:dependency` (workflow dependency failed), `:manual` (via `Oban.cancel_job/1`), or `:deadline` (force-cancelled by deadline)
- `on_discarded/2`: called when a job is discarded after exhausting all retries (`:exhausted`), typically triggered by `DynamicLifeline`
```elixir
defmodule MyApp.OrderWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def on_cancelled(reason, job) do
    MyApp.Notifications.order_cancelled(job.args["order_id"], reason)
    :ok
  end

  @impl Oban.Pro.Worker
  def on_discarded(:exhausted, job) do
    MyApp.Notifications.order_failed(job.args["order_id"])
    :ok
  end
end
```
For broad concerns like logging or metrics, it's possible to attach a hook module globally so it
applies to all `Oban.Pro.Worker` modules, just like other hooks.
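For instance, a single metrics module could observe cancellations for every worker. The sketch below assumes `Oban.Pro.Worker.attach_hook/1` as the global attachment function; verify the exact call against the hooks documentation for your installed Pro version:

```elixir
defmodule MyApp.CancelMetrics do
  # A global hook module only defines the callbacks it cares about
  def on_cancelled(reason, job) do
    :telemetry.execute(
      [:my_app, :oban, :cancelled],
      %{count: 1},
      %{worker: job.worker, reason: reason}
    )

    :ok
  end
end

# During application startup, before queues begin processing:
:ok = Oban.Pro.Worker.attach_hook(MyApp.CancelMetrics)
```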
## 📇 Improved Indexes
The v1.7 migration includes numerous new and rebuilt indexes that aid performance for chains, chunks, workflows, and general operation while also reducing overall index sizes.
### Partial Indexes
New partial indexes reduce index size and improve query performance by only indexing rows that match specific conditions. In addition to the new chunk index, the migration adds or rebuilds partial indexes for:
- **Staging index**: indexes jobs ready to transition to `available`, enabling 2-10x faster staging queries depending on job volume and state distribution
- **Pruning indexes**: separate partial indexes for `completed_at`, `cancelled_at`, and `discarded_at` on terminal job states, making cleanup queries faster with smaller indexes
- **Unique/partition indexes**: recreated as partial indexes without reliance on generated columns to save space and avoid table-locking migrations
### No More Generated Columns
The `uniq_key` and `partition_key` generated columns introduced in v1.5/v1.6 are replaced with
expression indexes directly on the `meta` field. This eliminates table locking during migrations
from OSS Oban or older Pro versions, a significant improvement for applications with
high-throughput `oban_jobs` tables.
The Upgrade Guide includes instructions for optional post-migration cleanup for unused transitional indexes and legacy generated columns.
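A sketch of the upgrade migration itself, assuming the unified `Oban.Pro.Migration` helper used by recent Pro releases (the Upgrade Guide has the authoritative steps):

```elixir
defmodule MyApp.Repo.Migrations.UpgradeObanProToV17 do
  use Ecto.Migration

  # Applies the v1.7 schema changes, including the new partial indexes
  def up, do: Oban.Pro.Migration.up()

  def down, do: Oban.Pro.Migration.down()
end
```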