Changelog for Oban Pro v0.14
This release includes engine and supervisor improvements that require Oban v2.15+.
Horizontal Auto-Scaling with DynamicScaler
The new DynamicScaler plugin monitors queue throughput and horizontally scales cloud infrastructure to optimize processing. Horizontal scaling is applied at the node level, not the queue level, so you can distribute processing over more physical hardware.
With auto-scaling you can spin up additional nodes during high traffic events, and pare down to a single node during a lull. Beyond optimizing throughput, scaling can save money in environments with little to no usage at off-peak times, e.g. staging.
DynamicScaler calculates the optimal scale by predicting the future size of a queue based on recent trends. You provide an acceptable range of node sizes, which queues to track, and a cloud module, and auto-scaling takes care of the rest (with observability, of course).
It's designed to integrate with popular cloud infrastructure like AWS, GCP, K8S, and Fly via a simple, flexible behaviour. For example, here we declare auto-scaling rules for two distinct queues:
    {Oban.Pro.Plugins.DynamicScaler,
     scalers: [
       [queues: :reports, range: 0..1, cloud: {MyApp.Cloud, asg: "rep-asg"}],
       [queues: :exports, range: 1..4, cloud: {MyApp.Cloud, asg: "exp-asg"}]
     ]}
There are also options to filter scaling by a particular queue, to optimize responsiveness with scaling steps, and to tune cooldown periods that prevent unnecessary scaling.
See the DynamicScaler docs for a complete reference and a guide on defining cloud modules.
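To make the idea of "predicting the future size of a queue based on recent trends" concrete, here is a minimal sketch in plain Elixir. It is not DynamicScaler's actual algorithm: the `ScaleSketch` module, the linear extrapolation, and the `jobs_per_node` capacity figure are all assumptions made for illustration. It only shows how a predicted size could be turned into a node count that stays inside a configured range such as `1..4`.

```elixir
defmodule ScaleSketch do
  @moduledoc "Illustrative only; not DynamicScaler's actual algorithm."

  # Extrapolate the queue size one interval ahead from evenly spaced
  # samples (oldest first), using the average change per interval.
  def predict_size([]), do: 0
  def predict_size([only]), do: only

  def predict_size(samples) do
    slope = (List.last(samples) - hd(samples)) / (length(samples) - 1)
    max(round(List.last(samples) + slope), 0)
  end

  # Convert the predicted size into a node count, clamped to the range.
  # `jobs_per_node` is a hypothetical capacity figure, not a plugin option.
  def desired_nodes(samples, jobs_per_node, %Range{first: lo, last: hi}) do
    needed = ceil(predict_size(samples) / jobs_per_node)

    needed |> max(lo) |> min(hi)
  end
end

# A growing queue asks for more nodes, within the 1..4 range
IO.inspect(ScaleSketch.desired_nodes([100, 200, 300], 150, 1..4))

# A draining queue can scale to zero when the range allows it
IO.inspect(ScaleSketch.desired_nodes([40, 20, 0], 150, 0..1))
```

The first call extrapolates to 400 jobs and requests 3 nodes; the second extrapolates to an empty queue and requests 0, which is what makes a range like `0..1` useful for staging environments.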
Args Schema Macro for Structured Workers
Structured args are indispensable for enforcing an args schema. However, the legacy keyword list syntax with field: :type, implicit enums, and an asterisk symbol for required fields was simply awkward. We're correcting that awkwardness with a new args_schema/1 macro for defining structured workers. The args_schema macro defines a DSL that's a subset of Ecto's schema, optimized for JSON compatibility and without the need for dedicated changeset functions.
Here's a taste that demonstrates multiple field types, the required option, enums, and embedded structures:
    use Oban.Pro.Worker

    alias __MODULE__, as: Args

    args_schema do
      field :id, :id, required: true
      field :mode, :enum, values: ~w(enabled disabled paused)a

      embeds_one :data do
        field :office_id, :uuid, required: true
        field :has_notes, :boolean
        field :addons, {:array, :string}
      end

      embeds_many :address do
        field :street, :string
        field :city, :string
        field :country, :string
      end
    end

    @impl Oban.Pro.Worker
    def process(%{args: %Args{id: id, mode: mode} = args}) do
      %Args.Data{office_id: office_id} = args.data
      # The embed is declared as :address, so that is the field to read
      [%Args.Address{street: street} | _] = args.address

      ...
    end
The legacy (and legacy-legacy) syntax is still viable and it generates the appropriate field declarations automatically. However, we strongly recommend updating to the new syntax for your own sanity.
See the Structured Jobs docs for specifics.
Declarative Chunk Partitioning
Previously, chunk workers executed jobs in groups based on size or timeout, with the grouping always consisting of jobs from the same queue, regardless of worker, args, or other job attributes. However, sometimes there's a need for more flexibility in grouping jobs based on different criteria.
To address this, we have introduced partitioning, which allows grouping chunks by worker and/or a subset of args or meta. This improvement enables you to methodically compose chunks of jobs with the same args or meta, instead of running a separate queue for each chunk.
Here's an example that demonstrates using GPT to summarize groups of messages from a particular author:
    defmodule MyApp.MessageSummarizer do
      use Oban.Pro.Workers.Chunk,
        by: [:worker, args: :author_id],
        size: 100,
        timeout: :timer.minutes(5)

      @impl true
      # Every job in the chunk shares an author_id, so read it from the first job's args
      def process([%{args: %{"author_id" => author_id}} | _] = jobs) do
        messages =
          jobs
          |> Enum.map(& &1.args["message_id"])
          |> MyApp.Messages.all()

        {:ok, summary} = MyApp.GPT.summarize(messages)

        # Push the summary
      end
    end
By leveraging the enhanced partitioning capabilities, you can now effectively group and process jobs based on specific criteria, providing a more streamlined approach to managing your workloads.
Chunk queries have been optimized for tables of any size to compensate for their newfound advanced complexity. As a result, even with hundreds of thousands of available jobs, queries run in milliseconds.
See the Chunk Partitioning docs for details.
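As a rough mental model of what partitioning by worker and an args field means, the sketch below groups plain maps in memory. It is only an illustration: the real grouping happens in database queries, the `PartitionSketch` module is hypothetical, and the maps stand in for `Oban.Job` structs.

```elixir
defmodule PartitionSketch do
  # Build a partition key from the worker plus one args field, loosely
  # mirroring `by: [:worker, args: :author_id]`.
  def key(job, arg_key), do: {job.worker, Map.get(job.args, arg_key)}

  # Group jobs by key, then split each group into chunks of at most `size`.
  def chunks(jobs, arg_key, size) do
    jobs
    |> Enum.group_by(&key(&1, arg_key))
    |> Enum.flat_map(fn {_key, group} -> Enum.chunk_every(group, size) end)
  end
end

# Plain maps standing in for jobs: three from author 1, one from author 2
jobs =
  for {author_id, message_id} <- [{1, 10}, {2, 11}, {1, 12}, {1, 13}] do
    %{
      worker: "MyApp.MessageSummarizer",
      args: %{"author_id" => author_id, "message_id" => message_id}
    }
  end

chunks = PartitionSketch.chunks(jobs, "author_id", 2)
sizes = chunks |> Enum.map(&length/1) |> Enum.sort()

IO.inspect(sizes)
```

With a chunk size of 2, author 1's three jobs split into chunks of two and one, and author 2's single job forms its own chunk, so no chunk ever mixes authors.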
Batch Performance
Some teams run batches containing upwards of 50k jobs, and often run multiple batches simultaneously, to the tune of millions of jobs a day. That load level exposed some opportunities for performance tuning that we're excited to provide for batches of all sizes.
In short, batches are lazier, their queries are faster, and they'll put less load on your database. If query tuning and performance optimizations are your things, read on for the details!
Debounce batch callback queries so that only one database call is made for each batch within a short window of time. By default, batching debounces for 100ms, but it's configurable at the batch level if you'd prefer to debounce for longer.
Only query for the exact details needed by a batch's supported callbacks. If a batch only has a cancelled handler, then no other states are checked.
Optimize state-checking queries to avoid using exact counts and ignore completed jobs entirely, as that's typically the state with most jobs.
Use index-powered match queries to find existing callback jobs, again, only for the current batch's supported callbacks.
v0.14.2 - 2023-05-19
Bug Fixes
[SmartEngine] Prevent fetch_jobs from ever exceeding max_attempts
In some situations, a condition to ensure the attempts don't exceed max attempts is still necessary. By checking the attempts outside the CTE we maintain optimal query performance for the large scan that finds jobs, and only apply the check in the small outer query.
[SmartEngine] Use Oban.Repo.default_options for all Multi operations
The SmartEngine and Chunk worker extracted a subset of opts from the config, but that subset didn't include telemetry_options. To ensure consistency with plain Repo calls, all Multi operations now use Repo.default_options.
[DynamicLifeline] Prevent rescuing non-leader chunk jobs
Now that chunks record the chunk leader as a third element in attempted_by, the DynamicLifeline could erroneously rescue executing jobs.
[DynamicQueues] Safely update dynamic queues via Oban.scale_queue/2
For queues with a configured global or rate limit option, scale_queue calls that only included the plain limit would crash with an update error.
[Worker] Delete oban_meta from process dictionary when recording data
Any oban_meta data stored in the process dictionary would linger between executions because it wasn't deleted. That caused problems when using drain_jobs because each job runs in the same process.
[Producer] Derive Jason.Encoder for Oban.Pro.Producer schema
The :__meta__ field isn't encodable and triggers an error when passed to a JSON-based log.
[DynamicQueues] Unwrap queue_input typespec for insert/2 because it is already a list
v0.14.1 - 2023-04-19
Bug Fixes
[Chunk] Apply decrypting and structuring to all jobs in a chunk
Previously, only the chunk leader was decrypted or structured, causing a mismatch of structured args and unusable encrypted args.
[DynamicScaler] Safely handle Cloud.scale/1 errors and recycle scaler configuration
The DynamicScaler crashed when it received an error response from scaling events. Now, it includes an error value in the telemetry event metadata and makes it clear that a successful scale must return a new conf for subsequent scale calls.
[Worker] Support options for embeds_one/3 and embeds_many macros
An oversight omitted support for required: true variants of the embed macros.
[Worker] Safely cast {:array, :uuid} to array of binary_id
In args, a UUID is identical to a binary id, but isn't natively supported by Ecto changesets.
v0.14.0 - 2023-04-13
Enhancements
[DynamicLifeline] Remove transactional wrapper and add a :timeout option for rescue and discard queries.
[Chunk] Apply cancellation to all jobs within a chunk
Only the leader job is registered as "running" by the queue producer. Now a running chunk listens for cancel messages for any job in the chunk and cancels the leader. Previously, cancelling a non-leader job never reached the leader, and the chunk's jobs were orphaned.
[SmartEngine] Bypass transaction on insert for small batches
Inserting a list of non-unique jobs smaller than the batch size is no longer wrapped in a transaction.
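For the DynamicLifeline enhancement above, the new :timeout option would presumably be passed with the plugin's other options. The fragment below is a sketch under that assumption: the `:my_app` application name and the 30 second value are hypothetical, and the exact accepted values should be checked against the DynamicLifeline docs.

```elixir
import Config

config :my_app, Oban,
  plugins: [
    # Hypothetical value: allow rescue and discard queries up to 30 seconds
    {Oban.Pro.Plugins.DynamicLifeline, timeout: :timer.seconds(30)}
  ]
```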
Bug Fixes
[Chunk] Monitor chunks with timeouts to prevent orphans
The queue executor uses :timer.exit_after/2 to kill jobs that exceed a timeout. Then, the queue's producer catches the :DOWN message and uses that to ack a job error. Producers aren't aware of the chunks, so we spawn a sentinel process to monitor the parent process and ack the chunk jobs.
[SmartEngine] Cast jsonb to text before casting to integer for global tracking
A recent improvement to global tracking broke a previous compatibility trick used for legacy Postgres support. While we officially support PG 12+ now, there's no reason to break backward compatibility for a simple cast.
[SmartEngine] Use a CTE for fetch_jobs query to prevent overfetching
The CTE prevents Postgres from optimizing the subquery and ignoring the limit. More details are available in https://github.com/sorentwo/oban/commit/399092159.
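The sentinel approach described in the chunk timeout fix can be illustrated with plain processes. This is a minimal sketch, not Oban Pro's implementation: the `:ack_chunk` message and the 50ms timeout are made up for the example, and a spawned process stands in for the executing chunk leader.

```elixir
parent = self()

# Stand-in for a job process: arm an exit timer on itself, then run too long.
job =
  spawn(fn ->
    {:ok, _tref} = :timer.exit_after(50, :timeout)
    Process.sleep(:infinity)
  end)

# Sentinel: monitor the job process and report when it goes down, which is
# where the real sentinel would ack the remaining chunk jobs.
spawn(fn ->
  ref = Process.monitor(job)

  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> send(parent, {:ack_chunk, reason})
  end
end)

# Wait for the sentinel's report, giving up after one second
result =
  receive do
    {:ack_chunk, reason} -> reason
  after
    1_000 -> :no_ack
  end

IO.inspect(result)
```

Because the job process doesn't trap exits, the timer's exit signal kills it, and the sentinel sees the exit reason in its :DOWN message. Only a process that monitors the job learns of its death, which is why a separate sentinel is needed when the producer tracks just the leader.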