This release enhances workflows with sub-workflows and context sharing, overhauls queue partitioning for better performance, improves dynamic plugins, and adds various usability improvements.
See the v1.6 Upgrade Guide for complete upgrade steps and migration caveats.
🗂️ Sub-Workflows
Workflows gain powerful new capabilities for organizing complex job relationships with two major enhancements: add_workflow/4 and add_many/4.

Use add_workflow/4 to nest entire workflows within others to create hierarchical job dependencies. This makes complex workflows more maintainable by grouping related jobs together:
extr_flow =
  Workflow.new(workflow_name: "extract")
  |> Workflow.add(:extract, WorkerA.new(%{source: "database"}))
  |> Workflow.add(:transform, WorkerB.new(%{}), deps: :extract)

# Add sub-workflow as a dependency
Workflow.new()
|> Workflow.add(:setup, WorkerC.new(%{mode: "initialize"}))
|> Workflow.add_workflow(:extract, extr_flow, deps: :setup)
|> Workflow.add(:finalize, WorkerC.new(%{}), deps: :extract)
Workflows can depend on other workflows, and downstream jobs will wait until the sub-workflow completes before executing.
Need to run similar jobs in parallel? Use add_many/4 to add multiple jobs with a single dependency name:
# Add multiple email jobs that can run in parallel
email_jobs = Enum.map(users, &EmailWorker.new(%{user_id: &1.id}))

workflow =
  Workflow.new()
  |> Workflow.add_many(:emails, email_jobs)
  |> Workflow.add(:report, ReportWorker.new(%{}), deps: :emails)
The add_many/4 step creates a sub-workflow from either a list or a map of jobs, and the full recorded results can be extracted with a single call:
def process(job) do
  map_of_results = Workflow.all_recorded(job, with_subs: true)
end
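For the map form, here's a hedged sketch, assuming the map keys serve as the job names inside the generated sub-workflow (the "email_..." naming is purely illustrative):

email_jobs =
  Map.new(users, fn user ->
    {"email_#{user.id}", EmailWorker.new(%{user_id: user.id})}
  end)

workflow =
  Workflow.new()
  |> Workflow.add_many(:emails, email_jobs)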
🌊 Cascading Workflows
The most significant workflow enhancement in v1.6 is the ability to build workflows with cascading functions. Using add_cascade/3, you can compose workflows from regular Elixir functions, with results from each step automatically passed to subsequent steps:
Workflow.new()
|> Workflow.put_context(%{user_id: 123})
|> Workflow.add_cascade(:fetch, &MyApp.fetch_user/1)
|> Workflow.add_cascade(:process, &MyApp.process_user/1, deps: :fetch)
|> Workflow.add_cascade(:notify, &MyApp.send_notification/1, deps: :process)
Each function receives a context map containing the workflow's shared context and any results from its upstream dependencies. For example, the process_user/1 function automatically receives a map with the results of fetch_user/1 under the :fetch key.
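As a rough sketch of that shape (the return value and the MyApp.transform/1 helper are assumptions for illustration, not part of the documented API):

def process_user(%{fetch: user} = _context) do
  # The context includes the shared data set with put_context, plus the
  # recorded result of the upstream :fetch step under the :fetch key.
  {:ok, MyApp.transform(user)}
end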
Additionally, add_cascade/3 supports fan-out operations with {enumerable, function/2} tuples:
items = [1, 2, 3]
Workflow.new()
|> Workflow.add_cascade(:process_items, {items, &process_item/2})
|> Workflow.add_cascade(:finalize, &finalize/1, deps: :process_items)
The process_item/2 function is called for each item in the enumerable, with the first argument receiving the item and the second receiving the context.
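A minimal sketch of such a function (the doubling is only illustrative, and the return shape is an assumption):

def process_item(item, _context) do
  # Called once per element of the enumerable; the context carries the
  # shared data and any upstream results, unused here.
  {:ok, item * 2}
end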
🪴 Grafting Sub-Workflows
Grafting allows sub-workflows to be attached after a workflow has started. With grafting, you define placeholders that are expanded into full sub-workflows later when data becomes available. Downstream jobs that depend on the graft will automatically wait for the sub-workflow, not just the graft job itself.
Grafting solves the tricky problem of inserting multiple jobs into the middle of a workflow. For example, to wait until an unknown number of notifications are sent before summarizing the output:
Workflow.new()
|> Workflow.put_context(%{account_id: account_id})
|> Workflow.add_graft(:users, &expand_users/1)
|> Workflow.add_cascade(:summary, &send_summary/1, deps: :users)
The expand_users/1 function takes the account_id, fetches all of the users for the account, then expands the graft job into a sub-workflow that sends each user a notice:
def expand_users(%{account_id: account_id}) do
  user_ids = MyApp.Account.get_user_ids!(account_id)

  Workflow.apply_graft({user_ids, &send_notice/2})
end
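The send_notice/2 function isn't shown above; here is a hypothetical version following the same fan-out shape as process_item/2 (the mailer call is an assumption):

def send_notice(user_id, %{account_id: account_id}) do
  # One notice per user in the expanded sub-workflow
  MyApp.Mailer.deliver_notice(user_id, account_id)
  :ok
end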
🖼️ Context and Workflow Status
Workflows that rely on common data can now share it between jobs without duplicating arguments, using put_context/3:
workflow =
  Workflow.new()
  |> Workflow.put_context(%{user_id: 123, app_version: "1.2.3"})
  |> Workflow.add(:job_a, WorkerA.new(%{}))
  |> Workflow.add(:job_b, WorkerB.new(%{}))

# Later in a worker:
def process(job) do
  context = Workflow.get_context(job)
  # Use context map...
end
It's now easier to check workflow progress with status/1, which provides execution stats:
%{
  total: 5,
  state: :executing,
  counts: %{completed: 3, executing: 2},
  duration: 15_620
} = Workflow.status(workflow_id)
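A hedged usage sketch, assuming a finished workflow reports a terminal state such as :completed (the exact state names aren't enumerated here):

case Workflow.status(workflow_id) do
  %{state: :completed, duration: duration} ->
    IO.puts("Workflow finished in #{duration}")

  %{counts: counts} ->
    IO.inspect(counts, label: "workflow still running")
end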
🧰 Queue Partitioning Overhaul
Queue partitioning is completely redesigned for dramatic performance improvements. Jobs are now assigned partition keys on insert rather than at runtime, enabling more efficient querying and eliminating head-of-line blocking when one partition has many jobs.
The new design avoids the issues of the previous approach:
- Job processing is completely fair. Jobs from a single partition can't block processing of other partitions after a bulk insert, and no priority or scheduling workarounds are necessary.
- Querying in partitioned queues relies on a single, partial index.
- Partitioning uses a single, optimized query without any unions or dynamic sections, which allows Ecto to prepare and cache one query plan for faster fetching and lower memory usage.
In a benchmark of 10k jobs in each of 20 partitions (200k jobs total), processing took 17s in v1.6, down from 360s in v1.5, roughly 20x faster with far less load on the database.
🧨 Global Burst Mode
Global partitioning gained an advanced feature called "burst mode" that allows you to maximize throughput by temporarily exceeding per-partition global limits when there are available resources.
Each global partition is typically restricted to the configured allowed value. However, with burst mode enabled, the system can intelligently allocate more jobs to each active partition, potentially exceeding the per-partition limit while still respecting the overall queue concurrency.
This is particularly useful when:
- You have many potential partitions but only a few are active at any given time
- You want to maximize throughput while maintaining some level of fairness between partitions
- You need to ensure your queues aren't sitting idle when capacity is available
Here's an example of a queue that will run 5 jobs from a single partition concurrently under load, but can burst up to 100 jobs for a single partition when there is available capacity:
queues: [
  exports: [
    global_limit: [
      allowed: 5,
      burst: true,
      partition: [args: :tenant_id]
    ],
    local_limit: 100
  ]
]
🍋 Preserve Queue Updates
DynamicQueues now preserves queue changes made at runtime across application restarts. This brings two key improvements:
- Runtime changes to queues (via Web or CLI) persist until explicitly changed in configuration
- A new :automatic sync mode that can manage queue deletions based on configuration
# Automatic mode - Deletes queues missing from configuration
config :my_app, Oban,
  plugins: [{DynamicQueues, sync_mode: :automatic, queues: [...]}]
In automatic mode, any queue that exists in the database but isn't defined in the configuration will be automatically deleted during startup. This is useful when you want to ensure your runtime queue configuration exactly matches what's defined in your application config.
Now when you pause a queue through the dashboard or change its limits via API, those changes will persist across application restarts until you explicitly update those options in your configuration.
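For example, runtime changes like these, made here with core Oban functions (the same applies to changes made through the Web dashboard), now survive restarts until the configuration itself changes:

# Pause the queue and raise its concurrency limit at runtime
Oban.pause_queue(queue: :exports)
Oban.scale_queue(queue: :exports, limit: 20)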
🎨 Decorator Enhancements
Decorated jobs gain a few new capabilities. You can now use current_job/0 to access the underlying job struct from within decorated functions, making it easier to work with job context or pass job details to other functions. Additionally, you can mark any decorated job as recorded at runtime with the recorded option, enabling workflow composition and return value access without separate modules.
defmodule Business do
  use Oban.Pro.Decorator

  @job queue: :default, recorded: true
  def process_account(account_id) do
    job = current_job()
    IO.puts("Processing account #{account_id} with job #{job.id}")
    {:ok, %{processed: true}}
  end
end