Oban Pro Releases
Bug Fixes
- [BatchManager] Consider all states for batch callback uniqueness.
  Previously, a callback job that failed enough times could be discarded and then ignored by subsequent uniqueness checks.
Enhancements
- [DynamicPruner] Support :infinity as a duration in dynamic pruning so users don't have to specify ludicrous values like {999, :years}.
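As an illustration, a plugin configuration that uses :infinity might look like the sketch below. The app name, repo, seven-day default, and the choice of state override are assumptions made for the example, and the option shapes follow the DynamicPruner documentation as best understood, so verify them against your installed version.

```elixir
# config/config.exs (hypothetical app and repo names)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  plugins: [
    {Oban.Pro.Plugins.DynamicPruner,
     # Default retention for every job (assumed value for illustration).
     mode: {:max_age, {7, :days}},
     # Keep discarded jobs forever rather than writing {999, :years}.
     state_overrides: [discarded: {:max_age, :infinity}]}
  ]
```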
Bug Fixes
- [Relay] Attach the Relay telemetry handler using module function capture syntax to prevent warnings.
- [DynamicCron] Include :expression as an available update option to prevent Dialyzer errors.
- [SmartEngine] Preserve existing rate limit fields when scaling or otherwise changing a producer's meta values.
- [SmartEngine] Ensure that the total fetch demand is never negative.
  When running queues are converted to global mode there may be a deficit between the total jobs and the global limit. In that case we must fetch 0 jobs rather than passing a negative number.
Bug Fixes
- [SmartEngine] Verify the presence of a rate-limit period before calculating window time. This fixes situations where a producer record for the same queue existed, but lacked a rate limit structure.
- [Producer] Improve legacy Ecto support by lazily calculating the local limit, without assuming the params are coerced into a map.
Bug Fixes
- [SmartEngine] Respect configured log level when performing transaction lock
  queries. Previously, the Smart Engine could log every time it made an advisory
  lock query with a global_limit or rate_limit set.
Partitioned Rate Limiting
Driven by popular demand, v0.9 brings partitions to the Smart Engine's rate limiter. With partitions, rate limits are applied per-worker, on args, or on a subset of args fields rather than across the entire queue. This enables your application to enforce limits per-customer or respect external throttling, without splitting jobs into multiple queues.
    rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]]
Alternatively, you can partition by the job's account_id field:
    rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]
Naturally, a combination of worker, args, and any number of keys works.
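For context, here is a rough sketch of where a combined partition could sit in a full queue configuration. The app, repo, and queue names and the local_limit value are hypothetical; only the rate_limit options and the engine module name come from these notes.

```elixir
# config/config.exs (hypothetical app, repo, and queue names)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    default: 10,
    exports: [
      # Assumed concurrency limit for illustration.
      local_limit: 10,
      # Up to 100 jobs per period for each worker/account_id pair.
      rate_limit: [
        allowed: 100,
        period: 5,
        partition: [fields: [:worker, :args], keys: [:account_id]]
      ]
    ]
  ]
```

Partitioning on both fields means two workers handling jobs for the same account are throttled independently, as are two accounts handled by the same worker.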
Check out the SmartEngine Guide for options and details.
Batch Worker Improvements
Batches are the oldest worker in Pro, and as such, they existed prior to the meta field. Finally, that difference is rectified. Building batches on meta enables a handful of ergonomic improvements and new functionality:
- Forwarding args through the batch_callback_args option
- Heterogeneous batches with an alternative callback module through the batch_callback_worker option
- Fetching all jobs in a batch with stream_batch_jobs/2 for map/reduce processing
IMPORTANT: For in-flight batch callback jobs to run after an upgrade you'll
need to migrate batch_id and callback into meta. Run the following SQL in
a migration immediately prior to, or following, the upgrade.
    UPDATE oban_jobs
    SET meta = jsonb_set_lax(
      jsonb_set(meta, '{batch_id}', args->'batch_id'),
      '{callback}',
      args->'callback',
      true,
      'return_target'
    )
    WHERE state NOT IN ('cancelled', 'completed', 'discarded')
    AND args ? 'batch_id'
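One way to run that statement is from an Ecto migration, sketched below. The module name is hypothetical and the no-op rollback is an assumption about what is appropriate for your application. Note that jsonb_set_lax is only available in PostgreSQL 13 and later.

```elixir
# priv/repo/migrations/<timestamp>_migrate_batch_meta.exs (hypothetical name)
defmodule MyApp.Repo.Migrations.MigrateBatchMeta do
  use Ecto.Migration

  def up do
    # Copy batch_id and callback from args into meta for unfinished batch jobs.
    execute """
    UPDATE oban_jobs
    SET meta = jsonb_set_lax(
      jsonb_set(meta, '{batch_id}', args->'batch_id'),
      '{callback}',
      args->'callback',
      true,
      'return_target'
    )
    WHERE state NOT IN ('cancelled', 'completed', 'discarded')
    AND args ? 'batch_id'
    """
  end

  # The copy only adds keys to meta, so rollback is left as a no-op here.
  def down, do: :ok
end
```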
Workflow Worker Improvements
Workflows got a little ergonomic love, too. Now you can dynamically extend
workflows at runtime with the new append_workflow function:
    def process(%Job{} = job) do
      jobs =
        job
        |> append_workflow(check_deps: false)
        |> add(:d, WorkerD.new(%{}), deps: [:a])
        |> add(:e, WorkerE.new(%{}), deps: [:b])
        |> add(:f, WorkerF.new(%{}), deps: [:c])
        |> Oban.insert_all()

      {:ok, jobs}
    end
Note the use of check_deps: false to prevent dependency validation. To be safe
and check jobs while appending, we'll use the new stream_workflow_jobs/1
function to load all of the previous jobs and feed them in:
    def process(%Job{} = job) do
      {:ok, jobs} =
        MyApp.Repo.transaction(fn ->
          job
          |> stream_workflow_jobs()
          |> Enum.to_list()
        end)

      jobs
      |> append_workflow()
      |> add(:d, WorkerD.new(%{}), deps: [:a])
      |> add(:e, WorkerE.new(%{}), deps: [:b])
      |> add(:f, WorkerF.new(%{}), deps: [:c])
      |> Oban.insert_all()

      :ok
    end
Changes
- [Oban] Require Oban ~> v2.9 to support the new cancel_all_jobs engine callback.
- [Oban.Pro.Queue.SmartEngine] Make operations like refresh more efficient (less data transport) and more failure tolerant to prevent producer crashes.