Oban.Pro.Testing (Oban Pro v1.2.0)
Advanced helpers for testing supervised Oban instances, workers, and making assertions about enqueued jobs.
The Oban.Pro.Testing
module is a drop-in replacement for Oban.Testing
, with additional
functions tailored toward integration testing and Pro modules.
Usage in Tests
The most convenient way to use Oban.Pro.Testing
is to use
the module:
use Oban.Pro.Testing, repo: MyApp.Repo
Other repo-specific configuration options can also be used:
use Oban.Pro.Testing, repo: MyApp.Repo, prefix: "private", log: :debug
If you already have use Oban.Testing
in your tests or test cases, simply replace it with use Oban.Pro.Testing
.
Naming Convention
The testing helpers in this module adhere to the following naming convention:
perform_*
— executes jobs locally, without touching the database, for unit testing.drain_*
— execute jobs inline, for integration testing.run_*
— insert jobs into the database and execute them inline, for integration testing.
Shared Repo Options
The use
macro accepts all of these repo-specific configuration options, and they may be passed
to all database functions (run_
, drain_
, etc.)
:log
— a usable log level orfalse
to disable logging. SeeLogger.level/0
for valid options.:prefix
— an optional database prefix. Defaults topublic
.:repo
— the name of an Ecto repo, which should be running in sandbox mode.
Summary
Types
Batch callback identifiers, correlating to a handle_
callback function.
Functions
Retrieve all currently enqueued jobs matching a set of criteria.
Assert that one or more jobs were enqueued during a function call.
Assert that a job with particular criteria is enqueued.
Synchronously execute jobs in one or all queues, from within the current process.
Construct and execute a job with a batch handle_*
callback.
Construct a list of jobs and process them with a Chunk worker.
Refute that any jobs were enqueued during a function call.
Refute that a job with particular criteria is enqueued.
Insert and execute a complete batch of jobs, along with callbacks, within the test process.
Insert and execute chained jobs within the test process.
Insert and execute chunked jobs within the test process.
Insert and execute jobs synchronously, within the test process.
Insert and execute a workflow synchronously, within the test process.
Start a supervised Oban instance under the test supervisor.
Types
callback()
@type callback() :: :attempted | :completed | :discarded | :exhausted
Batch callback identifiers, correlating to a handle_
callback function.
drain_option()
@type drain_option() :: repo_option() | {:queue, atom()} | {:with_limit, pos_integer()} | {:with_recursion, boolean()} | {:with_safety, boolean()} | {:with_scheduled, boolean()} | {:with_summary, boolean()}
drain_result()
@type drain_result() :: drain_summary() | [Oban.Job.t()]
drain_summary()
@type drain_summary() :: %{ cancelled: non_neg_integer(), completed: non_neg_integer(), discarded: non_neg_integer(), exhausted: non_neg_integer(), retryable: non_neg_integer(), scheduled: non_neg_integer() }
perform_option()
@type perform_option() :: Oban.Job.option() | repo_option()
repo_option()
@type repo_option() :: {:log, false | Logger.level()} | {:prefix, String.t()} | {:repo, module()}
Functions
@spec all_enqueued(keyword()) :: [Oban.Job.t()]
Retrieve all currently enqueued jobs matching a set of criteria.
This is a wrapper around Oban.Testing.all_enqueued/1
, see Oban.Testing
for more details.
Options
See shared options for additional repo-specific options.
Assert that one or more jobs were enqueued during a function call.
Any pre-existing jobs are ignored for the assertion. If the assertion passes then the function's return value is passed back.
Options
See shared options for additional repo-specific options.
Examples
Assert that a MyApp.Worker
job was added to the default
queue:
assert_enqueue([queue: :default, worker: MyApp.Worker], fn ->
MyApp.do_some_business()
end)
Make an assertion about the return value:
result = assert_enqueue([worker: MyApp.Worker], &MyApp.more_business/0)
assert {:ok, _} = result
Assert that a job with particular criteria is enqueued.
This is a wrapper around Oban.Testing.assert_enqueued/2
, see Oban.Testing
for more details.
Options
See shared options for additional repo-specific options.
@spec drain_jobs([drain_option()]) :: drain_result()
Synchronously execute jobs in one or all queues, from within the current process.
Jobs that are enqueued by a process when Ecto
is in sandbox mode are only visible to that
process. Calling drain_jobs/1
allows you to control when the jobs are executed and to wait
synchronously for all jobs to complete.
This function provides several distinct advantages over the standard Oban.drain_queue/2
:
- It can drain jobs across one or all queues simultaneously
- It can return the drained jobs rather than a count summary
- It optimizes the defaults for testing batches, workflows, etc.
- It always uses the
Smart
engine to guarantee that Pro worker features work as expected
Options
:queue
- an atom specifying the queue to drain, or:all
to drain jobs across all queues at once. Defaults to:all
when no queue is provided.:with_limit
— the maximum number of jobs to fetch for draining at once. The limit only impacts how many jobs are fetched at once, not concurrency. When recursion is enabled this is how many jobs are processed per-iteration, and it defaults to1
. Otherwise, there isn't a limit and all available jobs are fetched.:with_recursion
— whether to draining jobs recursively, or all in a single pass. Either way, jobs are processed sequentially, one at a time. Recursion is required when jobs insert other jobs (e.g. batches), or depend on the execution of other jobs (e.g. workflows). Defaults totrue
.:with_safety
— whether to silently catch errors when draining. Whenfalse
, raised exceptions or unhandled exits are reraised (unhandled exits are wrapped inOban.CrashError
). Defaults tofalse
.:with_scheduled
— whether to include scheduled or retryable jobs when draining. In recursive mode, which is the default, this will include snoozed jobs, and may lead to an infinite loop if the job snoozes repeatedly. Defaults totrue
.:with_summary
— whether to summarize execution results with a map of counts by state, or return a list of each job that was drained. Defaults totrue
, which returns a summary map.
See shared options for additional repo-specific options.
Examples
Drain all available jobs across all queues:
assert %{completed: 3} = drain_jobs(queue: :all)
Drain and return all executed jobs without a count summary:
assert [%Oban.Job{}, %Oban.Job{}] = drain_jobs(with_summary: false)
Drain including a job that you expect to raise:
assert_raise RuntimeError, fn -> drain_jobs(queue: :risky) end
Drain without recursion to identify snoozed jobs:
assert %{scheduled: 3} = drain_jobs(with_recursion: false)
Drain without staging scheduled jobs:
assert %{completed: 1, scheduled: 0} = drain_jobs(with_scheduled: false)
Drain within a custom prefix:
assert %{completed: 3} = drain_jobs(queue: :default, prefix: "private")
Drain using a specific repo (necessary when calling this function directly):
assert %{completed: 3} = drain_jobs(queue: :default, repo: MyApp.Repo)
@spec perform_callback(Oban.Worker.t(), callback(), term(), [perform_option()]) :: Oban.Worker.result()
Construct and execute a job with a batch handle_*
callback.
This helper verifies that the batch worker exports the requested callback handler,
along with the standard assertions made by perform_job/3
.
Examples
Execute the handle_attempted
callback without any args:
assert :ok = perform_callback(MyBatch, :attempted, %{})
Execute the handle_exhausted
callback with args:
assert :ok = perform_callback(MyBatch, :exhausted, %{for_account_id: 123})
@spec perform_chunk(Oban.Worker.t(), [term()], [perform_option()]) :: Oban.Pro.Workers.Chunk.result()
Construct a list of jobs and process them with a Chunk worker.
Like perform_job/3
, this helper reduces boilerplate when constructing jobs and checks for
common pitfalls. Unlike perform_job/3
, this helper calls the chunk's process/1
function
directly and it won't trigger telemetry events.
Examples
Successfully process a chunk of jobs:
assert :ok = perform_chunk(MyChunk, [%{id: 1}, %{id: 2}])
Process a chunk of jobs with job options:
assert :ok = perform_chunk(MyChunk, [%{id: 1}, %{id: 2}], attempt: 5, priority: 3)
@spec perform_job(Oban.Worker.t(), term(), [perform_option()]) :: Oban.Worker.result()
Refute that any jobs were enqueued during a function call.
Any pre-existing jobs are ignored for the refutation. If the refutation passes then the function's return value is passed back.
Options
See shared options for additional repo-specific options.
Examples
Refute that a MyApp.Worker
job was added to the default
queue:
refute_enqueue([queue: :default, worker: MyApp.Worker], fn ->
MyApp.do_some_business()
end)
Make a refutation about the return value:
result = refute_enqueue([worker: MyApp.Worker], &MyApp.more_business/0)
assert {:ok, _} = result
Refute that a job with particular criteria is enqueued.
This is a wrapper around Oban.Testing.refute_enqueued/2
, see Oban.Testing
for more details.
Options
See shared options for additional repo-specific options.
@spec run_batch([Oban.Job.changeset()], [drain_option()]) :: drain_result()
Insert and execute a complete batch of jobs, along with callbacks, within the test process.
Options
Accepts all options for drain_jobs/1
, including the repo-specific options listed in
shared-options.
Examples
Run a batch:
ids
|> Enum.map(&MyBatch.new(%{id: &1}))
|> MyBatch.new_batch()
|> run_batch()
Run a batch with a specific repo (necessary when calling this function directly):
run_batch(my_batch, repo: MyApp.Repo, prefix: "private")
Insert and execute chained jobs within the test process.
Options
Accepts all options for drain_jobs/1
, including the repo-specific options listed in
shared-options.
Examples
Run all jobs in a chain:
1..10
|> Enum.map(&MyChain.new(%{id: &1}))
|> run_chain()
@spec run_chunk([Oban.Job.changeset()], [drain_option()]) :: drain_result()
Insert and execute chunked jobs within the test process.
This helper overrides the chunk's timeout
to force immediate processing of jobs up to the
chunk size.
Options
Accepts all options for drain_jobs/1
, including the repo-specific options listed in
shared-options.
Examples
Run jobs in chunks:
1..50
|> Enum.map(&MyChunk.new(%{id: &1}))
|> run_chunk()
Run a chunk with an overriden size:
1..10
|> Enum.map(&MyChunk.new(%{id: &1}))
|> run_chunk(size: 10)
Run chunks with an explicit repo (necessary when calling this function directly):
run_chunk(changesets, repo: MyApp.Repo, prefix: "private")
Run chunks only for a specific queue:
run_chunk(changesets, queue: "default")
@spec run_jobs([Oban.Job.changeset()], [drain_option()]) :: drain_result()
Insert and execute jobs synchronously, within the test process.
This is the basis of all other run_*
helpers.
Options
Accepts all options for drain_jobs/1
, including the repo-specific options listed in
shared-options.
Examples
Run a list of jobs:
ids
|> Enum.map(&MyWorker.new(%{id: &1}))
|> run_jobs()
Run jobs with an explicit repo (necessary when calling this function directly):
run_jobs(changesets, repo: MyApp.Repo)
@spec run_workflow(Oban.Pro.Workers.Workflow.t(), [ Oban.Pro.Workers.Workflow.new_option() | drain_option() ]) :: drain_result()
Insert and execute a workflow synchronously, within the test process.
This helper augments the workflow with options optimized for testing, but it will still respect all standard workflow options.
Options
Accepts all options for drain_jobs/1
, including the repo-specific options listed in
shared-options.
Examples
Run a basic workflow:
MyFlow.new_workflow()
|> MyFlow.add(:a, MyFlow.new(%{id: 1}))
|> MyFlow.add(:b, MyFlow.new(%{id: 2}), deps: [:a])
|> MyFlow.add(:c, MyFlow.new(%{id: 3}), deps: [:b])
|> run_workflow()
Run a workflow and match on returned jobs, but be careful that the execution order may differ from the insertion order:
workflow =
MyFlow.new_workflow()
|> MyFlow.add(:a, MyFlow.new(%{id: 1}))
|> MyFlow.add(:b, MyFlow.new(%{id: 2}), deps: [:a])
|> MyFlow.add(:c, MyFlow.new(%{id: 3}), deps: [:b])
[_job_a, _job_b, _job_c] = run_workflow(workflow, with_summary: false)
Run a workflow with increased parallelism to prevent slow or deadlocked workflows:
run_workflow(my_expansive_workflow, with_limit: 5)
Run a workflow with an explicit repo (necessary when calling this function directly):
run_workflow(workflow, repo: MyApp.Repo)
@spec start_supervised_oban!([Oban.option()]) :: Registry.key()
Start a supervised Oban instance under the test supervisor.
All valid Oban options are accepted. The supervised instance is registered with a unique
reference, rather than the default Oban
. That prevents any conflict with Oban instances
started by your Application, or with other tests running asynchronously.
Furthermore, this helper automatically adds sandbox allowances for any plugins or queue producers, allowing tests to run async.
After the test finishes the test process will wait for the Oban instance to shut down cleanly.
Running Jobs
By default, the supervised instance won't process any jobs because the stage_interval
is set
to :infinity
. Set the stage_interval
to a low value to process jobs normally, without manual
draining.
Options
Any option accepted by Oban.start_link/1
is acceptable, including the repo-specific options
listed in shared options.
Examples
Start a basic supervised instance without any queues or plugins:
name = start_supervised_oban!(repo: MyApp.Repo)
Oban.insert(name, MyWorker.new())
Start the supervisor with a single queue and polling every 10ms:
start_supervised_oban!(repo: MyApp.Repo, stage_interval: 10, queues: [alpha: 10])