Adoption
Oban Pro layers enhanced functionality on top of Oban through extensions, plugins, and workers. Those enhancements are all optional and require a little configuration to enable.
This guide will walk you through the minimum configuration changes you should make to start utilizing Pro along with some optional suggestions to hint at what's possible.
1. Smart Engine
The Smart engine is the brains behind much of Pro's advanced capabilities and added reliability. It brings global concurrency limits, distributed rate limiting, partitioned limiting, unique bulk job inserts, and precise orphaned job rescuing.
The Smart
engine utilizes centralized producer records in an oban_producers
table to coordinate
between nodes with minimal load on the database (and without any reliance on Distributed Erlang
clustering).
To start, create a migration to create an oban_producers
table:
$ mix ecto.gen.migration add_oban_producers
Within the migration module:
use Ecto.Migration
defdelegate change, to: Oban.Pro.Migrations.Producers
Next, update your config to use the Smart
engine:
config :my_app, Oban,
+ engine: Oban.Pro.Engines.Smart,
repo: MyApp.Repo
...
π‘ Explore Queue Options
Apply global concurrency limits across all nodesβe.g. a genuinely global
limit: 1
Throttle queue throughput by rate limiting to a maximum number of jobs within a period of time.
Partition global or rate limits within a queue by worker, args, or args sub-keys.
2. DynamicLifeline
The DynamicLifeline) plugin is an enhancement over the
basic Oban.Plugins.Lifeline
plugin, which uses producer records to rescue orphaned jobs, i.e.,
jobs that are stuck in the executing
state because the node shut down before the job could
finish. Producer-based rescuing is entirely accurate and doesn't suffer from erroneous rescues
like basic time-based rescuing.
Without the DynamicLifeline
plugin you may need to manually rescue jobs stuck in the executing
state, or rescue workflows stuck jobs stuck in the scheduled
state.
config :my_app, Oban,
engine: Oban.Pro.Engines.Smart,
- plugins: [Oban.Plugins.Lifeline]
+ plugins: [Oban.Pro.Plugins.DynamicLifeline]
...
By default, the plugin rescues orphaned jobs 1 minute after the queue shuts down.
π‘ Explore Plugins
Use the DynamicPartitioner to switch to a partitioned table for optimized query performance, minimal database bloat, and efficient historic job pruning.
Adapt basic cron to DynamicCron to make add, update, and remove cron jobs at runtime.
Swap to the DynamicPruner to customize how long jobs in various states, queues, or from particular workers linger.
Use DynamicQueues to define queues and reconfigure queues at runtime, and have those changes persist between restarts.
3. Pro.Worker
The Oban.Pro.Worker
is a replacement for Oban.Worker
with expanded capabilities such as
encryption, enforced structure, output recording, and execution hooks.
Upgrade all of your workers to Pro workers by switching out the use
module and replacing
perform/1
with process/1
:
def MyApp.Worker do
- use Oban.Worker
+ use Oban.Pro.Worker
- @impl Oban.Worker
- def perform(%Job{} = job) do
- # Do stuff with the job
- end
+ @impl Oban.Pro.Worker
+ def process(%Job{} = job) do
+ # Do stuff with the job
+ end
end
Only Pro workers support execution hooks, which are especially helpful as a reliable alternative
to telemetry
for error reporting. Once you've modified existing workers you can define a global
error hook (be sure to remove any existing telemetry backed error hooks):
defmodule MyApp.ErrorHook do
def after_process(state, job) when state in [:discard, :error] do
error = job.unsaved_error
extra = Map.take([:attempt, :id, :args, :max_attempts, :meta, :queue, :worker])
Sentry.capture_exception(error.reason, stacktrace: error.stacktrace, extra: extra)
end
def after_process(_state, _job), do: :ok
end
Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)
π‘ Explore Pro Workers
Validate args on insert and atomize them during execution with structured jobs.
Stash a job's return value to retrieve it later manually or as part of a workflow with recorded jobs.
Store all job data at rest with encrypted jobs so that sensitive data can't be seen in the clear.
Execute callbacks synchronously, from within the job's process after jobs finish executing with worker hooks
Process jobs in groups while tracking overall progress with Batches.
Compose jobs together with arbitrary dependencies using Workflows.
Process jobs in strict sequential order regardless of retries using Chains
4. Oban.Pro.Testing
Switch from Oban.Testing
to Oban.Pro.Testing
to more easily test workers, drain queues
reliably, supervise test instances, and make assertions about enqeueud jobs.
defmodule MyApp.Case do
use ExUnit.CaseTemplate
using do
quote do
- use Oban.Testing, repo: MyApp.Repo
+ use Oban.Pro.Testing, repo: MyApp.Repo
end
end
end
Continue with the testing guide for additional setup tips, or skip straight into the
Oban.Pro.Testing
docs to explore all of the testing functions.