Oban Pro Releases
Changes
- [SmartEngine] Use database connection safety for all SmartEngine calls.
Database connections may be unstable for many reasons: pool contention, database overload, crashes. We shouldn't crash the producer due to a flickering database call.
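The idea of connection safety can be sketched as a wrapper that converts raised errors and exits into tagged tuples, so the caller can skip one cycle instead of crashing. This is a minimal illustration, not Oban Pro's implementation; the SafeCall module and its run/1 function are hypothetical names.

```elixir
defmodule SafeCall do
  # Hypothetical helper: runs a zero-arity function (for example, one that
  # wraps a database query) and never lets a failure escape to the caller.
  def run(fun) when is_function(fun, 0) do
    {:ok, fun.()}
  rescue
    # Raised exceptions, such as a connection error, become error tuples.
    error -> {:error, error}
  catch
    # Exits, such as a pool checkout timeout, are trapped as well.
    :exit, reason -> {:error, {:exit, reason}}
  end
end
```

A producer-like process could then match on {:ok, result} to proceed and on {:error, _} to retry on its next poll.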
Bug Fixes
- [DynamicCron] Explicitly declare cron timestamps as DateTime to compensate for apps that configure Ecto to use NaiveDateTime by default.
Changes
- [Lifeline] The plugin will only run along with the SmartEngine. Running with the BasicEngine is flawed and will cause inadvertent orphan rescues.
You can either switch to the SmartEngine or remove Lifeline from your plugins.
Bug Fixes
- [SmartEngine] Consider the configured prefix for all ack and update options.
Changes
- [Relay] Restore the ability to use Elixir 1.9 by removing use of the is_map_key guard that wasn't introduced until Elixir 1.10.
- [SmartEngine] Apply jitter and more selective use of the mutex used for global coordination. The engine also avoids touching or reloading producer records to minimize connection use.
- [Workflow] Improve deps check performance by using an index powered operator. On instances with millions of jobs this may improve performance ~600x.
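For the Relay change, the following sketch shows two ways to check for a map key without the is_map_key guard, both of which work on Elixir 1.9. It is only an illustration of the technique; the KeyCheck module, its function names, and the :ref key are hypothetical and not taken from Relay's source.

```elixir
defmodule KeyCheck do
  # A literal key can be checked by pattern matching in the function head.
  def has_ref?(%{ref: _}), do: true
  def has_ref?(_), do: false

  # A dynamic key can be checked in the function body with Map.has_key?/2,
  # leaving only the long-standing is_map/1 guard in the head.
  def has_key?(map, key) when is_map(map), do: Map.has_key?(map, key)
  def has_key?(_other, _key), do: false
end
```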
Pluggable Smart Engine
Pro's SmartEngine enables truly global concurrency and global rate limiting by handling demand at the queue level, without any conflict or churn. The engine uses centralized producer records to coordinate with minimal load on the database.
config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    alpha: 1,
    gamma: [global_limit: 1],
    delta: [local_limit: 2, global_limit: 5],
    kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
    omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
  ],
  ...
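One way to picture how a local and a global limit could combine is the sketch below, which computes how many more jobs a single node might start. It is an assumption made for illustration and says nothing about how the SmartEngine calculates demand internally; the DemandSketch module and its arguments are hypothetical, and rate limits are ignored.

```elixir
defmodule DemandSketch do
  # opts mirrors a queue's keyword options, e.g. [local_limit: 2, global_limit: 5].
  # local_running is the count of jobs executing on this node, global_running
  # is the count across all nodes.
  def demand(opts, local_running, global_running) do
    local = room(Keyword.get(opts, :local_limit, :infinity), local_running)
    global = room(Keyword.get(opts, :global_limit, :infinity), global_running)

    # Integers sort before atoms in Erlang term order, so :infinity only
    # wins when neither limit is set.
    Enum.min([local, global])
  end

  defp room(:infinity, _running), do: :infinity
  defp room(limit, running), do: max(limit - running, 0)
end
```

With the delta options above, a node with nothing running locally and four jobs running globally would have room for a single job.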
Relay Plugin
The Relay plugin lets you insert jobs and await their results locally or remotely, across any number of nodes, i.e. persistent distributed tasks. Once the plugin is running, you can seamlessly distribute Oban jobs and await the results synchronously:
alias Oban.Pro.Plugins.Relay
1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Relay.async/1)
|> Relay.await_many(timeout: :timer.seconds(1))
# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
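For comparison, the same shape can be written with the standard library's Task module, which runs only on the local node and does not persist anything. This sketch assumes Elixir 1.11 or later for Task.await_many/2; the TaskSketch module is a hypothetical name, and the doubling function stands in for whatever DoubleWorker does.

```elixir
defmodule TaskSketch do
  # Starts one task per integer and awaits all of them with a shared timeout.
  # Note that Task.await_many/2 takes the timeout as a plain integer.
  def double_all(ints) do
    ints
    |> Enum.map(fn int -> Task.async(fn -> {:ok, int * 2} end) end)
    |> Task.await_many(:timer.seconds(1))
  end
end

TaskSketch.double_all(1..3)
# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
```

The Relay version keeps this calling pattern while backing each unit of work with a job.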
Chunk Worker
Process jobs "broadway style", in groups based on size or a timeout, but with the robust error handling semantics of Oban. Chunking operates at the worker level, allowing many chunks to run in parallel within the same queue.
defmodule MyApp.ChunkWorker do
  use Oban.Pro.Workers.Chunk, queue: :alpha, size: 10, timeout: :timer.seconds(5)

  @impl Chunk
  def process(jobs) do
    jobs
    |> Enum.map(& &1.args)
    |> Business.process_messages()

    :ok
  end
end
Pro Worker Testing Module
Testing batch, workflow, and chunk workers can be tricky due to how they override the perform/1 function. The new Oban.Pro.Testing module provides a process_job/2,3 function that mirrors the functionality of perform_job/2,3 in the base Oban.Testing module.
With process_job/2,3 you can test Pro workers in isolation, without worrying about whether a manager is running or the state of a workflow. To start using it, import it into your test case or test file:
import Oban.Pro.Testing
Then test worker logic in isolation:
assert :ok = process_job(MyApp.ProWorker, %{"id" => 1})
Changed
- [Oban.Pro.Plugins.WorkflowManager] This plugin is no longer necessary and can be safely removed from your configuration.
- Bump Oban dependency to ~> 2.6