Oban.Pro.Plugins.DynamicPrioritizer (Oban Pro v1.7.0)

The DynamicPrioritizer plugin automatically adjusts job priorities to ensure all jobs are eventually processed.

Using mixed priorities in a queue causes certain jobs to execute before others. For example, a queue that processes jobs from various customers may prioritize customers that are in a higher tier or plan. All high priority (0) jobs are guaranteed to run before any with lower priority (1..9), which is wonderful for the higher tier customers but can lead to resource starvation. When there is a constant flow of high priority jobs, the lower priority jobs never get the chance to run.

The DynamicPrioritizer solves this by periodically boosting the priority of jobs that have been waiting longer than a configured threshold. Each cycle, qualifying jobs have their priority decremented by one (e.g. from 3 to 2), gradually converging toward the highest priority.
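
The boosting rule can be modeled in a few lines of plain Elixir. This is only an illustrative sketch, not the plugin's implementation: the PrioritySketch module and its boost/3 function are hypothetical, and the real plugin operates on stored jobs rather than on in-memory values. It assumes the defaults documented under Options, namely a 5 minute :after threshold and a :max_priority of 0.

```elixir
defmodule PrioritySketch do
  @moduledoc """
  Hypothetical model of a single reprioritization cycle. Not part of Oban Pro.
  """

  # Assumed default, mirroring the documented :after default of 5 minutes.
  @default_after :timer.minutes(5)

  # Returns the priority a job would have after one cycle, given how long
  # it has been waiting (in milliseconds).
  def boost(priority, waited_ms, opts \\ []) do
    threshold = Keyword.get(opts, :after, @default_after)
    max_priority = Keyword.get(opts, :max_priority, 0)

    if eligible?(waited_ms, threshold) and priority > max_priority do
      # Lower numbers run first, so a boost is a decrement.
      priority - 1
    else
      priority
    end
  end

  # An :infinity threshold means the job never qualifies.
  defp eligible?(_waited_ms, :infinity), do: false
  defp eligible?(waited_ms, threshold), do: waited_ms >= threshold
end
```

Under this model, PrioritySketch.boost(3, :timer.minutes(6)) returns 2, a job that has waited only one minute keeps its priority, and PrioritySketch.boost(2, :timer.minutes(6), max_priority: 2) returns 2 because the job has already reached the configured ceiling.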

Using the Plugin

To use the DynamicPrioritizer plugin, add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicPrioritizer]
  ...

Without any additional options, the plugin automatically increases the priority of any jobs that have been available for 5 minutes or more, checking once per minute.

Options

  • :after — how long a job must be available before it becomes eligible for reprioritization, in milliseconds. Set to :infinity to disable reprioritization by default (useful when relying entirely on overrides). Defaults to 300_000 (5 minutes).

  • :interval — the number of milliseconds between reprioritization cycles. Defaults to 60_000 (1 minute).

  • :limit — the maximum number of jobs to reprioritize per override or default group in each cycle. Defaults to 10_000.

  • :max_priority — the highest priority (lowest number) that jobs can be boosted to. For example, max_priority: 2 prevents jobs from being boosted above priority 2, reserving 0 and 1 for genuinely high priority work. Defaults to 0.

  • :queue_overrides — a keyword list of per-queue wait thresholds that override :after for specific queues. See "Queue and Worker Overrides" for details.

  • :worker_overrides — a keyword list of per-worker wait thresholds that override :after for specific workers. See "Queue and Worker Overrides" for details.
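
As an illustration, several of the options above can be combined in a single plugin entry. The :limit and :max_priority values below are arbitrary examples rather than recommendations, while :after and :interval simply spell out the documented defaults.

```elixir
plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  # Documented defaults, written out explicitly
  after: :timer.minutes(5),
  interval: :timer.minutes(1),
  # Illustrative value: boost at most 5_000 jobs per group in each cycle
  limit: 5_000,
  # Illustrative value: never boost above priority 2, keeping 0 and 1 for urgent work
  max_priority: 2
}]
```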

Queue and Worker Overrides

The :after threshold applies globally to all queues and workers. Overrides let you fine-tune reprioritization for specific queues or workers.

Configure the analysis queue to nudge jobs after only 1 minute:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  queue_overrides: [analysis: :timer.minutes(1)]
}]

Disable reprioritization globally while enabling it for a single queue:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  after: :infinity,
  queue_overrides: [analysis: :timer.minutes(1)]
}]

Override on a per-worker basis:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  worker_overrides: [
    "MyApp.HighSLAWorker": :timer.seconds(30),
    "MyApp.LowSLAWorker": :timer.minutes(10)
  ]
}]

Queue and worker overrides can be combined. Note that overrides are applied independently—a job matching both a queue override and a worker override will be boosted by each:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  interval: :timer.minutes(2),
  after: :timer.minutes(5),
  queue_overrides: [media: :timer.minutes(10)],
  worker_overrides: ["MyApp.HighSLAWorker": :timer.seconds(30)]
}]

Instrumenting with Telemetry

The DynamicPrioritizer plugin adds the following metadata to the [:oban, :plugin, :stop] event:

  • :reprioritized_count — the number of jobs reprioritized
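
A handler for this event might look like the sketch below. MyApp.PrioritizerLogger and the handler id are hypothetical names. The sketch assumes that the event metadata also carries a :plugin key identifying the emitting plugin, and that the :telemetry library is available, which is the case in any application that depends on Oban.

```elixir
defmodule MyApp.PrioritizerLogger do
  @moduledoc """
  Hypothetical telemetry handler that logs DynamicPrioritizer activity.
  """

  require Logger

  # Attach once at application start, e.g. from Application.start/2.
  def attach do
    :telemetry.attach(
      "dynamic-prioritizer-logger",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event([:oban, :plugin, :stop], _measurements, meta, _config) do
    case meta do
      # Assumption: :plugin identifies which plugin emitted the event.
      %{plugin: Oban.Pro.Plugins.DynamicPrioritizer, reprioritized_count: count} ->
        Logger.info("DynamicPrioritizer reprioritized #{count} jobs")

      # Ignore stop events from every other plugin.
      _other ->
        :ok
    end
  end
end
```

Matching on the plugin module matters because every plugin emits the same [:oban, :plugin, :stop] event, so the handler has to skip events that carry no :reprioritized_count.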

Types

option()

@type option() :: [
  after: timeout(),
  interval: timeout(),
  limit: pos_integer(),
  max_priority: 0..9,
  queue_overrides: [{atom() | String.t(), timeout()}],
  worker_overrides: [{atom() | String.t(), timeout()}]
]