Oban.Pro.Plugins.DynamicPruner (Oban Pro v1.5.0-rc.1)

DynamicPruner enhances the default Pruner plugin's behaviour by allowing you to customize how jobs are retained in the jobs table. Where the Pruner operates on a fixed schedule and treats all jobs the same, the DynamicPruner lets you use a flexible CRON schedule and provide custom rules for specific queues, workers, and job states.

Using the Plugin

To start using the DynamicPruner add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicPruner]
  ...

Without any additional options the pruner operates in maximum length mode (max_len) and retains a conservative 1,000 completed, cancelled, or discarded jobs. To increase the number of jobs retained you can provide your own mode configuration:

plugins: [{Oban.Pro.Plugins.DynamicPruner, mode: {:max_len, 50_000}}]

Now the pruner will retain the most recent 50,000 jobs instead.

A fixed limit on the number of jobs isn't always ideal. Often you want to retain jobs based on their age instead. For example, if your application needs to ensure that a duplicate job hasn't been enqueued within the past 24 hours, you need to retain jobs for at least 24 hours; a fixed limit simply won't work. For that we can use maximum age (max_age) mode instead:

plugins: [{Oban.Pro.Plugins.DynamicPruner, mode: {:max_age, 60 * 60 * 48}}]

Here we've specified max_age using seconds, where 60 * 60 * 48 is the number of seconds in two days.

Calculating the number of seconds in a period isn't especially readable, particularly when you have numerous max_age declarations in overrides (see below). For clarity you can specify the age's time unit as :second, :minute, :hour, :day, :week, or :month, in either singular or plural form. Here is the same 48 hour configuration from above, but specified in terms of days:

plugins: [{Oban.Pro.Plugins.DynamicPruner, mode: {:max_age, {2, :days}}}]

Now you can tell exactly how long jobs should be retained, without reverse calculating how many seconds an expression represents.
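To see that the two forms describe the same duration, the sketch below converts a {count, unit} tuple into seconds. RetentionMath is a hypothetical helper written only for this illustration and is not part of Oban Pro. It deliberately leaves out :month, because the length the plugin assigns to a month isn't stated here.

```elixir
defmodule RetentionMath do
  @moduledoc "Hypothetical helper for illustration; not part of Oban Pro."

  # A {count, unit} tuple is converted by multiplying by the unit's length.
  def to_seconds({count, unit}) when is_integer(count) and count > 0 do
    count * unit_seconds(unit)
  end

  # A bare positive integer is already a number of seconds.
  def to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  # Singular and plural unit names are treated identically.
  defp unit_seconds(unit) when unit in [:second, :seconds], do: 1
  defp unit_seconds(unit) when unit in [:minute, :minutes], do: 60
  defp unit_seconds(unit) when unit in [:hour, :hours], do: 3_600
  defp unit_seconds(unit) when unit in [:day, :days], do: 86_400
  defp unit_seconds(unit) when unit in [:week, :weeks], do: 604_800
end

# Both spellings of "two days" yield 172,800 seconds.
RetentionMath.to_seconds({2, :days}) == 60 * 60 * 48
# => true
```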

Providing Overrides

The mode option is indiscriminate when determining which jobs to prune. It pays no attention to which queue they are in, what worker the job is for, or which state they landed in. The DynamicPruner allows you to specify per-queue, per-worker, and per-state overrides that fine-tune pruning.

We'll start with a simple example of limiting the total number of retained jobs in the events queue:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  queue_overrides: [events: {:max_len, 1_000}]
}]

With this configuration most jobs will be retained for seven days, but we'll only keep the latest 1,000 jobs in the events queue. We can extend this further and override all of our queues (and omit the default mode entirely):

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  queue_overrides: [
    default: {:max_age, {6, :hours}},
    analysis: {:max_age, {1, :day}},
    events: {:max_age, {10, :minutes}},
    mailers: {:max_age, {2, :weeks}},
    media: {:max_age, {2, :months}}
  ]
}]

When pruning by queue isn't granular enough you can provide overrides by worker instead:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  worker_overrides: [
    "MyApp.BusyWorker": {:max_age, {1, :day}},
    "MyApp.SecretWorker": {:max_age, {1, :second}},
    "MyApp.HistoricWorker": {:max_age, {1, :month}}
  ]
}]

You can also override by state, which allows you to keep discarded jobs for inspection while quickly purging cancelled or successfully completed jobs:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  state_overrides: [
    cancelled: {:max_age, {1, :hour}},
    completed: {:max_age, {1, :day}},
    discarded: {:max_age, {1, :month}}
  ]
}]

Naturally you can mix and match overrides to finely control job retention:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  queue_overrides: [events: {:max_age, {10, :minutes}}],
  state_overrides: [discarded: {:max_age, {2, :days}}],
  worker_overrides: ["MyApp.SecretWorker": {:max_age, {1, :second}}]
}]

Override Precedence

Overrides are applied sequentially, in this order:

  1. Queue
  2. State
  3. Worker
  4. Default

Using the example above, jobs in the events queue are deleted first, followed by jobs in the discarded state, then the MyApp.SecretWorker worker, and finally any other jobs older than 7 days that weren't covered by any overrides.

Using per State Timestamps

Pruning checks the scheduled_at timestamp for optimized, index-backed queries by default. In situations where the scheduled_at timestamp isn't accurate enough to identify prunable jobs, e.g. cancelling large swaths of jobs scheduled far into the future, you can use the by_state_timestamp option for increased accuracy.

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  by_state_timestamp: true,
  mode: {:max_age, {7, :days}}
}]

Preventing Timeouts with Overrides

Worker override queries aren't able to use any of Oban's standard indexes. If you're processing a high volume of jobs, pruning with worker overrides may be extremely slow due to sequential scans. To prevent timeouts, and speed up pruning altogether, you should add a compound index to the oban_jobs table:

create_if_not_exists index(:oban_jobs, [:worker, :state, :id], concurrently: true)
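For context, here is a minimal sketch of a complete migration around that index. The module name is hypothetical. Because PostgreSQL can't build an index concurrently inside a transaction, Ecto requires the DDL transaction and the migration lock to be disabled for this migration.

```elixir
defmodule MyApp.Repo.Migrations.AddObanJobsWorkerStateIdIndex do
  # Hypothetical module name; follow your project's migration naming.
  use Ecto.Migration

  # Concurrent index builds can't run inside a transaction, so run this
  # migration without the DDL transaction and without the migration lock.
  @disable_ddl_transaction true
  @disable_migration_lock true

  def change do
    create_if_not_exists index(:oban_jobs, [:worker, :state, :id], concurrently: true)
  end
end
```

Keeping the index in its own migration avoids disabling the transaction for unrelated schema changes.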

Retaining Jobs Forever

To retain jobs in a queue, state, or for a particular worker forever (without using something like {:max_age, {999, :years}}), use :infinity as the length or duration:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  state_overrides: [
    cancelled: {:max_len, :infinity},
    discarded: {:max_age, :infinity}
  ]
}]

Keeping Up With Inserts

With the default settings the DynamicPruner will only delete 10,000 jobs each time it prunes. The limit exists to prevent connection timeouts and excessive table locks. A busy system can easily insert more than 10,000 jobs per minute during standard operation. If you find that jobs are accumulating despite active pruning you can override the limit.

Here we set the delete limit to 25,000 and give it 60 seconds to complete:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_len, 100_000},
  limit: 25_000,
  timeout: :timer.seconds(60)
}]

Deleting in PostgreSQL is very fast, and the 10k default is rather conservative. Feel free to increase the limit to a number that your system can handle.

Setting a Schedule

By default, pruning happens at the top of every minute based on the CRON schedule * * * * *. You're free to set any CRON schedule you prefer for greater control over when to prune. For example, to prune once an hour instead:

plugins: [{Oban.Pro.Plugins.DynamicPruner, schedule: "0 * * * *"}]

Or, to prune once a day at midnight in your local timezone:

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  limit: 100_000,
  schedule: "0 0 * * *",
  timezone: "America/Chicago",
  timeout: :timer.minutes(1)
}]

Pruning less frequently can reduce load on your system, particularly if you're using multiple overrides. However, be sure to set a higher limit and timeout (as shown above) to compensate for more accumulated jobs.
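As a rough way to size the limit for a given schedule, the sketch below multiplies the rate at which jobs become prunable by the minutes between runs. PruneBudget is a hypothetical helper, not part of Oban Pro, and it assumes a steady rate and that the limit applies to each pruning run as a whole.

```elixir
defmodule PruneBudget do
  @moduledoc "Hypothetical helper for illustration; not part of Oban Pro."

  # Jobs that become prunable between runs, given a steady rate per minute
  # and the number of minutes between scheduled prunes.
  def backlog_per_run(prunable_per_minute, minutes_between_runs) do
    prunable_per_minute * minutes_between_runs
  end

  # A limit keeps up only when each run can delete at least that backlog.
  def keeps_up?(limit, prunable_per_minute, minutes_between_runs) do
    limit >= backlog_per_run(prunable_per_minute, minutes_between_runs)
  end
end

# 2,000 prunable jobs per minute with an hourly schedule:
PruneBudget.backlog_per_run(2_000, 60)
# => 120000

# A limit of 100,000 falls behind at that rate.
PruneBudget.keeps_up?(100_000, 2_000, 60)
# => false
```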

Executing a Callback Before Delete

Sometimes jobs are a historic record of activity and it's desirable to operate on them before they're deleted. For example, you may want to copy some jobs into cold storage prior to deletion.

To accomplish this, specify a callback to execute before proceeding with the deletion:

defmodule DeleteHandler do
  def call(job_ids) do
    # Use the ids at this point, from within a transaction
  end
end

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  before_delete: {DeleteHandler, :call, []}
}]

The callback receives a list of the ids for the jobs that are about to be deleted. It runs within the same transaction that's used for deletion, so you should keep it quick or move heavy processing to an async process. Note that because it runs in the same transaction as deletion, the jobs won't be available after the callback exits.
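One way to keep the callback quick is to load the rows while they still exist and hand them to a supervised task. This is only a sketch: MyApp.Repo, MyApp.TaskSupervisor, and MyApp.ColdStorage.archive/1 are hypothetical names, and it assumes a Task.Supervisor is already running under that name.

```elixir
defmodule AsyncDeleteHandler do
  import Ecto.Query

  # MyApp.Repo, MyApp.TaskSupervisor, and MyApp.ColdStorage are hypothetical;
  # substitute your own repo, Task.Supervisor, and storage module.
  def call(job_ids) do
    # Load the rows now, because they're gone once the callback exits and
    # the deletion completes.
    jobs = MyApp.Repo.all(where(Oban.Job, [j], j.id in ^job_ids))

    # Pass the loaded structs (not just the ids) to a supervised task so the
    # slow archival work happens outside the pruning transaction.
    Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
      MyApp.ColdStorage.archive(jobs)
    end)
  end
end
```

The trade-off is that the task runs independently of the transaction, so archival may happen even if the deletion is later rolled back. An idempotent archive step keeps that harmless.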

To pass in extra arguments as "configuration" you can provide args to the callback MFA:

defmodule DeleteHandler do
  import Ecto.Query

  def call(job_ids, storage_name) do
    jobs = MyApp.Repo.all(where(Oban.Job, [j], j.id in ^job_ids))

    Storage.call(storage_name, jobs)
  end
end

before_delete: {DeleteHandler, :call, [ColdStorage]}

Implementation Notes

Some additional notes about pruning in general and nuances of the DynamicPruner plugin:

  • Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.

  • Pruning is only applied to jobs that are completed, cancelled, or discarded (that is, they have reached the maximum number of retries or have been manually killed). It'll never delete a new, scheduled, or retryable job.

  • Only a single node will prune at any given time, which prevents potential deadlocks between transactions.

Instrumenting with Telemetry

The DynamicPruner plugin adds the following metadata to the [:oban, :plugin, :stop] event:

  • :pruned_jobs - the jobs that were deleted from the database

Note: jobs only include id, queue, and state fields.
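A handler for that event might look like the sketch below. MyApp.PrunerLogger and the handler id are hypothetical, and the pattern match assumes the event metadata carries a :plugin key naming the emitting plugin, as Oban's plugin events conventionally do.

```elixir
defmodule MyApp.PrunerLogger do
  # Hypothetical module for illustration.
  require Logger

  # Attach once, e.g. at application start. The handler id is any unique term.
  def attach do
    :telemetry.attach(
      "my-app-pruner-logger",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Assumption: metadata includes :plugin, which lets us ignore other plugins.
  def handle_event(_event, _measure, %{plugin: Oban.Pro.Plugins.DynamicPruner} = meta, _config) do
    pruned = Map.get(meta, :pruned_jobs, [])
    Logger.info("DynamicPruner deleted #{length(pruned)} jobs")
  end

  # Stop events from any other plugin are ignored.
  def handle_event(_event, _measure, _meta, _config), do: :ok
end
```

Calling MyApp.PrunerLogger.attach() before Oban starts ensures no pruning runs are missed.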

Types

@type max_age() :: pos_integer() | {pos_integer(), time_unit()}
@type max_len() :: pos_integer()
@type mode() :: {:max_age, max_age()} | {:max_len, max_len()}
@type option() ::
  {:conf, Oban.Config.t()} | {:schedule, String.t()} | {:name, Oban.name()}
@type override() :: state_override() | queue_override() | worker_override()
@type queue_override() :: {atom(), mode()}
@type state_override() :: {:completed | :cancelled | :discarded, mode()}
@type time_unit() ::
  :second
  | :seconds
  | :minute
  | :minutes
  | :hour
  | :hours
  | :day
  | :days
  | :week
  | :weeks
  | :month
  | :months
@type worker_override() :: {module(), mode()}