This release includes engine and supervisor improvements that require Oban v2.15+.
💸 Horizontal Auto-Scaling with DynamicScaler
The new DynamicScaler plugin monitors queue throughput and horizontally scales cloud infrastructure to optimize processing. Horizontal scaling is applied at the node level, not the queue level, so you can distribute processing over more physical hardware.
With auto-scaling you can spin up additional nodes during high traffic events, and pare down to a single node during a lull. Beyond optimizing throughput, scaling can save money in environments with little to no usage at off-peak times, e.g. staging.
DynamicScaler calculates the optimal scale by predicting the future size of a queue based on recent trends. You provide an acceptable range of node sizes, which queues to track, and a cloud module, and auto-scaling takes care of the rest (with observability, of course).
It’s designed to integrate with popular cloud infrastructure like AWS, GCP, K8S, and Fly via a simple, flexible behaviour. For example, here we declare auto-scaling rules for two distinct queues:
plugins: [
  {DynamicScaler, scalers: [
    [queue: :reports, range: 0..1, cloud: {MyApp.Cloud, asg: "my-reports-asg"}],
    [queue: :exports, range: 1..4, cloud: {MyApp.Cloud, asg: "my-exports-asg"}]
  ]}
]
There are also options to filter scaling by a particular queue, optimize responsiveness with scaling steps, and tune cooldown periods to prevent unnecessary scaling.
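For example, a scaler tuned to scale up more aggressively with a longer cooldown might look like the sketch below. The :step and :cooldown keys are assumed names based on the description above, so double-check the reference for the exact options:

# Sketch only: :step and :cooldown are assumed option names.
{DynamicScaler, scalers: [
  [
    queue: :exports,
    range: 1..4,
    cloud: {MyApp.Cloud, asg: "my-exports-asg"},
    step: 2,        # assumed: add or remove up to two nodes per scaling event
    cooldown: 120   # assumed: seconds to wait between scaling events
  ]
]}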
See the DynamicScaler docs for a complete reference and a guide on defining cloud modules.
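To give a feel for what a cloud module looks like, here's a minimal sketch. The Oban.Pro.Cloud behaviour name and the init/1 and scale/2 callback signatures are assumptions drawn from the description above, as is the MyApp.AWS helper; the guide linked above has the real contract:

defmodule MyApp.Cloud do
  # Assumed behaviour name and callbacks; consult the cloud module guide
  # for the actual contract.
  @behaviour Oban.Pro.Cloud

  @impl true
  def init(opts) do
    # Receives the options declared in the scaler config, e.g. asg: "my-exports-asg"
    {:ok, Map.new(opts)}
  end

  @impl true
  def scale(desired, %{asg: asg} = conf) do
    # MyApp.AWS.set_desired_capacity/2 is a hypothetical wrapper around the
    # AWS SetDesiredCapacity API for the named auto-scaling group.
    :ok = MyApp.AWS.set_desired_capacity(asg, desired)

    {:ok, conf}
  end
end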
🏗️ Args Schema Macro for Structured Workers
Structured args are indispensable for enforcing an args schema. However, the legacy keyword list syntax with field: :type, implicit enums, and an asterisk symbol for required fields was simply awkward. We're correcting that awkwardness with a new args_schema/1 macro for defining structured workers. The args_schema macro defines a DSL that's a subset of Ecto's schema, optimized for JSON compatibility and without the need for dedicated changeset functions.
Here’s a taste that demonstrates multiple field types, the required option, enums, and embedded structures:
defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker

  alias __MODULE__, as: Args

  args_schema do
    field :id, :id, required: true
    field :mode, :enum, values: ~w(enabled disabled paused)a

    embeds_one :data do
      field :office_id, :uuid, required: true
      field :has_notes, :boolean
      field :addons, {:array, :string}
    end

    embeds_many :addresses do
      field :street, :string
      field :city, :string
      field :country, :string
    end
  end

  @impl Oban.Pro.Worker
  def process(%{args: %Args{id: id, mode: mode} = args}) do
    %Args.Data{office_id: office_id} = args.data
    [%Args.Address{street: street} | _] = args.addresses

    ...
  end
end
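For context, jobs for a structured worker are still enqueued with plain, JSON-compatible maps; the args are cast back into the generated structs before process/1 runs. A minimal sketch, assuming the worker above is named MyApp.StructuredWorker:

%{id: 1, mode: "enabled", data: %{office_id: Ecto.UUID.generate()}}
|> MyApp.StructuredWorker.new()
|> Oban.insert()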
The legacy (and legacy-legacy) syntax is still viable, and it generates the appropriate field declarations automatically. However, we strongly recommend updating to the new syntax for your own sanity.
See the Structured Jobs docs for specifics.
🍪 Declarative Chunk Partitioning
Previously, chunk workers executed jobs in groups based on size or timeout, with the grouping always consisting of jobs from the same queue, regardless of worker, args, or other job attributes. However, sometimes there’s a need for more flexibility in grouping jobs based on different criteria.
To address this, we have introduced partitioning, which allows grouping chunks by worker and/or a subset of args or meta. This improvement enables you to methodically compose chunks of jobs with the same args or meta, instead of running a separate queue for each chunk.
Here’s an example that demonstrates using GPT to summarize groups of messages from a particular author:
defmodule MyApp.MessageSummarizer do
  use Oban.Pro.Workers.Chunk,
    by: [:worker, args: :author_id],
    size: 100,
    timeout: :timer.minutes(5)

  @impl true
  def process([%{args: %{"author_id" => author_id}} | _] = jobs) do
    messages =
      jobs
      |> Enum.map(& &1.args["message_id"])
      |> MyApp.Messages.all()

    {:ok, summary} = MyApp.GPT.summarize(messages)

    # Push the summary
  end
end
By leveraging partitioning, you can group and process jobs based on specific criteria, streamlining workloads that previously required a separate queue per group.
Chunk queries have been optimized for tables of any size to compensate for the added complexity of partitioning. As a result, even with hundreds of thousands of available jobs, queries run in milliseconds.
See the Chunk Partitioning docs for details.
🗄️ Batch Performance
Some teams run batches containing upwards of 50k jobs, and often run multiple batches simultaneously, to the tune of millions of jobs a day. That level of load exposed some opportunities for performance tuning that we're excited to provide for batches of all sizes.
In short, batches are lazier, their queries are faster, and they’ll put less load on your database. If query tuning and performance optimizations are your things, read on for the details!
- Debounce batch callback queries so that only one database call is made for each batch within a short window of time. By default, batching debounces for 100ms, but it's configurable at the batch level if you'd prefer to debounce for longer (see the sketch after this list).
- Only query for the exact details needed by a batch's supported callbacks. If a batch only has a cancelled handler, then no other states are checked.
- Optimize state-checking queries to avoid using exact counts and ignore completed jobs entirely, as that's typically the state with the most jobs.
- Use index-powered match queries to find existing callback jobs, again, only for the current batch's supported callbacks.
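To illustrate the batch-level debounce mentioned in the first item, something like the following should work. Both the MyApp.EmailWorker module and the :batch_debounce option name are assumptions for this sketch; check the Batch docs for the exact key:

# MyApp.EmailWorker is a hypothetical worker built on Oban.Pro.Workers.Batch.
# :batch_debounce is an assumed name for the debounce window (milliseconds).
message_ids
|> Enum.map(&%{message_id: &1})
|> MyApp.EmailWorker.new_batch(batch_debounce: 500)
|> Oban.insert_all()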
Enhancements
- [DynamicLifeline] Remove the transactional wrapper and add a :timeout option for rescue and discard queries.
- [Chunk] Apply cancellation to all jobs within a chunk. Only the leader job is registered as "running" by the queue producer. Now a running chunk listens for cancel messages for any job in the chunk and cancels the leader. Without this, cancelling didn't apply to the leader and chunk jobs were orphaned.
- [SmartEngine] Bypass the transaction on insert for small batches. Inserting a list of non-unique jobs smaller than the batch size is no longer wrapped in a transaction (see the sketch after this list).
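For illustration, a small bulk insert like the one below, with fewer jobs than the batch size and no uniqueness options, now skips the wrapping transaction entirely. MyApp.Worker stands in for any worker module:

# Fifty non-unique jobs; with this change the insert no longer opens a
# transaction.
1..50
|> Enum.map(&MyApp.Worker.new(%{id: &1}))
|> Oban.insert_all()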
Bug Fixes
- [Chunk] Monitor chunks with timeouts to prevent orphans. The queue executor uses :timer.exit_after/2 to kill jobs that exceed a timeout. Then, the queue's producer catches the :DOWN message and uses that to ack a job error. Producers aren't aware of the chunks, so we spawn a sentinel process to monitor the parent process and ack the chunk jobs.
- [SmartEngine] Cast jsonb to text before casting to integer for global tracking. A recent improvement to global tracking broke a previous compatibility trick used for legacy Postgres support. While we officially support PG 12+ now, there's no reason to break backward compatibility for a simple cast.
- [SmartEngine] Use a CTE for the fetch_jobs query to prevent overfetching. The CTE prevents Postgres from optimizing the subquery and ignoring the limit. More details are available in https://github.com/sorentwo/oban/commit/399092159.