Oban Releases

Pro v0.14.0

This release includes engine and supervisor improvements that require Oban v2.15+.

💸 Horizontal Auto-Scaling with DynamicScaler

The new DynamicScaler plugin monitors queue throughput and horizontally scales cloud infrastructure to optimize processing. Horizontal scaling is applied at the node level, not the queue level, so you can distribute processing over more physical hardware.

With auto-scaling you can spin up additional nodes during high traffic events, and pare down to a single node during a lull. Beyond optimizing throughput, scaling can save money in environments with little to no usage at off-peak times, e.g. staging.

DynamicScaler calculates the optimal scale by predicting the future size of a queue based on recent trends. You provide an acceptable range of node sizes, which queues to track, and a cloud module—auto-scaling takes care of the rest (with observability, of course).

It’s designed to integrate with popular cloud infrastructure like AWS, GCP, K8S, and Fly via a simple, flexible behaviour. For example, here we declare auto-scaling rules for two distinct queues:

plugins: [
  {DynamicScaler,
   scalers: [
     [queue: :reports, range: 0..1, cloud: {MyApp.Cloud, asg: "my-reports-asg"}],
     [queue: :exports, range: 1..4, cloud: {MyApp.Cloud, asg: "my-exports-asg"}]
   ]}
]

There are also options to filter scaling by a particular queue, optimize responsiveness with scaling steps, and tune cooldown periods to prevent unnecessary scaling.

See the DynamicScaler docs for a complete reference and a guide on defining cloud modules.
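
As a rough preview (and only that), a cloud module receives the scaler's conf and returns an updated conf after applying the new scale. The sketch below is based solely on the description in these notes, not the authoritative behaviour definition; the conf fields and MyApp.AWS.set_desired_capacity/2 are invented stand-ins, so consult the cloud module guide for the real callback names, arities, and return values:

defmodule MyApp.Cloud do
  # Sketch only: assumes the conf carries the :asg option from the plugin
  # config plus a :desired node count, and that a successful scale returns
  # {:ok, conf} for use in subsequent scale calls.
  def scale(%{asg: asg, desired: desired} = conf) do
    # Hypothetical wrapper around a cloud provider's autoscaling API.
    case MyApp.AWS.set_desired_capacity(asg, desired) do
      :ok -> {:ok, conf}
      {:error, reason} -> {:error, reason}
    end
  end
end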

🏗️ Args Schema Macro for Structured Workers

Structured args are indispensable for enforcing an args schema. However, the legacy keyword list syntax with field: :type, implicit enums, and an asterisk symbol for required fields was simply awkward. We’re correcting that awkwardness with a new args_schema/1 macro for defining structured workers. The args_schema macro defines a DSL that’s a subset of Ecto’s schema, optimized for JSON compatibility and without the need for dedicated changeset functions.

Here’s a taste that demonstrates multiple field types, the required option, enums, and embedded structures:

use Oban.Pro.Worker

alias __MODULE__, as: Args

args_schema do
  field :id, :id, required: true
  field :mode, :enum, values: ~w(enabled disabled paused)a

  embeds_one :data do
    field :office_id, :uuid, required: true
    field :has_notes, :boolean
    field :addons, {:array, :string}
  end

  embeds_many :addresses do
    field :street, :string
    field :city, :string
    field :country, :string
  end
end

@impl Oban.Pro.Worker
def process(%{args: %Args{id: id, mode: mode} = args}) do
  %Args.Data{office_id: office_id} = args.data
  [%Args.Addresses{street: street} | _] = args.addresses

  ...
end
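
For reference, jobs for a structured worker are still enqueued with plain, JSON-friendly args. Assuming the schema above lives in a module named MyApp.StructuredWorker (the name is illustrative), insertion looks like any other worker:

%{
  id: 123,
  mode: "enabled",
  data: %{office_id: Ecto.UUID.generate(), has_notes: true, addons: ["premium"]},
  addresses: [%{street: "1 Main St", city: "Portland", country: "USA"}]
}
|> MyApp.StructuredWorker.new()
|> Oban.insert()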

The legacy (and legacy-legacy) syntax is still viable and it generates the appropriate field declarations automatically. However, we strongly recommend updating to the new syntax for your own sanity.

See the Structured Jobs docs for specifics.

🍪 Declarative Chunk Partitioning

Previously, chunk workers executed jobs in groups based on size or timeout, with the grouping always consisting of jobs from the same queue, regardless of worker, args, or other job attributes. However, sometimes there’s a need for more flexibility in grouping jobs based on different criteria.

To address this, we have introduced partitioning, which allows grouping chunks by worker and/or a subset of args or meta. This improvement enables you to methodically compose chunks of jobs with the same args or meta, instead of running a separate queue for each chunk.

Here’s an example that demonstrates using GPT to summarize groups of messages from a particular author:

defmodule MyApp.MessageSummarizer do
  use Oban.Pro.Workers.Chunk,
      by: [:worker, args: :author_id],
      size: 100,
      timeout: :timer.minutes(5)

  @impl true
  def process([%{args: %{"author_id" => author_id}} | _] = jobs) do
    messages =
      jobs
      |> Enum.map(& &1.args["message_id"])
      |> MyApp.Messages.all()

    {:ok, summary} = MyApp.GPT.summarize(messages)

    # Push the summary
  end
end
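
For completeness, the jobs themselves are inserted like any others; the message fields below are illustrative, and partitioning only changes how the queue groups them at execution time:

messages
|> Enum.map(fn message ->
  MyApp.MessageSummarizer.new(%{
    "author_id" => message.author_id,
    "message_id" => message.id
  })
end)
|> Oban.insert_all()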

Partitioning lets you group and process related jobs within a single queue, rather than standing up a dedicated queue for every grouping.

Chunk queries have been optimized for tables of any size to compensate for the added complexity of partitioning. As a result, even with hundreds of thousands of available jobs, they run in milliseconds.

See the Chunk Partitioning docs for details.

🗄️ Batch Performance

Some teams run batches containing upwards of 50k jobs and often run multiple batches simultaneously, to the tune of millions of jobs a day. That level of load exposed some opportunities for performance tuning that we're excited to bring to batches of all sizes.

In short, batches are lazier, their queries are faster, and they'll put less load on your database. If query tuning and performance optimization are your thing, read on for the details!

  • Debounce batch callback queries so that only one database call is made for each batch within a short window of time. By default, batching debounces for 100ms, but it’s configurable at the batch level if you’d prefer to debounce for longer.

  • Only query for the exact details needed by a batch's supported callbacks. If a batch only has a cancelled handler, then no other states are checked (see the sketch after this list).

  • Optimize state-checking queries to avoid exact counts and to ignore completed jobs entirely, as that's typically the state with the most jobs.

  • Use index-powered match queries to find existing callback jobs, again, only for the current batch’s supported callbacks.
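
To illustrate the callback-driven checks, here's a hedged sketch of a batch worker that only implements a cancelled handler, so its batch queries never need to inspect completed or exhausted states (MyApp.Uploads and its functions are hypothetical):

defmodule MyApp.BatchUpload do
  use Oban.Pro.Workers.Batch, queue: :uploads

  @impl true
  def process(%Oban.Job{args: args}) do
    # Per-job work; MyApp.Uploads.transfer/1 stands in for real logic.
    MyApp.Uploads.transfer(args)
  end

  # The only batch callback defined here, so cancelled is the only state
  # the batch's debounced queries have to check.
  @impl true
  def handle_cancelled(%Oban.Job{} = _job) do
    MyApp.Uploads.notify_cancelled()

    :ok
  end
end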

Enhancements

  • [DynamicLifeline] Remove transactional wrapper and add a :timeout option for rescue and discard queries.

  • [Chunk] Apply cancellation to all jobs within a chunk

    Only the leader job is registered as “running” by the queue producer. Now, a running chunk listens for cancel messages for any job in the chunk and cancels the leader. Without this, cancelling a non-leader job didn't affect the leader, and the chunk's jobs were left orphaned.

  • [SmartEngine] Bypass transaction on insert for small batches

    Inserting a list of non-unique jobs smaller than the batch size is no longer wrapped in a transaction.

Bug Fixes

  • [Chunk] Monitor chunks with timeouts to prevent orphans

    The queue executor uses :timer.exit_after/2 to kill jobs that exceed a timeout. Then, the queue’s producer catches the :DOWN message and uses that to ack a job error. Producers aren’t aware of the chunks, so we spawn a sentinel process to monitor the parent process and ack the chunk jobs.

  • [SmartEngine] Cast jsonb to text before casting to integer for global tracking

    A recent improvement to global tracking broke a previous compatibility trick used for legacy Postgres support. While we officially support PG 12+ now, there’s no reason to break backward compatibility for a simple cast.

  • [SmartEngine] Use a CTE for fetch_jobs query to prevent overfetching

    The CTE prevents Postgres from optimizing the subquery and ignoring the limit. More details are available in https://github.com/sorentwo/oban/commit/399092159.

Pro v0.14.1

Bug Fixes

  • [Chunk] Apply decrypting and structuring to all jobs in a chunk

    Previously, only the chunk leader was decrypted or structured, which caused a mismatch for structured args and left encrypted args unusable.

  • [DynamicScaler] Safely handle Cloud.scale/1 errors and recycle scaler configuration

    The DynamicScaler crashed when it received an error response from scaling events. Now, it includes an error value in the telemetry event metadata and makes it clear that a successful scale must return a new conf for subsequent scale calls.

  • [Worker] Support options for the embeds_one/3 and embeds_many/3 macros

    An oversight omitted support for the required: true variants of the embed macros.

  • [Worker] Safely cast {:array, :uuid} to array of binary_id

    In args, a UUID is identical to a binary id, but isn’t natively supported by Ecto changesets.
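
    In a structured args schema that means a field like the following (the name is illustrative) now casts each element as a binary_id:

    field :device_ids, {:array, :uuid}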

Pro v0.14.2

Bug Fixes

  • [SmartEngine] Prevent fetch_jobs from ever exceeding max_attempts

    In some situations, a condition to ensure the attempts don’t exceed max attempts is still necessary. By checking the attempts outside the CTE we maintain optimal query performance for the large scan that finds jobs, and only apply the check in the small outer query.

  • [SmartEngine] Use Oban.Repo.default_options for all Multi operations

    The SmartEngine and Chunk worker extracted a subset of opts from the config, but that subset didn’t include telemetry_options. To ensure consistency with plain Repo calls, all Multi operations now use Repo.default_options.

  • [DynamicLifeline] Prevent rescuing non-leader chunk jobs

    Now that chunks record the chunk leader as a third element in attempted_by, the DynamicLifeline could erroneously rescue executing jobs.

  • [DynamicQueues] Safely update dynamic queues via Oban.scale_queue/2

    For queues with a configured global or rate limit option, scale_queue calls that only included the plain limit would crash with an update error.
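
    For example, a call like the following against a dynamic queue that also has a global or rate limit no longer crashes:

    Oban.scale_queue(queue: :exports, limit: 10)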

  • [Worker] Delete oban_meta from process dictionary when recording data

    Any oban_meta data stored in the process dictionary would linger between executions because it wasn’t deleted. That caused problems when using drain_jobs because each job runs in the same process.

  • [Producer] Derive Jason.Encoder for the Oban.Pro.Producer schema

    The :__meta__ field isn't encodable and triggers an error when passed to a JSON-based log.

  • [DynamicQueues] Unwrap the queue_input typespec for insert/2 because it is already a list

Pro v0.14.3

Bug Fixes

  • [SmartEngine] Gracefully recover from erroneous producer deletion.

    During database maintenance or other downtime, it's possible for producers to incorrectly delete other producers' records. Now, we insert a fresh record with the current state when a producer's record is deleted while its process is still running.

  • [DynamicCron] Validate cron opts before changeset or insertion

    Some cron option validation was left until changeset validation, which wouldn’t occur until entries were written into the database on startup. This moves all cron option validation into the config validation stage and clarifies the errors.