# Smart Engine

The `SmartEngine` is an alternative to Oban's built-in `Basic` engine that enables advanced features such as:

- Global concurrency across nodes
- Rate limiting across nodes
- Partitioning by worker, args, or specific fields
- Unique bulk inserts
- Batched bulk inserts
- Resiliency against database connection errors
- Enhanced observability
## Installation

The `SmartEngine` relies on centralized producer records in an `oban_producers` table to coordinate between nodes with minimal load on the database. To start, create a migration to create an `oban_producers` table:

```bash
$ mix ecto.gen.migration add_oban_producers
```

Within the migration module:

```elixir
use Ecto.Migration

defdelegate change, to: Oban.Pro.Migrations.Producers
```
If you have multiple Oban instances or use prefixes, you can specify the prefix and create multiple tables in one migration:

```elixir
use Ecto.Migration

def change do
  Oban.Pro.Migrations.Producers.change()
  Oban.Pro.Migrations.Producers.change(prefix: "special")
  Oban.Pro.Migrations.Producers.change(prefix: "private")
end
```
The `Producers` migration also exposes `up/0` and `down/0` functions if `change/0` doesn't fit your use case.
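For instance, a migration that uses the separate directions might look like this (the module name is hypothetical; `up/0` and `down/0` delegate to the functions mentioned above):

```elixir
defmodule MyApp.Repo.Migrations.AddObanProducers do
  use Ecto.Migration

  # Delegate each direction to the Producers migration helpers
  def up, do: Oban.Pro.Migrations.Producers.up()
  def down, do: Oban.Pro.Migrations.Producers.down()
end
```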
Next, update your config to use the `SmartEngine`:

```elixir
config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  ...
```

If you have multiple Oban instances, you'll need to configure each one to use the `SmartEngine`; otherwise they'll default to the `Basic` engine.
## Unique Bulk Insert

Where the `Basic` engine requires you to insert unique jobs individually, the `SmartEngine` adds unique job support to `Oban.insert_all/2`. No additional configuration is necessary; simply use `insert_all` for unique jobs.

```elixir
Oban.insert_all(lots_of_unique_jobs)
```
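As a sketch, unique options declared on a worker carry through the bulk insert; the worker module and args here are hypothetical:

```elixir
defmodule MyApp.DigestWorker do
  # Jobs are considered unique per user_id for 60 seconds
  use Oban.Worker, unique: [period: 60, keys: [:user_id]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => _user_id}}), do: :ok
end

user_ids
|> Enum.map(&MyApp.DigestWorker.new(%{user_id: &1}))
|> Oban.insert_all()
```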
Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits. (PostgreSQL's binary protocol caps a single call at 65,535 parameters, which puts an upper limit on the number of rows that may be inserted at one time.)

```elixir
list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()
```
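To see why batching matters, here's the rough arithmetic (the parameter count per row is an assumption; it depends on how many `oban_jobs` columns the insert populates):

```elixir
# PostgreSQL's binary protocol caps a single call at 65,535 parameters.
max_params = 65_535

# Assume each job row binds roughly a dozen parameters (worker, queue,
# args, meta, timestamps, and so on).
params_per_row = 12

div(max_params, params_per_row)
#=> 5461 rows at most, so batches of 250 or 1_000 stay safely under the cap
```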
The default batch size is `250` for unique jobs and `1_000` for non-unique jobs. In either case, you can override it with `batch_size`:

```elixir
Oban.insert_all(lots_of_jobs, batch_size: 1500)
```

It's also possible to set a custom timeout for batch inserts:

```elixir
Oban.insert_all(lots_of_jobs, timeout: :infinity)
```
A single batch of jobs is inserted without a transaction. With multiple batches, each batch is inserted in a single transaction, unless there are 10k or more total unique jobs to insert. Beyond that threshold, each batch is committed in a separate transaction to prevent memory errors. You can control the transaction threshold with `xact_limit` if you happen to have a tuned database. For example, to set the limit at 20k jobs:

```elixir
Oban.insert_all(lots_of_jobs, xact_limit: 20_000)
```
## Configuring Queues

With the `SmartEngine` in place, you can define global concurrency limits and rate limits at the queue level. The engine supports the basic `queue: limit` format as well as the expanded format.

Let's look at a few examples:

```elixir
queues: [
  alpha: 1,
  gamma: [global_limit: 1],
  delta: [local_limit: 2, global_limit: 5],
  kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
  omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
]
```
- `local_limit` — This limits the number of concurrent jobs that run within a single node. Typically the global concurrency limit is `local_limit * num_nodes`. For example, with three nodes and a local limit of 10, you'll have a global limit of 30. If a `global_limit` is present and the `local_limit` is omitted, then the `local_limit` falls back to the `global_limit`.
- `global_limit` — This limits the number of concurrent jobs that run across all nodes. The queue will process at most the local limit or global limit, whichever is lower. The only way to guarantee that all connected nodes will run exactly one job concurrently is to set `global_limit: 1`.
- `rate_limit` — This limits the number of jobs that are executed, regardless of the result, within a sliding window of time, across all nodes. Rate limiting uses counts from all other producer records for the same queue in the cluster. The limiter uses a sliding window over the configured period to accurately approximate a limit.
### Understanding Concurrency Limits

The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a `local_limit` of 10 and a `global_limit` of 20, a single node will run 10 jobs concurrently. If that same queue had a `rate_limit` that allowed 5 jobs within a period, then a single node is limited to 5 jobs.
## Rate Limit Periods

Without a modifier, the `rate_limit` period is defined in seconds. However, you can provide a `:second`, `:minute`, `:hour`, or `:day` modifier to use more intuitive values. Here are a few examples:

- `period: 30` — 30 seconds
- `period: {1, :minute}` — 60 seconds
- `period: {2, :minutes}` — 120 seconds
- `period: {1, :hour}` — 3,600 seconds
- `period: {1, :day}` — 86,400 seconds
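Putting a modifier to use, a queue capped at 100 jobs per day across the cluster might look like this (the `mailers` queue name is hypothetical):

```elixir
config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    # Run up to 10 jobs at once locally, but no more than 100 per day cluster-wide
    mailers: [local_limit: 10, rate_limit: [allowed: 100, period: {1, :day}]]
  ]
```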
## Partitioning

In addition to global and rate limits at the queue level, you can partition limits by `worker`, `args`, or specific keys within a queue.

Partitions are specified with `fields` and `keys`, where `keys` is optional but highly recommended if you've included `:args`. Aside from making partitions easier to reason about, partitioning by keys minimizes the amount of data a queue needs to track and simplifies job-fetching queries.
### Configuring Partitions

```elixir
@type partition :: [fields: [:args | :worker], keys: [atom()]]
```

The partition syntax is identical for global and rate limits (note that you can partition by global or rate limit, but not both).

Here are a few examples of viable partitioning schemes:

```elixir
# Partition by worker alone
partition: [fields: [:worker]]

# Partition by both worker and all args
partition: [fields: [:worker, :args]]

# Partition by the `id` and `account_id` keys
partition: [fields: [:args], keys: [:id, :account_id]]

# Partition by worker and the `account_id` key
partition: [fields: [:worker, :args], keys: [:account_id]]
```

Remember, take care to minimize partition cardinality by using `keys` whenever possible.
### Global Partitioning

```elixir
@type global_limit :: pos_integer() | [allowed: pos_integer(), partition: partition()]
```

Global partitioning changes global concurrency behavior. Rather than applying a fixed limit to the queue as a whole, the limit applies to every partition within the queue.

Consider the following example:

```elixir
local_limit: 10, global_limit: [allowed: 1, partition: [fields: [:worker]]]
```

The queue is configured to run one job per worker across every node, but only 10 concurrently on a single node. That is in contrast to the standard behaviour of `global_limit`, which would override the `local_limit` and only allow 1 concurrent job across every node.
### Rate Limit Partitioning

```elixir
@type rate_limit :: [allowed: pos_integer(), period: period(), partition: partition()]
```

Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.

For example, to allow one job per worker, every five seconds, across every instance of the `alpha` queue in your cluster:

```elixir
local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: [fields: [:worker]]]
```
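Embedded in a full queue configuration, that limiter might look like this (queue name and surrounding config are illustrative):

```elixir
config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    alpha: [
      local_limit: 10,
      # Each worker gets its own one-job-per-five-seconds budget cluster-wide
      rate_limit: [allowed: 1, period: 5, partition: [fields: [:worker]]]
    ]
  ]
```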