Oban.Pro.Engines.Smart (Oban Pro v1.5.0-rc.0)
The Smart engine provides advanced concurrency features, enhanced observability, lighter-weight
bulk processing, and a foundation for accurate job lifecycle management. As an
Oban.Engine, it is responsible for all non-plugin database interaction, from inserting jobs
through executing them.
Major features include:
- Global Concurrency — limit the number of concurrent jobs that run across all nodes
- Rate Limiting — limit the number of jobs that execute within a window of time
- Queue Partitioning — segment a queue so concurrency or rate limits apply separately to each partition
- Async Tracking — bundle job acks (completed, cancelled, etc.) to minimize transactions and reduce load on the database
- Enhanced Unique — selectively use a custom index to accelerate inserting unique jobs safely between processes and nodes
- Bulk Inserts — automatically batch inserts to prevent hitting database limits, reduce data sent over the wire, and respect unique options when using Oban.insert_all/2
- Accurate Snooze — differentiate between attempts with errors and intentional snoozes
Installation
See the Smart Engine section in the adoption guide to get started.
Global Concurrency
Global concurrency limits the number of concurrent jobs that run across all nodes.
Typically the global concurrency limit is local_limit * num_nodes. For example, with three
nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit is present,
and the local_limit is omitted, then the local_limit falls back to the global_limit.
The only way to guarantee that all connected nodes will run exactly one job concurrently is to
set global_limit: 1.
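For context, these limits live under the :queues option when configuring Oban. Here is a minimal sketch, assuming a hypothetical app named MyApp (the queue names and limits are illustrative, not prescribed by Oban Pro):

```elixir
# config/config.exs — illustrative queue names and limits
config :my_app, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: MyApp.Repo,
  queues: [
    # Up to 10 jobs across the cluster, and up to 10 on any single node
    default: [global_limit: 10],
    # Up to 20 jobs across the cluster, but only 5 per node
    exports: [local_limit: 5, global_limit: 20]
  ]
```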
Here are some examples:
# Execute 10 jobs concurrently across all nodes, with up to 10 on a single node
my_queue: [global_limit: 10]
# Execute 10 jobs concurrently, but only 3 jobs on a single node
my_queue: [local_limit: 3, global_limit: 10]
# Execute at most 1 job concurrently
my_queue: [global_limit: 1]

Rate Limiting
Rate limiting controls the number of jobs that execute within a period of time.
Rate limiting uses counts for the same queue from all other nodes in the cluster (with or without Distributed Erlang). The limiter uses a sliding window over the configured period to accurately approximate a limit.
Every job execution counts toward the rate limit, regardless of whether the job completes, errors, snoozes, etc.
Without a modifier, the rate_limit period is defined in seconds. However, you can provide a
:second, :minute, :hour or :day modifier to use more intuitive values.
- period: 30 — 30 seconds
- period: {1, :minute} — 60 seconds
- period: {2, :minutes} — 120 seconds
- period: {1, :hour} — 3,600 seconds
- period: {1, :day} — 86,400 seconds
Here are a few examples:
# Execute at most 1 job per second, the lowest possible limit
my_queue: [rate_limit: [allowed: 1, period: 1]]
# Execute at most 10 jobs per 30 seconds
my_queue: [rate_limit: [allowed: 10, period: 30]]
# Execute at most 10 jobs per minute
my_queue: [rate_limit: [allowed: 10, period: {1, :minute}]]
# Execute at most 1000 jobs per hour
my_queue: [rate_limit: [allowed: 1000, period: {1, :hour}]]

Understanding Concurrency Limits
The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a
local_limit of 10 and a global_limit of 20, a single node will run 10 jobs concurrently. If that same queue had a rate_limit that allowed 5 jobs within a period, then a single node is limited to 5 jobs.
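To illustrate the lowest-limit rule, here is a hypothetical queue that combines all three limits (the queue name is illustrative):

```elixir
# local_limit: 10 — at most 10 jobs on this node
# global_limit: 20 — at most 20 jobs across the cluster
# rate_limit — at most 5 executions per minute, cluster wide
#
# The rate limit is the lowest value, so at most 5 jobs execute per minute.
reports: [
  local_limit: 10,
  global_limit: 20,
  rate_limit: [allowed: 5, period: {1, :minute}]
]
```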
Queue Partitioning
In addition to global and rate limits at the queue level, you can partition a queue so that it's treated as multiple queues where concurrency or rate limits apply separately to each partition.
Partitions are specified with fields and keys, where keys is optional but highly
recommended if you've included :args. Aside from making partitions easier to reason about,
partitioning by keys minimizes the amount of data a queue needs to track and simplifies
job-fetching queries.
Configuring Partitions
The partition syntax is identical for global and rate limits (note that you can partition by global or rate limits, but not both).
Here are a few examples of viable partitioning schemes:
# Partition by worker alone
partition: :worker
# Partition by the `id` and `account_id` from args, ignoring the worker
partition: [args: [:id, :account_id]]
# Partition by worker and the `account_id` key from args
partition: [:worker, args: :account_id]

Remember, take care to minimize partition cardinality by using as few keys as possible.
Partitioning based on every permutation of your args makes concurrency or rate limits hard to
reason about and can negatively impact queue performance.
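To tie the pieces together, here is a hedged sketch of how a partition setting sits inside a queue's limit options (the queue name and args key are illustrative):

```elixir
# Each account gets its own rate-limit bucket: at most 10 jobs per
# minute per account_id, rather than 10 per minute for the whole queue.
queues: [
  mailers: [
    local_limit: 20,
    rate_limit: [
      allowed: 10,
      period: {1, :minute},
      partition: [args: :account_id]
    ]
  ]
]
```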
Global Partitioning
Global partitioning changes global concurrency behaviour. Rather than applying a fixed limit to the whole queue, the limit applies to every partition within the queue.
Consider the following example:
local_limit: 10, global_limit: [allowed: 1, partition: :worker]

The queue is configured to run one job per worker across every node, but only 10 concurrently on a
single node. That is in contrast to the standard behaviour of global_limit, which would override
the local_limit and only allow 1 concurrent job across every node.
Alternatively, you could partition by a single key:
local_limit: 10, global_limit: [allowed: 1, partition: [args: :tenant_id]]

That configures the queue to run one job concurrently across the entire cluster per tenant_id.
Rate Limit Partitioning
Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.
For example, to allow one job per-worker, every five seconds, across every instance of the alpha
queue in your cluster:
local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: :worker]

Async Tracking
The results of job execution, e.g. completed, cancelled, etc., are bundled together into a
single transaction to minimize load on an app's Ecto pool and the database.
Bundling updates and reporting them asynchronously dramatically reduces the number of transactions per second. However, async bundling introduces a slight lag (up to 5ms) between job execution finishing and recording the outcome in the database.
Async tracking can be disabled for specific queues with the ack_async option:
queues: [
standard: 30,
critical: [ack_async: false, local_limit: 10]
]

Enhanced Unique
The Smart engine supports an additional opt-in unique mode designed for speed, correctness,
scalability, and simplicity. The enhanced simple mode supports slightly fewer options while
boosting insert performance 1x-3x, reducing database load, improving memory usage, and staying
correct across multiple processes and nodes.
Here are the three possible modes:
- :legacy — allows using a numeric period and all of the OSS options. Inserting jobs is slower, less efficient, and requires much more memory. This is the default for compatibility with OSS.
- :hybrid — uses the index-backed simple mode for period: :infinity or unique: true, otherwise falls back to the unsafe legacy version.
- :simple — uniqueness is backed by a partial unique index and insertion is entirely safe between processes and nodes. This mode only allows period: :infinity, and using other periods or the timestamp option will raise an error at compile time.
Partitioned Tables
Only the legacy mode is compatible with partitioned tables. The unique index used by the hybrid and simple modes is incompatible with partitioned tables and won't be created.
Configuration
In order for Oban.Pro.Worker to accurately validate options during compilation, the unique
mode must be set at compile-time.
Explicitly set the default legacy mode:
config :oban_pro, unique: :legacy

Opt in to using the simple mode for period: :infinity, while sticking with legacy mode for
jobs with a non-infinite period:
config :oban_pro, unique: :hybrid

Switch to simple mode exclusively and prevent using non-infinite periods:
config :oban_pro, unique: :simple

Bulk Inserts
Where the Basic engine requires you to insert unique jobs individually, the Smart engine adds
unique job support to Oban.insert_all/2. No additional configuration is necessary—simply use
insert_all instead for unique jobs.
Oban.insert_all(lots_of_unique_jobs)

Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits. (PostgreSQL's binary protocol has a limit of 65,535 parameters that may be sent in a single call, which places an upper bound on the number of rows that may be inserted at one time.)
list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()

The default batch size for unique jobs is 250, and 1_000 for non-unique jobs. Regardless, you
can override it with batch_size:
Oban.insert_all(lots_of_jobs, batch_size: 1500)

It's also possible to set a custom timeout for batch inserts:
Oban.insert_all(lots_of_jobs, timeout: :infinity)

Accurate Snooze
Unlike the Basic engine which increments attempts and max_attempts, the Smart engine rolls
back the attempt on snooze. This approach preserves the original max_attempts and records a
snoozed count in meta. As a result, it's simple to differentiate between "real" attempts and
snoozes, and backoff calculation remains accurate regardless of snoozing.
The following process/1 function demonstrates checking a job's meta for a snoozed count:
def process(job) do
  case job.meta do
    %{"orig_scheduled_at" => unix_microseconds, "snoozed" => snoozed} ->
      IO.inspect({snoozed, unix_microseconds}, label: "Times snoozed since")

    _ ->
      # This job has never snoozed before
      :ok
  end
end
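For reference, a worker snoozes by returning a {:snooze, seconds} tuple. A minimal sketch, assuming a hypothetical MyApp.SyncWorker that waits on an external resource (the module name and resource_ready?/1 check are illustrative, not part of Oban Pro's API):

```elixir
defmodule MyApp.SyncWorker do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => id}}) do
    if resource_ready?(id) do
      :ok
    else
      # Retry in 60 seconds. With the Smart engine this rolls the
      # attempt back and increments the "snoozed" count in meta,
      # so snoozes never count against max_attempts.
      {:snooze, 60}
    end
  end

  # Hypothetical readiness check, stubbed for the sketch
  defp resource_ready?(_id), do: true
end
```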
Summary
Types
global_limit()
@type global_limit() :: pos_integer() | [allowed: pos_integer(), partition: partition()]
local_limit()
@type local_limit() :: pos_integer()
partition()
period()
@type period() :: pos_integer() | {pos_integer(), unit()}
rate_limit()
@type rate_limit() :: [allowed: pos_integer(), period: period(), partition: partition()]
unit()
@type unit() ::
:second | :seconds | :minute | :minutes | :hour | :hours | :day | :days