Oban.Pro.RateLimit (Oban Pro v1.7.0-rc.0)

Programmatic API for inspecting and manipulating rate limit state.

Rate limits are normally managed automatically by the Smart engine as jobs execute. This module provides functions to interact with rate limits outside of normal job execution: checking available capacity, manually consuming tokens, and resetting state.

All functions in this module require the Smart engine and a queue configured with rate_limit options.

Checking Available Capacity

Use available/2 to check how much capacity remains before performing operations:

case Oban.Pro.RateLimit.available(:some_api) do
  {:ok, capacity} when capacity >= 10 ->
    make_api_calls(10)

  {:ok, capacity} when capacity > 0 ->
    make_api_calls(capacity)

  {:ok, 0} ->
    {:snooze, 60}
end
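The case above matches only {:ok, capacity} results, while available/2 can also return {:error, :no_rate_limit} or {:error, :queue_not_found}. As a minimal sketch, the hypothetical BatchSize module below (not part of Oban Pro) turns any available/2 result into a batch size decision. It is a pure function, so it can be exercised without a running queue:

```elixir
defmodule BatchSize do
  # Hypothetical helper, not part of Oban Pro. It takes the tuple returned by
  # Oban.Pro.RateLimit.available/2 and the number of calls the caller wants.

  # No capacity left: the caller should back off.
  def pick({:ok, 0}, _wanted), do: :none

  # Enough capacity for the whole batch.
  def pick({:ok, capacity}, wanted) when capacity >= wanted, do: {:ok, wanted}

  # Partial capacity: shrink the batch to what is available.
  def pick({:ok, capacity}, _wanted), do: {:ok, capacity}

  # Queue missing or not rate limited: surface the reason unchanged.
  def pick({:error, reason}, _wanted), do: {:error, reason}
end

# Usage, assuming a rate-limited :some_api queue:
#   BatchSize.pick(Oban.Pro.RateLimit.available(:some_api), 10)
```
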

For partitioned rate limits, check a specific partition with a job's computed partition_key:

Oban.Pro.RateLimit.available(:some_api, partition: job.meta["partition_key"])

Manual Consumption

Use consume/3 to record rate-limited operations that happen outside of job execution, including operations that never pass through Oban at all. This keeps the rate limit in step with all usage, not just the work done by jobs:

def handle_webhook(conn, params) do
  :ok = Oban.Pro.RateLimit.consume(:api_calls, 1)
  result = ExternalAPI.process(params)
  json(conn, result)
end

Resetting State

Use reset/2 to clear all rate limit tracking for a queue. This is primarily for recovery after a configuration change, when the existing tracking should be discarded:

:ok = Oban.Pro.RateLimit.reset(:api_calls)

Summary

Functions

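available(queue, opts \\ [])
Check the available rate limit capacity for a queue.

consume(queue, amount, opts \\ [])
Manually consume rate limit capacity for a queue.

reset(queue, opts \\ [])
Reset the rate limit state for a queue.

with_quota(queue, amount, fun, opts \\ [])
Execute a function after atomically reserving rate limit capacity.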

Types

option()

(since 1.7.0)
@type option() :: {:oban, Oban.name()} | {:partition, String.t()}

queue()

(since 1.7.0)
@type queue() :: atom() | String.t()

wait_option()

(since 1.7.0)
@type wait_option() :: {:timeout, timeout()} | {:interval, pos_integer()}

Functions

available(queue, opts \\ [])

(since 1.7.0)
@spec available(queue(), [option()]) ::
  {:ok, non_neg_integer()} | {:error, :no_rate_limit | :queue_not_found}

Check the available rate limit capacity for a queue.

Returns the total available capacity across all producers for the queue, calculated by merging window states from all active producers.

Options

  • :oban - The Oban instance name. Defaults to Oban.
  • :partition - The partition key to check. Defaults to "*" (the global partition).

Examples

Check capacity for a queue:

{:ok, capacity} = Oban.Pro.RateLimit.available(:my_queue)

Check capacity for a specific partition:

{:ok, capacity} = Oban.Pro.RateLimit.available(:my_queue, partition: job.meta["partition_key"])

consume(queue, amount, opts \\ [])

(since 1.7.0)
@spec consume(queue(), pos_integer(), [option()]) ::
  :ok | {:error, :insufficient_capacity | :no_rate_limit | :queue_not_found}

Manually consume rate limit capacity for a queue.

Consumption is applied to the producer with the most available capacity. If the requested amount exceeds a single producer's capacity, consumption is spread across multiple producers.

Options

  • :oban - The Oban instance name. Defaults to Oban.
  • :partition - The partition key to consume from. Defaults to "*" (the global partition).
  • :require_full - When true, returns {:error, :insufficient_capacity} if the full amount can't be consumed. Defaults to false, which consumes as much as available.
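To make the spreading behavior and the :require_full option concrete, here is a small model in plain Elixir. It is an illustration under stated assumptions, not Oban Pro's implementation: SpreadSketch, the producer ids, and the map of remaining capacities are all hypothetical, and the sketch returns the updated map (rather than a bare :ok) only so the effect is visible.

```elixir
defmodule SpreadSketch do
  # Illustrative model only, not Oban Pro's implementation.
  # `producers` maps a hypothetical producer id to its remaining capacity.
  def consume(producers, amount, require_full \\ false) do
    total = producers |> Map.values() |> Enum.sum()

    if require_full and total < amount do
      # Mirrors the documented :require_full behavior: refuse partial consumption.
      {:error, :insufficient_capacity}
    else
      # Take from the producer with the most capacity first, then spill
      # whatever is still needed onto the next producers.
      {updated, _left} =
        producers
        |> Enum.sort_by(fn {_id, capacity} -> capacity end, :desc)
        |> Enum.reduce({producers, amount}, fn {id, capacity}, {acc, left} ->
          take = min(capacity, left)
          {Map.put(acc, id, capacity - take), left - take}
        end)

      {:ok, updated}
    end
  end
end
```

With producers %{a: 3, b: 2} and an amount of 4, the model drains producer a and takes the remaining unit from b.
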

Examples

Consume 5 units from the default partition:

Oban.Pro.RateLimit.consume(:my_queue, 5)

Consume from a specific partition:

Oban.Pro.RateLimit.consume(:my_queue, 3, partition: job.meta["partition_key"])

Use a named Oban instance:

Oban.Pro.RateLimit.consume(:my_queue, 1, oban: MyApp.Oban)

reset(queue, opts \\ [])

(since 1.7.0)
@spec reset(queue(), [option()]) :: :ok | {:error, :no_rate_limit | :queue_not_found}

Reset the rate limit state for a queue.

This clears all window data and resets the window time for all producers on the queue. Tracking is cleared across all partitions for partitioned queues.

Options

  • :oban - The Oban instance name. Defaults to Oban.

Examples

Reset the rate limit for a queue:

:ok = Oban.Pro.RateLimit.reset(:my_queue)

Reset the rate limit for a queue on a named Oban instance:

:ok = Oban.Pro.RateLimit.reset(:my_queue, oban: MyApp.Oban)

with_quota(queue, amount, fun, opts \\ [])

(since 1.7.0)
@spec with_quota(queue(), pos_integer(), (-> result), [option() | wait_option()]) ::
  {:ok, result} | {:error, :timeout | :no_rate_limit | :queue_not_found}
  when result: term()

Execute a function after atomically reserving rate limit capacity.

This function waits for capacity to become available, atomically consumes the requested amount, then executes the provided function. This prevents race conditions where multiple callers might consume the same quota.

Options

  • :oban - The Oban instance name. Defaults to Oban.
  • :partition - The partition key to reserve capacity from. Defaults to "*" (the global partition).
  • :timeout - Maximum time to wait in milliseconds. Defaults to 5_000 (5 seconds).
  • :interval - Polling interval in milliseconds. Defaults to 100.
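The wait, reserve, and run sequence, along with the :timeout and :interval options, can be modeled in plain Elixir. This is a sketch under stated assumptions, not how Oban Pro implements with_quota/4: QuotaSketch is hypothetical, and an Agent holding an integer stands in for the real rate limit state.

```elixir
defmodule QuotaSketch do
  # Illustrative model only: an Agent holds the remaining capacity as an integer.
  def with_quota(agent, amount, fun, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 5_000)
    interval = Keyword.get(opts, :interval, 100)
    deadline = System.monotonic_time(:millisecond) + timeout

    poll(agent, amount, fun, interval, deadline)
  end

  defp poll(agent, amount, fun, interval, deadline) do
    # The check and the decrement happen inside one Agent call, so no other
    # caller can take the same units between the check and the update.
    reserved? =
      Agent.get_and_update(agent, fn
        capacity when capacity >= amount -> {true, capacity - amount}
        capacity -> {false, capacity}
      end)

    cond do
      reserved? ->
        {:ok, fun.()}

      # Give up when another polling interval would pass the deadline.
      System.monotonic_time(:millisecond) + interval > deadline ->
        {:error, :timeout}

      true ->
        Process.sleep(interval)
        poll(agent, amount, fun, interval, deadline)
    end
  end
end
```

Combining the check and the decrement in a single step is what keeps two concurrent callers from reserving the same units; a separate "check, then consume" pair would leave a window between the two calls.
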

Examples

Execute a function after reserving 5 units of capacity:

{:ok, result} = Oban.Pro.RateLimit.with_quota(:my_queue, 5, fn ->
  ExternalAPI.batch_request(items)
end)

Handle timeout when capacity isn't available:

case Oban.Pro.RateLimit.with_quota(:my_queue, 5, &make_api_calls/0, timeout: 10_000) do
  {:ok, result} -> handle_result(result)
  {:error, :timeout} -> handle_timeout()
end
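The case above covers two of the documented results; with_quota/4 can also return {:error, :no_rate_limit} or {:error, :queue_not_found}. When the call is made from inside a worker, one option is to translate every result into a job return value. The QuotaResult module below is a hypothetical sketch, and the 30-second snooze and the decision to cancel on configuration errors are assumptions rather than recommendations from Oban Pro:

```elixir
defmodule QuotaResult do
  # Hypothetical helper, not part of Oban Pro. It maps a with_quota/4 result
  # onto the tuples an Oban worker may return from perform/1.

  # The wrapped function ran: pass its result through.
  def to_job_result({:ok, result}), do: {:ok, result}

  # No capacity within the timeout: try the job again later (assumed delay).
  def to_job_result({:error, :timeout}), do: {:snooze, 30}

  # Configuration problems will not fix themselves on retry, so cancel.
  def to_job_result({:error, reason}) when reason in [:no_rate_limit, :queue_not_found],
    do: {:cancel, reason}
end
```
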

Reserve capacity on a specific partition:

{:ok, result} = Oban.Pro.RateLimit.with_quota(:my_queue, 1, fun, partition: "user_123")