Oban.Pro.Engines.Smart (Oban Pro v1.7.0-rc.0)
The Smart engine provides advanced concurrency features, enhanced observability, lighter
weight bulk processing, and a foundation for accurate job lifecycle management. As an
Oban.Engine, it is responsible for all non-plugin database interaction, from inserting jobs
through executing them.
Major features include:
- Global Concurrency — limit the number of concurrent jobs that run across all nodes
- Rate Limiting — limit the number of jobs that execute within a window of time
- Queue Partitioning — segment a queue so concurrency or rate limits apply separately to each partition
- Async Tracking — bundle job acks (completed, cancelled, etc.) to minimize transactions and reduce load on the database
- Enhanced Unique — enforce job uniqueness with a custom index to accelerate inserting unique jobs safely between processes and nodes
- Bulk Inserts — automatically batch inserts to prevent hitting database limits, reduce data sent over the wire, and respect unique options when using Oban.insert_all/2
- Accurate Snooze — differentiate between attempts with errors and intentional snoozes
Installation
See the Smart Engine section in the adoption guide to get started.
Global Concurrency
Global concurrency limits the number of concurrent jobs that run across all nodes.
Typically the global concurrency limit is local_limit * num_nodes. For example, with three
nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit is present,
and the local_limit is omitted, then the local_limit falls back to the global_limit.
The only way to guarantee that all connected nodes will run exactly one job concurrently is to
set global_limit: 1.
Here are some examples:
```elixir
# Execute 10 jobs concurrently across all nodes, with up to 10 on a single node
my_queue: [global_limit: 10]

# Execute 10 jobs concurrently, but only 3 jobs on a single node
my_queue: [local_limit: 3, global_limit: 10]

# Execute at most 1 job concurrently
my_queue: [global_limit: 1]
```

Rate Limiting
Rate limiting controls the number of jobs that execute within a period of time.
Rate limiting uses counts for the same queue from all other nodes in the cluster (with or without Distributed Erlang). By default, the limiter uses a sliding window over the configured period to accurately approximate a limit, though other algorithms are available.
Every job execution counts toward the rate limit, regardless of whether the job completes, errors, snoozes, etc.
Without a modifier, the rate_limit period is defined in seconds. However, you can provide a
:second, :minute, :hour or :day modifier to use more intuitive values.
- `period: 30` — 30 seconds
- `period: {1, :minute}` — 60 seconds
- `period: {2, :minutes}` — 120 seconds
- `period: {1, :hour}` — 3,600 seconds
- `period: {1, :day}` — 86,400 seconds
Here are a few examples:
```elixir
# Execute at most 60 jobs per minute (1 job per second equivalent)
my_queue: [rate_limit: [allowed: 60, period: {1, :minute}]]

# Execute at most 10 jobs per 30 seconds
my_queue: [rate_limit: [allowed: 10, period: 30]]

# Execute at most 10 jobs per minute
my_queue: [rate_limit: [allowed: 10, period: {1, :minute}]]

# Execute at most 1000 jobs per hour
my_queue: [rate_limit: [allowed: 1000, period: {1, :hour}]]
```

Using larger time periods allows for smoother tracking of rate limits. For example, expressing "1 job per second" as "60 jobs per minute" provides the same throughput but reduces the granularity of tracking, resulting in more consistent job execution patterns.
Understanding Concurrency Limits
The local, global, or rate limit with the lowest value determines how many jobs are executed
concurrently. For example, with a local_limit of 10 and a global_limit of 20, a single node
will run 10 jobs concurrently. If that same queue had a rate_limit that allowed 5 jobs within
a period, then a single node is limited to 5 jobs.
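As a concrete illustration of how the lowest limit wins, here is a hypothetical configuration combining all three limits; the app name and `reports` queue are placeholders:

```elixir
# Hypothetical queue where the rate limit is the effective cap.
config :my_app, Oban,
  engine: Oban.Pro.Engines.Smart,
  queues: [
    reports: [
      # up to 10 jobs on this node
      local_limit: 10,
      # up to 20 jobs across all nodes
      global_limit: 20,
      # but only 5 executions per minute, so 5 is the effective limit
      rate_limit: [allowed: 5, period: {1, :minute}]
    ]
  ]
```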
Rate Limiting Algorithms
Three rate limiting algorithms are available, each with different trade-offs:
- `:sliding_window` (default) — Uses two time buckets with weighted averaging to provide smooth rate limiting. Prevents bursting at window boundaries by gradually transitioning between periods.
- `:fixed_window` — Resets the count when each period expires. Simple and predictable, but can allow bursting at window boundaries (e.g., `allowed` jobs at 11:59:59, then `allowed` more at 12:00:01).
- `:token_bucket` — Tokens refill continuously at a rate of `allowed / period` per second. Allows controlled bursting up to `allowed` while maintaining the overall rate over time. Ideal for APIs that permit short bursts but enforce sustained limits.
Specify the algorithm with the :algorithm option:
```elixir
# Use fixed windows for simple, predictable resets
my_queue: [rate_limit: [allowed: 100, period: {1, :minute}, algorithm: :fixed_window]]

# Use token bucket for APIs that allow bursting
my_queue: [rate_limit: [allowed: 60, period: {1, :minute}, algorithm: :token_bucket]]
```

Weighted Jobs
By default, each job consumes one unit of rate limit capacity. For jobs that vary in resource usage, you can assign weights so that heavier jobs consume more capacity.
There are three ways to assign weights, in order of precedence:
- Callback — Define a `weight/1` callback for runtime calculation based on job data
- Job option — Set the weight when creating a specific job
- Worker option — Set a default weight for all jobs from a worker
For simple rate adjustments, set a default heavier weight for the worker:
```elixir
defmodule MyApp.HeavyWorker do
  use Oban.Pro.Worker, rate: [weight: 10]
end
```

The default can be overridden by passing an option to the worker's new/2 function:

```elixir
MyApp.HeavyWorker.new(args, rate: [weight: 5])
```

For variable rate consumption, you can calculate the weight dynamically with the weight/1
callback:
```elixir
defmodule MyApp.BatchWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def weight(%{args: args}), do: length(args["records"])

  @impl Oban.Pro.Worker
  def process(_job) do
    # Process the batch of records...
    :ok
  end
end
```

Jobs without any weight configuration default to a weight of 1. See Oban.Pro.Worker for more
details on the weight/1 callback.
Queue Partitioning
In addition to global and rate limits at the queue level, you can partition a queue so that it's treated as multiple queues where concurrency or rate limits apply separately to each partition.
Partitions are specified with a list of fields like :worker, :args, or :meta. When
partitioning by :args, choosing specific keys is highly recommended to keep partitioning
meaningful. Focused partitioning minimizes the amount of data a queue needs to track and
simplifies job-fetching queries.
Configuring Partitions
The partition syntax is identical for global and rate limits (note that a queue may partition its global limit or its rate limit, but not both).
Here are a few examples of viable partitioning schemes:
```elixir
# Partition by worker alone
partition: :worker

# Partition by the `id` and `account_id` from args, ignoring the worker
partition: [args: [:id, :account_id]]

# Partition by worker and the `account_id` key from args
partition: [:worker, args: :account_id]
```

Remember, take care to minimize partition cardinality by using as few keys as possible.
Partitioning based on every permutation of your args makes concurrency or rate limits hard to
reason about and can negatively impact queue performance.
Global Partitioning
Global partitioning changes global concurrency behaviour. Rather than applying a fixed limit to the whole queue, the limit applies separately to each partition within the queue.
Consider the following example:
```elixir
local_limit: 10, global_limit: [allowed: 1, partition: :worker]
```

The queue is configured to run one job per worker across every node, but only 10 concurrently on a
single node. That is in contrast to the standard behaviour of global_limit, which would override
the local_limit and only allow 1 concurrent job across every node.
Alternatively, you could partition by a single key:

```elixir
local_limit: 10, global_limit: [allowed: 1, partition: [args: :tenant_id]]
```

That configures the queue to run one job concurrently across the entire cluster per tenant_id.
Partitioning with Burst Mode
Global partitioning includes an advanced feature called "burst mode" that allows you to maximize throughput by temporarily exceeding per-partition global limits when there are available resources.
Each global partition is typically restricted to the configured allowed value. However, with
burst mode enabled, the system can intelligently allocate more jobs to each active partition,
potentially exceeding the per-partition limit while still respecting the overall queue
concurrency.
This is particularly useful when:
- You have many potential partitions but only a few are active at any given time
- You want to maximize throughput while maintaining some level of fairness between partitions
- You need to ensure your queues aren't sitting idle when capacity is available
Here's an example of a queue that will run 5 jobs from a single partition concurrently under load, but can burst up to 100 jobs for a single partition when there is available capacity:
```elixir
queues: [
  exports: [
    global_limit: [
      allowed: 5,
      burst: true,
      partition: [args: :tenant_id]
    ],
    local_limit: 100
  ]
]
```

Bursting still respects the overall global concurrency limit. Using the example above, the queue can only execute 100 of a given partition's jobs concurrently across all nodes, even if there are more available resources.
Rate Limit Partitioning
Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.
For example, to allow one job per worker, every ten seconds, across every instance of the alpha
queue in your cluster:

```elixir
local_limit: 10, rate_limit: [allowed: 1, period: 10, partition: :worker]
```

Partition Keys Cache
The Smart engine maintains a cache of available partition keys to optimize performance when fetching jobs. The cache has a configurable time-to-live (TTL) which controls how long partition keys are remembered before needing to be refreshed from the database.
The default TTL is set to 3_000ms, which is suitable for most applications. You can adjust it
by adding configuration in your application's config:
```elixir
# In config/config.exs
config :oban_pro, Oban.Pro.Partition, keys_cache_ttl: 5_000 # 5 seconds
```

A longer TTL reduces database load but might cause the system to be slower to recognize newly created partition keys. A shorter TTL ensures more up-to-date partition information at the cost of more frequent database queries.
The number of partition keys retrieved is based on the queue's local_limit multiplied by a
configurable factor (default 3). This ensures the query scales appropriately with queue
capacity while keeping result sets manageable:
```elixir
# In config/config.exs
config :oban_pro, Oban.Pro.Partition, keys_limit_multiplier: 5
```

Partitions are selected fairly by prioritizing those with the earliest scheduled jobs, ensuring work is distributed across active partitions rather than favoring any single partition.
Async Tracking
The results of job execution, e.g. completed, cancelled, etc., are bundled together into a
single transaction to minimize load on an app's Ecto pool and the database.
Bundling updates and reporting them asynchronously dramatically reduces the number of transactions per second. However, async bundling introduces a slight lag (up to 5ms) between job execution finishing and recording the outcome in the database.
Async tracking can be disabled for specific queues with the ack_async option:
```elixir
queues: [
  standard: 30,
  critical: [ack_async: false, local_limit: 10]
]
```

Enhanced Unique
The Smart engine uses an alternative mechanism for unique jobs that's designed for speed,
correctness, scalability, and simplicity. Uniqueness is enforced through a unique index that
makes insertion entirely safe between processes and nodes, without the use of advisory locks or
multiple queries.
Unlike standard uniqueness, which is only checked as jobs are inserted, the index-backed version applies for the job's entire lifetime. That prevents race conditions where a job changes states and inadvertently causes a conflict.
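Declaring a unique job works the same as with the Basic engine; only the enforcement mechanism differs. A minimal sketch, where the worker module and args are placeholders:

```elixir
# Placeholder worker and args; standard unique options apply, and the
# Smart engine enforces them via its unique index on insert.
%{account_id: 123}
|> MyApp.SyncWorker.new(unique: [period: {15, :minutes}, keys: [:account_id]])
|> Oban.insert()
```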
When conflicts are detected, the conflicted job, i.e. the one already in the database, is
annotated with uniq_conflict: true.
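If downstream logic needs to react to a conflict, the annotation can be checked at runtime. A minimal sketch, assuming the `uniq_conflict` flag is readable from the job's `meta` map (the exact storage location is an assumption here):

```elixir
# Sketch: react to the conflict annotation inside a worker.
# Assumes `uniq_conflict` is stored in job.meta, per the annotation above.
defmodule MyApp.SyncWorker do
  use Oban.Pro.Worker

  require Logger

  @impl Oban.Pro.Worker
  def process(%Oban.Job{id: id, meta: meta}) do
    if meta["uniq_conflict"] do
      Logger.info("job #{id} was the target of a unique conflict")
    end

    :ok
  end
end
```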
Safe Hash for Uniqueness
To avoid potential hash collisions when using unique jobs with sub-fields in args, enable
"safe" hashing:
```elixir
config :oban_pro, Oban.Pro.Utils, safe_hash: true
```

This applies to uniq_key, chain_key, and partition_key values stored in job meta.
Note that generated values will not match previous values for configurations using
sub-fields in args.
Period-based Uniqueness Uses Buckets
To leverage unique indexes, period-based uniqueness snaps timestamps to fixed time buckets rather than using a sliding window. The current time is rounded down to the nearest multiple of the period, and uniqueness is enforced within that bucket.
For example, with a period of 15 minutes (unique: [period: {15, :minutes}]) and a job
inserted at 10:21:00, uniqueness applies from 10:15:00 to 10:30:00. A duplicate job
inserted at 10:28:00 would be rejected, but one inserted at 10:31:00 would be allowed
because it falls into the next bucket.
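The bucket arithmetic can be sketched as follows; `bucket_start/2` is a hypothetical helper for illustration, not part of Oban's API:

```elixir
defmodule BucketSketch do
  # Round a Unix timestamp (in seconds) down to the start of its bucket.
  # With period_seconds = 900 (15 minutes), 10:21:00 snaps to 10:15:00.
  def bucket_start(unix_seconds, period_seconds) do
    div(unix_seconds, period_seconds) * period_seconds
  end
end
```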
Bulk Inserts
Where the Basic engine requires you to insert unique jobs individually, the Smart engine adds
unique job support to Oban.insert_all/2. No additional configuration is necessary; simply use
insert_all for unique jobs.

```elixir
Oban.insert_all(lots_of_unique_jobs)
```

Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits. (PostgreSQL's binary protocol has a limit of 65,535 parameters that may be sent in a single call, which places an upper limit on the number of rows that may be inserted at one time.)
```elixir
list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()
```

The default batch size for unique jobs is 250, and 1_000 for non-unique jobs. Either default
can be overridden with batch_size:

```elixir
Oban.insert_all(lots_of_jobs, batch_size: 1500)
```

It's also possible to set a custom timeout for batch inserts:

```elixir
Oban.insert_all(lots_of_jobs, timeout: :infinity)
```

Skipping Conflicts
By default, when a unique conflict is detected during bulk insert, the conflicting job is still
returned with conflict?: true set. This requires updating the existing row to mark it as a
conflict, which involves row-level locking.
For high-throughput scenarios where you don't need information about conflicting jobs, you can
use on_conflict: :skip to bypass locking entirely:
```elixir
Oban.insert_all(lots_of_unique_jobs, on_conflict: :skip)
```

With this option:

- Conflicting jobs are silently skipped without any database locks
- Only newly inserted jobs are returned in the result list
- Any `replace:` options on individual changesets are ignored
This is ideal for "fire and forget" scenarios where you want to insert jobs as fast as possible and don't need to track which jobs already existed.
Automatic Spacing
When inserting a large batch of jobs that can't all execute immediately, you can automatically
space them out over time using the auto_space option. This schedules each batch at increasing
intervals, preventing a flood of jobs from overwhelming your queue.
```elixir
Oban.insert_all(lots_of_jobs, batch_size: 1000, auto_space: 60)
```

With batch_size: 1000 and auto_space: 60, jobs are scheduled as follows:
- Jobs 1–1000: scheduled immediately
- Jobs 1001–2000: scheduled 60 seconds from now
- Jobs 2001–3000: scheduled 120 seconds from now
- And so on...
The auto_space value can be an integer (seconds) or a duration tuple:
```elixir
auto_space: 30            # 30 seconds between batches
auto_space: {1, :minute}  # 1 minute between batches
auto_space: {5, :minutes} # 5 minutes between batches
```

Note that auto_space overrides any scheduled_at values set on individual changesets.
Per-Batch Transactions
By default, all batches are inserted within a single transaction. If any batch fails, all
previously inserted jobs are rolled back. For scenarios where you'd rather commit successful
batches even if a later batch fails, use transaction: :per_batch:
```elixir
Oban.insert_all(lots_of_jobs, batch_size: 1000, transaction: :per_batch)
```

With this option, each batch is committed independently. If batch 3 fails, jobs from batches 1 and 2 remain in the database. Note that on failure, the function still raises an exception and does not return references to the successfully inserted jobs.
Accurate Snooze
Unlike the Basic engine which increments attempts and max_attempts, the Smart engine rolls
back the attempt on snooze. This approach preserves the original max_attempts and records a
snoozed count in meta. As a result, it's simple to differentiate between "real" attempts and
snoozes, and backoff calculation remains accurate regardless of snoozing.
The following process/1 function demonstrates checking a job's meta for a snoozed count:
```elixir
def process(job) do
  case job.meta do
    %{"orig_scheduled_at" => unix_microseconds, "snoozed" => snoozed} ->
      IO.inspect({snoozed, unix_microseconds}, label: "Times snoozed since")

    _ ->
      # This job has never been snoozed
      :ok
  end
end
```
Summary
Types
```elixir
@type global_limit() :: pos_integer() | [allowed: pos_integer(), partition: partition()]

@type local_limit() :: pos_integer()

@type period() :: pos_integer() | {pos_integer(), unit()}

@type rate_limit() :: [allowed: pos_integer(), period: period(), partition: partition()]

@type unit() ::
        :second | :seconds | :minute | :minutes | :hour | :hours | :day | :days
```