Oban.Pro.Engines.Smart (Oban Pro v1.2.0)

The Smart engine provides advanced features such as truly global concurrency, global rate limiting, queue partitioning, unique bulk inserts, and auto insert batching. Additionally, its enhanced observability provides the foundation for accurate pruning with the Oban.Pro.Plugins.DynamicPruner and persistence for Oban.Pro.Plugins.DynamicQueues.

Installation

See the Smart Engine section in the adoption guide to get started. The Producer Migrations section contains additional details for more complex setups with multiple instances or prefixes.

Unique/Batched Insert All

Where the Basic engine requires you to insert unique jobs individually, the Smart engine adds unique job support to Oban.insert_all/2. No additional configuration is necessary—simply use insert_all instead for unique jobs.

Oban.insert_all(lots_of_unique_jobs)

Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits. PostgreSQL's binary protocol allows at most 65,535 parameters in a single call, which caps the number of rows that can be inserted at one time.

list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()
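The arithmetic behind that parameter limit can be sketched in plain Elixir. This is only an illustration of why batching is needed, not the engine's implementation: the `BatchSketch` module and the figure of 15 bound parameters per row are assumptions made for the example.

```elixir
defmodule BatchSketch do
  # Maximum bind parameters PostgreSQL accepts in a single call.
  @max_params 65_535

  # Largest number of rows that fit in one INSERT when each row binds
  # `columns_per_row` parameters. The column count is supplied by the
  # caller; it is an assumption, not a value read from Oban.
  def max_rows(columns_per_row) when columns_per_row > 0 do
    div(@max_params, columns_per_row)
  end

  # Split a list into batches of at most `batch_size` items.
  def batches(items, batch_size) when batch_size > 0 do
    Enum.chunk_every(items, batch_size)
  end
end

# With a hypothetical 15 parameters per row, one call holds 4369 rows.
BatchSketch.max_rows(15)

# 2,500 items in batches of 1,000 yield batches of 1000, 1000, and 500.
1..2_500 |> Enum.to_list() |> BatchSketch.batches(1_000) |> Enum.map(&length/1)
```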

The default batch size for unique jobs is 250, and 1_000 for non-unique jobs. Regardless, you can override with batch_size:

Oban.insert_all(lots_of_jobs, batch_size: 1500)

It's also possible to set a custom timeout for batch inserts:

Oban.insert_all(lots_of_jobs, timeout: :infinity)

A single batch of jobs is inserted without a transaction. When there are multiple batches, they are all inserted within one transaction, unless there are 10k or more total unique jobs to insert. Past that threshold, each batch is committed in a separate transaction to prevent memory errors. It's possible to control the transaction threshold with xact_limit if you happen to have a tuned database. For example, to set the limit at 20k jobs:

Oban.insert_all(lots_of_jobs, xact_limit: 20_000)

Snooze Attempts

Unlike the Basic engine, which increments both attempts and max_attempts on snooze, the Smart engine rolls back the attempt. This approach preserves the original max_attempts and records a snoozed count in meta. As a result, it's simple to differentiate between "real" attempts and snoozes, and backoff calculation remains accurate regardless of snoozing.

The following process/1 function demonstrates checking a job's meta for a snoozed count:

def process(job) do
  case job.meta do
    %{"snoozed" => snoozed} ->
      IO.inspect(snoozed, label: "The number of previous snoozes")

    _ ->
      # This job has never snoozed before
      IO.puts("No previous snoozes")
  end

  :ok
end
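Because the snoozed count lives in meta, it can also drive a decision about whether to snooze again. The following is a minimal sketch of that idea as a pure function. The `SnoozePolicy` module, the cap of 5, and the 30 second delay are all hypothetical; only the `"snoozed"` meta key comes from the description above.

```elixir
defmodule SnoozePolicy do
  # Hypothetical cap on snoozes; this is not an Oban setting.
  @max_snoozes 5

  # Takes a job's meta map and returns a value that a process/1
  # callback could return: snooze again, or cancel once the cap is hit.
  def decide(meta, snooze_seconds \\ 30) do
    # The key is absent until the job has snoozed at least once.
    snoozed = Map.get(meta, "snoozed", 0)

    if snoozed < @max_snoozes do
      {:snooze, snooze_seconds}
    else
      {:cancel, "gave up after #{snoozed} snoozes"}
    end
  end
end
```

Inside a worker this could be called as `SnoozePolicy.decide(job.meta)` when the job isn't ready to run yet.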

Global Concurrency

Global concurrency limits the number of concurrent jobs that run across all nodes.

Typically the global concurrency limit is local_limit * num_nodes. For example, with three nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit is present, and the local_limit is omitted, then the local_limit falls back to the global_limit.

The only way to guarantee that all connected nodes will run exactly one job concurrently is to set global_limit: 1.

Here are some examples:

# Execute 10 jobs concurrently across all nodes, with up to 10 on a single node
my_queue: [global_limit: 10]

# Execute 10 jobs concurrently, but only 3 jobs on a single node
my_queue: [local_limit: 3, global_limit: 10]

# Execute at most 1 job concurrently
my_queue: [global_limit: 1]
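For context, queue options like these live under the queues key of the Oban configuration. The fragment below is a sketch of how that might look; the `:my_app` application name and `MyApp.Repo` module are placeholders for your own.

```elixir
import Config

# :my_app and MyApp.Repo are placeholder names for this sketch.
config :my_app, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: MyApp.Repo,
  queues: [
    # A plain integer is a local limit: 5 concurrent jobs per node
    default: 5,
    # At most 10 jobs across all nodes, and at most 3 on any one node
    my_queue: [local_limit: 3, global_limit: 10]
  ]
```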

Rate Limiting

Rate limiting controls the number of jobs that execute within a period of time.

Rate limiting uses counts for the same queue from all other nodes in the cluster (with or without Distributed Erlang). The limiter uses a sliding window over the configured period to accurately approximate a limit.

Every job execution counts toward the rate limit, regardless of whether the job completes, errors, snoozes, etc.

Without a modifier, the rate_limit period is defined in seconds. However, you can provide a :second, :minute, :hour or :day modifier to use more intuitive values.

  • period: 30 — 30 seconds
  • period: {1, :minute} — 60 seconds
  • period: {2, :minutes} — 120 seconds
  • period: {1, :hour} — 3,600 seconds
  • period: {1, :day} — 86,400 seconds

Here are a few examples:

# Execute at most 1 job per second, the lowest possible limit
my_queue: [rate_limit: [allowed: 1, period: 1]]

# Execute at most 10 jobs per 30 seconds
my_queue: [rate_limit: [allowed: 10, period: 30]]

# Execute at most 10 jobs per minute
my_queue: [rate_limit: [allowed: 10, period: {1, :minute}]]

# Execute at most 1000 jobs per hour
my_queue: [rate_limit: [allowed: 1000, period: {1, :hour}]]
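The relationship between period values and seconds can be expressed as a small conversion function. This is a sketch for illustration only; the `PeriodSketch` module is hypothetical and doesn't reflect how the engine normalizes periods internally.

```elixir
defmodule PeriodSketch do
  # Seconds per unit, covering the singular and plural unit names.
  @unit_seconds %{
    second: 1,
    seconds: 1,
    minute: 60,
    minutes: 60,
    hour: 3_600,
    hours: 3_600,
    day: 86_400,
    days: 86_400
  }

  # A bare positive integer is already a number of seconds.
  def to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  # A {count, unit} tuple is scaled by the unit's length in seconds.
  # Raises KeyError for an unknown unit.
  def to_seconds({count, unit}) when is_integer(count) and count > 0 do
    count * Map.fetch!(@unit_seconds, unit)
  end
end
```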

Understanding Concurrency Limits

The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a local_limit of 10 and a global_limit of 20, a single node will run 10 jobs concurrently. If that same queue had a rate_limit that allowed 5 jobs within a period, then a single node is limited to 5 jobs.
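As a rough model of that rule, the effective limit is the minimum of whichever limits are configured. The `LimitSketch` module below is a hypothetical simplification: it treats the rate limit's allowed count as just another ceiling and ignores how the period window moves over time.

```elixir
defmodule LimitSketch do
  # Returns the smallest configured limit, skipping any that are unset.
  # Expects at least one non-nil limit in the keyword list.
  def effective(limits) do
    limits
    |> Keyword.values()
    |> Enum.reject(&is_nil/1)
    |> Enum.min()
  end
end

# The local limit of 10 is lower than the global limit of 20.
LimitSketch.effective(local_limit: 10, global_limit: 20)

# Adding a rate limit that allows 5 jobs lowers the ceiling to 5.
LimitSketch.effective(local_limit: 10, global_limit: 20, rate_allowed: 5)
```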

Queue Partitioning

In addition to global and rate limits at the queue level, you can partition a queue so that it's treated as multiple queues where concurrency or rate limits apply separately to each partition.

Partitions are specified with fields and keys, where keys is optional but highly recommended if you've included :args. Aside from making partitions easier to reason about, partitioning by keys minimizes the amount of data a queue needs to track and simplifies job-fetching queries.

Configuring Partitions

The partition syntax is identical for global and rate limits (note that you can partition by global or rate, but not both.)

Here are a few examples of viable partitioning schemes:

# Partition by worker alone
partition: :worker

# Partition by the `id` and `account_id` from args, ignoring the worker
partition: [args: [:id, :account_id]]

# Partition by worker and the `account_id` key from args
partition: [:worker, args: :account_id]

Remember, take care to minimize partition cardinality by using only a few keys whenever possible. Partitioning based on every permutation of your args makes concurrency or rate limits hard to reason about and can negatively impact queue performance.
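To make the cardinality point concrete, here is a sketch of how a partition identity might be derived from a job's worker and a subset of its args. The `PartitionSketch` module is hypothetical and the jobs are plain maps; the engine's real partitioning logic may differ.

```elixir
defmodule PartitionSketch do
  # Builds a partition identity from the chosen fields. When :args is a
  # field, only the listed keys are kept, so unrelated args don't create
  # extra partitions. Args use string keys, as they do once stored as JSON.
  def key(job, fields, keys \\ []) do
    Enum.map(fields, fn
      :worker -> job.worker
      :args -> Map.take(job.args, Enum.map(keys, &to_string/1))
    end)
  end
end

job_a = %{worker: "MyApp.Mailer", args: %{"account_id" => 1, "id" => 5}}
job_b = %{worker: "MyApp.Mailer", args: %{"account_id" => 1, "id" => 6}}

# Both jobs fall in the same partition because only account_id is considered.
PartitionSketch.key(job_a, [:worker, :args], [:account_id]) ==
  PartitionSketch.key(job_b, [:worker, :args], [:account_id])
```

Partitioning on every key in args instead would put `job_a` and `job_b` in separate partitions, one per distinct `id`.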

Global Partitioning

Global partitioning changes global concurrency behavior. Rather than applying a fixed number for the queue, it applies to every partition within the queue.

Consider the following example:

local_limit: 10, global_limit: [allowed: 1, partition: :worker]

The queue is configured to run one job per worker across every node, but only 10 concurrently on a single node. That is in contrast to the standard behavior of global_limit, which would override the local_limit and only allow 1 concurrent job across every node.

Alternatively, you could partition by a single key:

local_limit: 10, global_limit: [allowed: 1, partition: [args: :tenant_id]]

That configures the queue to run one job concurrently across the entire cluster per tenant_id.
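The effect of a per-partition limit can be modeled with a small selection function. This sketch assumes jobs are plain maps partitioned by a `"tenant_id"` arg and that cluster-wide running counts are already known. The `GlobalPartitionSketch` module is hypothetical and says nothing about how the engine tracks running jobs.

```elixir
defmodule GlobalPartitionSketch do
  # Picks the candidate jobs that may start, given a map of
  # tenant_id => number of jobs already running cluster-wide and the
  # `allowed` count per partition.
  def select(candidates, running, allowed) do
    {picked, _counts} =
      Enum.reduce(candidates, {[], running}, fn job, {picked, counts} ->
        key = job.args["tenant_id"]

        if Map.get(counts, key, 0) < allowed do
          # Count the job against its partition so later candidates see it.
          {[job | picked], Map.update(counts, key, 1, &(&1 + 1))}
        else
          {picked, counts}
        end
      end)

    Enum.reverse(picked)
  end
end

candidates = [
  %{id: 1, args: %{"tenant_id" => 1}},
  %{id: 2, args: %{"tenant_id" => 1}},
  %{id: 3, args: %{"tenant_id" => 2}}
]

# With allowed: 1 and nothing running, jobs 1 and 3 start; job 2 waits
# because tenant 1 already has a job selected.
GlobalPartitionSketch.select(candidates, %{}, 1) |> Enum.map(& &1.id)
```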

Rate Limit Partitioning

Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.

For example, to allow one job per-worker, every five seconds, across every instance of the alpha queue in your cluster:

local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: :worker]

Producer Migrations

For multiple Oban instances you'll need to configure each one to use the Smart engine, otherwise they'll default to the Basic engine.

If you use prefixes, or have multiple instances with different prefixes, you can specify the prefix and create multiple tables in one migration:

use Ecto.Migration

def change do
  Oban.Pro.Migrations.Producers.change()
  Oban.Pro.Migrations.Producers.change(prefix: "special")
  Oban.Pro.Migrations.Producers.change(prefix: "private")
end

The Producers migration also exposes up/0 and down/0 functions if change/0 doesn't fit your use case.
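A migration built on those two functions might look like the following sketch. The module name `MyApp.Repo.Migrations.AddObanProducers` is a placeholder, and the example assumes the default prefix.

```elixir
# The module name is a placeholder; use your application's naming.
defmodule MyApp.Repo.Migrations.AddObanProducers do
  use Ecto.Migration

  # Apply the producers migration when migrating forward.
  def up, do: Oban.Pro.Migrations.Producers.up()

  # Reverse it on rollback.
  def down, do: Oban.Pro.Migrations.Producers.down()
end
```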

Types

global_limit()

@type global_limit() ::
  pos_integer() | [allowed: pos_integer(), partition: partition()]

local_limit()

@type local_limit() :: pos_integer()

partition()

@type partition() ::
  :worker
  | {:args, atom()}
  | [:worker | {:args, atom()}]
  | [fields: [:worker | :args], keys: [atom()]]

period()

@type period() :: pos_integer() | {pos_integer(), unit()}

rate_limit()

@type rate_limit() :: [
  allowed: pos_integer(),
  period: period(),
  partition: partition()
]

unit()

@type unit() ::
  :second | :seconds | :minute | :minutes | :hour | :hours | :day | :days