Oban.Pro.Engines.Smart (Oban Pro v1.5.0-rc.1)

The Smart engine provides advanced concurrency features, enhanced observability, lighter weight bulk processing, and provides a foundation for accurate job lifecycle management. As an Oban.Engine, it is responsible for all non-plugin database interaction, from inserting through executing jobs.

Major features include:

  • Global Concurrency — limit the number of concurrent jobs that run across all nodes
  • Rate Limiting — limit the number of jobs that execute within a window of time
  • Queue Partitioning — segment a queue so concurrency or rate limits apply separately to each partition
  • Async Tracking — bundle job acks (completed, cancelled, etc.) to minimize transactions and reduce load on the database
  • Enhanced Unique — selectively use a custom index to accelerate inserting unique jobs safely between processes and nodes
  • Bulk Inserts — automatically batch inserts to prevent hitting database limits, reduce data sent over the wire, and respect unique options when using Oban.insert_all/2
  • Accurate Snooze — differentiate between attempts with errors and intentional snoozes

Installation

See the Smart Engine section in the adoption guide to get started.

Global Concurrency

Global concurrency limits the number of concurrent jobs that run across all nodes.

Typically the global concurrency limit is local_limit * num_nodes. For example, with three nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit is present, and the local_limit is omitted, then the local_limit falls back to the global_limit.

The only way to guarantee that all connected nodes will run exactly one job concurrently is to set global_limit: 1.

Here are some examples:

# Execute 10 jobs concurrently across all nodes, with up to 10 on a single node
my_queue: [global_limit: 10]

# Execute 10 jobs concurrently, but only 3 jobs on a single node
my_queue: [local_limit: 3, global_limit: 10]

# Execute at most 1 job concurrently
my_queue: [global_limit: 1]

Rate Limiting

Rate limiting controls the number of jobs that execute within a period of time.

Rate limiting uses counts for the same queue from all other nodes in the cluster (with or without Distributed Erlang). The limiter uses a sliding window over the configured period to accurately approximate a limit.

Every job execution counts toward the rate limit, regardless of whether the job completes, errors, snoozes, etc.

Without a modifier, the rate_limit period is defined in seconds. However, you can provide a :second, :minute, :hour or :day modifier to use more intuitive values.

  • period: 30 — 30 seconds
  • period: {1, :minute} — 60 seconds
  • period: {2, :minutes} — 120 seconds
  • period: {1, :hour} — 3,600 seconds
  • period: {1, :day} —86,400 seconds

Here are a few examples:

# Execute at most 1 job per second, the lowest possible limit
my_queue: [rate_limit: [allowed: 1, period: 1]]

# Execute at most 10 jobs per 30 seconds
my_queue: [rate_limit: [allowed: 10, period: 30]]

# Execute at most 10 jobs per minute
my_queue: [rate_limit: [allowed: 10, period: {1, :minute}]]

# Execute at most 1000 jobs per hour
my_queue: [rate_limit: [allowed: 1000, period: {1, :hour}]]

Understanding Concurrency Limits

The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a local_limit of 10 and a global_limit of 20, a single node will run 10 jobs concurrently. If that same queue had a rate_limit that allowed 5 jobs within a period, then a single node is limited to 5 jobs.

Queue Partitioning

In addition to global and rate limits at the queue level, you can partition a queue so that it's treated as multiple queues where concurrency or rate limits apply separately to each partition.

Partitions are specified with fields and keys, where keys is optional but highly recommended if you've included :args. Aside from making partitions easier to reason about, partitioning by keys minimizes the amount of data a queue needs to track and simplifies job-fetching queries.

Configuring Partitions

The partition syntax is identical for global and rate limits (note that you can partition by global or rate, but not both.)

Here are a few examples of viable partitioning schemes:

# Partition by worker alone
partition: :worker

# Partition by the `id` and `account_id` from args, ignoring the worker
partition: [args: [:id, :account_id]]

# Partition by worker and the `account_id` key from args
partition: [:worker, args: :account_id]

Remember, take care to minimize partition cardinality by using a few keys whenever possible. Partitioning based on every permutation of your args makes concurrency or rate limits hard to reason about and can negatively impact queue performance.

Global Partitioning

Global partitioning changes global concurency behavior. Rather than applying a fixed number for the queue, it applies to every partition within the queue.

Consider the following example:

local_limit: 10, global_limit: [allowed: 1, partition: :worker]

