Adoption

Oban Pro layers enhanced functionality on top of Oban through extensions, plugins, and workers. Those enhancements are all optional and require a little configuration to enable.

This guide will walk you through the minimum configuration changes you should make to start utilizing Pro along with some optional suggestions to hint at what's possible.

1. Smart Engine

The Smart engine is the brains behind much of Pro's advanced capabilities and added reliability. It brings global concurrency limits, distributed rate limiting, partitioned limiting, unique bulk job inserts, and precise orphaned job rescuing.

The Smart engine utilizes centralized producer records in an oban_producers table to coordinate between nodes with minimal load on the database (and without any reliance on Distributed Erlang clustering).

To start, create a migration to create an oban_producers table:

$ mix ecto.gen.migration add_oban_producers

Within the migration module:

use Ecto.Migration

defdelegate change, to: Oban.Pro.Migrations.Producers

Next, update your config to use the Smart engine:

  config :my_app, Oban,
+   engine: Oban.Pro.Engines.Smart,
    repo: MyApp.Repo
    ...

πŸ’‘ Explore Queue Options

2. DynamicLifeline

The DynamicLifeline) plugin is an enhancement over the basic Oban.Plugins.Lifeline plugin, which uses producer records to rescue orphaned jobs, i.e., jobs that are stuck in the executing state because the node shut down before the job could finish. Producer-based rescuing is entirely accurate and doesn't suffer from erroneous rescues like basic time-based rescuing.

Without the DynamicLifeline plugin you may need to manually rescue jobs stuck in the executing state, or rescue workflows stuck jobs stuck in the scheduled state.

  config :my_app, Oban,
    engine: Oban.Pro.Engines.Smart,
-   plugins: [Oban.Plugins.Lifeline]
+   plugins: [Oban.Pro.Plugins.DynamicLifeline]
    ...

By default, the plugin rescues orphaned jobs 1 minute after the queue shuts down.

πŸ’‘ Explore Plugins

  • Use the DynamicPartitioner to switch to a partitioned table for optimized query performance, minimal database bloat, and efficient historic job pruning.

  • Adapt basic cron to DynamicCron to make add, update, and remove cron jobs at runtime.

  • Swap to the DynamicPruner to customize how long jobs in various states, queues, or from particular workers linger.

  • Use DynamicQueues to define queues and reconfigure queues at runtime, and have those changes persist between restarts.

3. Pro.Worker

The Oban.Pro.Worker is a replacement for Oban.Worker with expanded capabilities such as encryption, enforced structure, output recording, and execution hooks.

Upgrade all of your workers to Pro workers by switching out the use module and replacing perform/1 with process/1:

  def MyApp.Worker do
-   use Oban.Worker
+   use Oban.Pro.Worker

-   @impl Oban.Worker
-   def perform(%Job{} = job) do
-     # Do stuff with the job
-   end
+   @impl Oban.Pro.Worker
+   def process(%Job{} = job) do
+     # Do stuff with the job
+   end
  end

Only Pro workers support execution hooks, which are especially helpful as a reliable alternative to telemetry for error reporting. Once you've modified existing workers you can define a global error hook (be sure to remove any existing telemetry backed error hooks):

defmodule MyApp.ErrorHook do
  def after_process(state, job) when state in [:discard, :error] do
    error = job.unsaved_error
    extra = Map.take([:attempt, :id, :args, :max_attempts, :meta, :queue, :worker])

    Sentry.capture_exception(error.reason, stacktrace: error.stacktrace, extra: extra)
  end

  def after_process(_state, _job), do: :ok
end

Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)

πŸ’‘ Explore Pro Workers

  • Validate args on insert and atomize them during execution with structured jobs.

  • Stash a job's return value to retrieve it later manually or as part of a workflow with recorded jobs.

  • Store all job data at rest with encrypted jobs so that sensitive data can't be seen in the clear.

  • Execute callbacks synchronously, from within the job's process after jobs finish executing with worker hooks

  • Process jobs in groups while tracking overall progress with Batches.

  • Compose jobs together with arbitrary dependencies using Workflows.

  • Process jobs in strict sequential order regardless of retries using Chains

4. Oban.Pro.Testing

Switch from Oban.Testing to Oban.Pro.Testing to more easily test workers, drain queues reliably, supervise test instances, and make assertions about enqeueud jobs.

 defmodule MyApp.Case do
   use ExUnit.CaseTemplate

   using do
     quote do
-      use Oban.Testing, repo: MyApp.Repo
+      use Oban.Pro.Testing, repo: MyApp.Repo
     end
   end
 end

Continue with the testing guide for additional setup tips, or skip straight into the Oban.Pro.Testing docs to explore all of the testing functions.