The queue is configured to run one job per-worker across every node, but only 10 concurrently on a single node. That is in contrast to the standard behaviour of global_limit, which would override the local_limit and only allow 1 concurrent job across every node.

Alternatively, you could partition by a single key:

local_limit: 10, global_limit: [allowed: 1, partition: [args: :tenant_id]]

That configures the queue to run one job concurrently across the entire cluster per tenant_id.

Rate Limit Partitioning

Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.

For example, to allow one job per-worker, every five seconds, across every instance of the alpha queue in your cluster:

local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: :worker]

Async Tracking

The results of job execution, e.g. completed, cancelled, etc., are bundled together into a single transaction to minimize load on an app's Ecto pool and the database.

Bundling updates and reporting them asynchronously dramatically reduces the number of transactions per second. However, async bundling introduces a slight lag (up to 5ms) between job execution finishing and recording the outcome in the database.

Async tracking can be disabled for specific queues with the ack_async option:

queues: [
  standard: 30,
  critical: [ack_async: false, local_limit: 10]
]

Enhanced Unique

The Smart engine supports an addtional opt-in unique mode designed for speed, correctness, scalability, and simplicity. The enhanced simple mode supports slightly fewer options while boosting insert performance 1x-3x, reducing database load, improving memory usage, and staying correct across multiple processes/nodes.

Here are the three possible modes:

  • :legacy - allows using numeric period and all the OSS options. Inserting jobs is slower, less efficient, and requires much more memory. This is the default for compatibility with OSS.

  • :hybrid — use the unique-backed simple mode for period: :infinity or unique: true, otherwise fall back to the unsafe legacy version.

  • :simple - uniqueness is backed by a partial unique index and insertion is entirely safe between processes and nodes. This mode only allows period: :infinity and using other periods or the timestamp option will raise an error at compile-time.

Partitioned Tables

Only the legacy mode is compatible wiht partitioned tables. The unique index used by hybrid and simple modes is incompatible with partitioned tables and won't be created.

Configuration

In order for Oban.Pro.Worker to accurately validate options during compilation, the unique mode must be set at compile-time.

Explicitly set the default legacy mode:

config :oban_pro, unique: :legacy

Opt in to using the simple mode for period: :infinity, while sticking with legacy mode for jobs with a non-infinite periods:

config :oban_pro, unique: :hybrid

Swtich to simple mode exclusively and prevent using non-infinite periods:

config :oban_pro, unique: :simple

Bulk Inserts

Where the Basic engine requires you to insert unique jobs individually, the Smart engine adds unique job support to Oban.insert_all/2. No additional configuration is necessary—simply use insert_all instead for unique jobs.

Oban.insert_all(lots_of_unique_jobs)

Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits (PostgreSQL's binary protocol has a limit of 65,535 parameters that may be sent in a single call. That presents an upper limit on the number of rows that may be inserted at one time.)

list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()

The default batch size for unique jobs is 250, and 1_000 for non-unique jobs. Regardless, you can override with batch_size:

Oban.insert_all(lots_of_jobs, batch_size: 1500)

It's also possible to set a custom timeout for batch inserts:

Oban.insert_all(lots_of_jobs, timeout: :infinity)

Accurate Snooze

Unlike the Basic engine which increments attempts and max_attempts, the Smart engine rolls back the attempt on snooze. This approach preserves the original max_attempts and records a snoozed count in meta. As a result, it's simple to differentiate between "real" attempts and snoozes, and backoff calculation remains accurate regardless of snoozing.

The following process/1 function demonstrates checking a job's meta for a snoozed count:

def process(job) do
  case job.meta do
    %{"orig_scheduled_at" => unix_microseconds, "snoozed" => snoozed} ->
      IO.inspect({snoozed, unix_microseconds}, label: "Times snoozed since")

    _ ->
      # This job has never snoozed before
  end
end

Summary

Types

Link to this type

global_limit()

@type global_limit() ::
  pos_integer() | [allowed: pos_integer(), partition: partition()]
Link to this type

local_limit()

@type local_limit() :: pos_integer()
@type partition() ::
  :worker
  | {:args, atom()}
  | [:worker | {:args, atom()}]
  | [fields: [:worker | :args], keys: [atom()]]
@type period() :: pos_integer() | {pos_integer(), unit()}
Link to this type

rate_limit()

@type rate_limit() :: [
  allowed: pos_integer(),
  period: period(),
  partition: partition()
]
@type unit() ::
  :second | :seconds | :minute | :minutes | :hour | :hours | :day | :days