Oban.Pro.Testing (Oban Pro v1.4.8)

Advanced helpers for testing supervised Oban instances, workers, and making assertions about enqueued jobs.

The Oban.Pro.Testing module is a drop-in replacement for Oban.Testing, with additional functions tailored toward integration testing and Pro modules.

Usage in Tests

The most convenient way to use Oban.Pro.Testing is to use the module:

use Oban.Pro.Testing, repo: MyApp.Repo

Other repo-specific configuration options can also be used:

use Oban.Pro.Testing, repo: MyApp.Repo, prefix: "private", log: :debug

If you already have use Oban.Testing in your tests or test cases, simply replace it with use Oban.Pro.Testing.

Naming Convention

The testing helpers in this module adhere to the following naming convention:

  • perform_* — executes jobs locally, without touching the database, for unit testing.

  • drain_* — execute jobs inline, for integration testing.

  • run_* — insert jobs into the database and execute them inline, for integration testing.

Shared Repo Options

The use macro accepts all of these repo-specific configuration options, and they may be passed to all database functions (run_, drain_, etc.)

  • :log — a usable log level or false to disable logging. See Logger.level/0 for valid options.

  • :prefix — an optional database prefix. Defaults to public.

  • :repo — the name of an Ecto repo, which should be running in sandbox mode.

Summary

Types

Batch callback identifiers, correlating to a handle_ callback function.

Functions

Retrieve all currently enqueued jobs matching a set of criteria.

Assert that one or more jobs were enqueued during a function call.

Assert that a job with particular criteria is enqueued.

Synchronously execute jobs in one or all queues, from within the current process.

Construct and execute a job with a batch handle_* callback.

Construct a list of jobs and process them with a Chunk worker.

Refute that any jobs were enqueued during a function call.

Refute that a job with particular criteria is enqueued.

Insert and execute a complete batch of jobs, along with callbacks, within the test process.

Insert and execute chained jobs within the test process.

Insert and execute chunked jobs within the test process.

Insert and execute jobs synchronously, within the test process.

Insert and execute a workflow synchronously, within the test process.

Start a supervised Oban instance under the test supervisor.

Types

@type callback() :: :attempted | :completed | :discarded | :exhausted

Batch callback identifiers, correlating to a handle_ callback function.

Link to this type

drain_option()

@type drain_option() ::
  repo_option()
  | {:queue, atom()}
  | {:with_limit, pos_integer()}
  | {:with_recursion, boolean()}
  | {:with_safety, boolean()}
  | {:with_scheduled, boolean()}
  | {:with_summary, boolean()}
Link to this type

drain_result()

@type drain_result() :: drain_summary() | [Oban.Job.t()]
Link to this type

drain_summary()

@type drain_summary() :: %{
  cancelled: non_neg_integer(),
  completed: non_neg_integer(),
  discarded: non_neg_integer(),
  exhausted: non_neg_integer(),
  retryable: non_neg_integer(),
  scheduled: non_neg_integer()
}
Link to this type

perform_option()

@type perform_option() :: Oban.Job.option() | repo_option()
Link to this type

repo_option()

@type repo_option() ::
  {:log, false | Logger.level()} | {:prefix, String.t()} | {:repo, module()}

Functions

Link to this function

all_enqueued(opts)

(since 0.11.0)
@spec all_enqueued(keyword()) :: [Oban.Job.t()]

Retrieve all currently enqueued jobs matching a set of criteria.

This is a wrapper around Oban.Testing.all_enqueued/1, see Oban.Testing for more details.

Options

See shared options for additional repo-specific options.

Link to this function

assert_enqueue(opts, fun)

(since 1.1.0)
@spec assert_enqueue(
  keyword(),
  (-> return)
) :: return
when return: any()

Assert that one or more jobs were enqueued during a function call.

Any pre-existing jobs are ignored for the assertion. If the assertion passes then the function's return value is passed back.

Options

See shared options for additional repo-specific options.

Examples

Assert that a MyApp.Worker job was added to the default queue:

assert_enqueue([queue: :default, worker: MyApp.Worker], fn ->
  MyApp.do_some_business()
end)

Make an assertion about the return value:

result = assert_enqueue([worker: MyApp.Worker], &MyApp.more_business/0)

assert {:ok, _} = result
Link to this function

assert_enqueued(opts, timeout \\ :none)

(since 0.11.0)
@spec assert_enqueued(
  keyword(),
  timeout() | :none
) :: true

Assert that a job with particular criteria is enqueued.

This is a wrapper around Oban.Testing.assert_enqueued/2, see Oban.Testing for more details.

Options

See shared options for additional repo-specific options.

Link to this function

drain_jobs(opts)

(since 0.11.0)
@spec drain_jobs([drain_option()]) :: drain_result()

Synchronously execute jobs in one or all queues, from within the current process.

Jobs that are enqueued by a process when Ecto is in sandbox mode are only visible to that process. Calling drain_jobs/1 allows you to control when the jobs are executed and to wait synchronously for all jobs to complete.

This function provides several distinct advantages over the standard Oban.drain_queue/2:

  • It can drain jobs across one or all queues simultaneously
  • It can return the drained jobs rather than a count summary
  • It optimizes the defaults for testing batches, workflows, etc.
  • It always uses the Smart engine to guarantee that Pro worker features work as expected

Options

  • :queue - an atom specifying the queue to drain, or :all to drain jobs across all queues at once. Defaults to :all when no queue is provided.

  • :with_limit — the maximum number of jobs to fetch for draining at once. The limit only impacts how many jobs are fetched at once, not concurrency. When recursion is enabled this is how many jobs are processed per-iteration, and it defaults to 1. Otherwise, there isn't a limit and all available jobs are fetched.

  • :with_recursion — whether to draining jobs recursively, or all in a single pass. Either way, jobs are processed sequentially, one at a time. Recursion is required when jobs insert other jobs (e.g. batches), or depend on the execution of other jobs (e.g. workflows). Defaults to true.

  • :with_safety — whether to silently catch errors when draining. When false, raised exceptions or unhandled exits are reraised (unhandled exits are wrapped in Oban.CrashError). Defaults to false.

  • :with_scheduled — whether to include scheduled or retryable jobs when draining. In recursive mode, which is the default, this will include snoozed jobs, and may lead to an infinite loop if the job snoozes repeatedly. Defaults to true.

  • :with_summary — whether to summarize execution results with a map of counts by state, or return a list of each job that was drained. Defaults to true, which returns a summary map.

See shared options for additional repo-specific options.

Examples

Drain all available jobs across all queues:

assert %{completed: 3} = drain_jobs(queue: :all)

Drain and return all executed jobs without a count summary:

assert [%Oban.Job{}, %Oban.Job{}] = drain_jobs(with_summary: false)

Drain including a job that you expect to raise:

assert_raise RuntimeError, fn -> drain_jobs(queue: :risky) end

Drain without recursion to identify snoozed jobs:

assert %{scheduled: 3} = drain_jobs(with_recursion: false)

Drain without staging scheduled jobs:

assert %{completed: 1, scheduled: 0} = drain_jobs(with_scheduled: false)

Drain within a custom prefix:

assert %{completed: 3} = drain_jobs(queue: :default, prefix: "private")

Drain using a specific repo (necessary when calling this function directly):

assert %{completed: 3} = drain_jobs(queue: :default, repo: MyApp.Repo)
Link to this function

perform_callback(worker, callback, args, opts)

(since 0.11.0)
@spec perform_callback(Oban.Worker.t(), callback(), term(), [perform_option()]) ::
  Oban.Worker.result()

Construct and execute a job with a batch handle_* callback.

This helper verifies that the batch worker exports the requested callback handler, along with the standard assertions made by perform_job/3.

Examples

Execute the handle_attempted callback without any args:

assert :ok = perform_callback(MyBatch, :attempted, %{})

Execute the handle_exhausted callback with args:

assert :ok = perform_callback(MyBatch, :exhausted, %{for_account_id: 123})
Link to this function

perform_chunk(worker, args_list, opts)

(since 0.11.0)
@spec perform_chunk(Oban.Worker.t(), [term()], [perform_option()]) ::
  Oban.Pro.Workers.Chunk.result()

Construct a list of jobs and process them with a Chunk worker.

Like perform_job/3, this helper reduces boilerplate when constructing jobs and checks for common pitfalls. Unlike perform_job/3, this helper calls the chunk's process/1 function directly and it won't trigger telemetry events.

Examples

Successfully process a chunk of jobs:

assert :ok = perform_chunk(MyChunk, [%{id: 1}, %{id: 2}])

Process a chunk of jobs with job options:

assert :ok = perform_chunk(MyChunk, [%{id: 1}, %{id: 2}], attempt: 5, priority: 3)
Link to this function

perform_job(worker, args, opts)

(since 0.11.0)
@spec perform_job(Oban.Worker.t(), term(), [perform_option()]) :: Oban.Worker.result()
Link to this function

refute_enqueue(opts, fun)

(since 1.1.0)
@spec refute_enqueue(
  keyword(),
  (-> return)
) :: return
when return: any()

Refute that any jobs were enqueued during a function call.

Any pre-existing jobs are ignored for the refutation. If the refutation passes then the function's return value is passed back.

Options

See shared options for additional repo-specific options.

Examples

Refute that a MyApp.Worker job was added to the default queue:

refute_enqueue([queue: :default, worker: MyApp.Worker], fn ->
  MyApp.do_some_business()
end)

Make a refutation about the return value:

result = refute_enqueue([worker: MyApp.Worker], &MyApp.more_business/0)

assert {:ok, _} = result
Link to this function

refute_enqueued(opts, timeout \\ :none)

(since 0.11.0)
@spec refute_enqueued(
  keyword(),
  timeout() | :none
) :: false

Refute that a job with particular criteria is enqueued.

This is a wrapper around Oban.Testing.refute_enqueued/2, see Oban.Testing for more details.

Options

See shared options for additional repo-specific options.

Link to this function

run_batch(batch, opts)

(since 0.11.0)
@spec run_batch([Oban.Job.changeset()], [drain_option()]) :: drain_result()

Insert and execute a complete batch of jobs, along with callbacks, within the test process.

Options

Accepts all options for drain_jobs/1, including the repo-specific options listed in shared-options.

Examples

Run a batch:

 ids
 |> Enum.map(&MyBatch.new(%{id: &1}))
 |> MyBatch.new_batch()
 |> run_batch()

Run a batch with a specific repo (necessary when calling this function directly):

run_batch(my_batch, repo: MyApp.Repo, prefix: "private")
Link to this function

run_chain(chain, opts)

(since 1.1.0)

Insert and execute chained jobs within the test process.

Options

Accepts all options for drain_jobs/1, including the repo-specific options listed in shared-options.

Examples

Run all jobs in a chain:

1..10
|> Enum.map(&MyChain.new(%{id: &1}))
|> run_chain()
Link to this function

run_chunk(chunk, opts)

(since 0.11.0)
@spec run_chunk([Oban.Job.changeset()], [drain_option()]) :: drain_result()

Insert and execute chunked jobs within the test process.

This helper overrides the chunk's timeout to force immediate processing of jobs up to the chunk size.

Options

Accepts all options for drain_jobs/1, including the repo-specific options listed in shared-options.

Examples

Run jobs in chunks:

1..50
|> Enum.map(&MyChunk.new(%{id: &1}))
|> run_chunk()

Run a chunk with an overriden size:

1..10
|> Enum.map(&MyChunk.new(%{id: &1}))
|> run_chunk(size: 10)

Run chunks with an explicit repo (necessary when calling this function directly):

run_chunk(changesets, repo: MyApp.Repo, prefix: "private")

Run chunks only for a specific queue:

run_chunk(changesets, queue: "default")
Link to this function

run_jobs(changesets, opts)

(since 0.11.0)
@spec run_jobs([Oban.Job.changeset()], [drain_option()]) :: drain_result()

Insert and execute jobs synchronously, within the test process.

This is the basis of all other run_* helpers.

Options

Accepts all options for drain_jobs/1, including the repo-specific options listed in shared-options.

Examples

Run a list of jobs:

ids
|> Enum.map(&MyWorker.new(%{id: &1}))
|> run_jobs()

Run jobs with an explicit repo (necessary when calling this function directly):

run_jobs(changesets, repo: MyApp.Repo)
Link to this function

run_workflow(workflow, opts \\ [])

(since 0.11.0)

Insert and execute a workflow synchronously, within the test process.

This helper augments the workflow with options optimized for testing, but it will still respect all standard workflow options.

Options

Accepts all options for drain_jobs/1, including the repo-specific options listed in shared-options.

Examples

Run a basic workflow:

MyFlow.new_workflow()
|> MyFlow.add(:a, MyFlow.new(%{id: 1}))
|> MyFlow.add(:b, MyFlow.new(%{id: 2}), deps: [:a])
|> MyFlow.add(:c, MyFlow.new(%{id: 3}), deps: [:b])
|> run_workflow()

Run a workflow and match on returned jobs, but be careful that the execution order may differ from the insertion order:

workflow =
  MyFlow.new_workflow()
  |> MyFlow.add(:a, MyFlow.new(%{id: 1}))
  |> MyFlow.add(:b, MyFlow.new(%{id: 2}), deps: [:a])
  |> MyFlow.add(:c, MyFlow.new(%{id: 3}), deps: [:b])

[_job_a, _job_b, _job_c] = run_workflow(workflow, with_summary: false)

Run a workflow with increased parallelism to prevent slow or deadlocked workflows:

run_workflow(my_expansive_workflow, with_limit: 5)

Run a workflow with an explicit repo (necessary when calling this function directly):

run_workflow(workflow, repo: MyApp.Repo)
Link to this function

start_supervised_oban!(opts)

(since 0.11.0)
@spec start_supervised_oban!([Oban.option()]) :: Registry.key()

Start a supervised Oban instance under the test supervisor.

All valid Oban options are accepted. The supervised instance is registered with a unique reference, rather than the default Oban. That prevents any conflict with Oban instances started by your Application, or with other tests running asynchronously.

Furthermore, this helper automatically adds sandbox allowances for any plugins or queue producers, allowing tests to run async.

After the test finishes the test process will wait for the Oban instance to shut down cleanly.

Running Jobs

By default, the supervised instance won't process any jobs because the stage_interval is set to :infinity. Set the stage_interval to a low value to process jobs normally, without manual draining.

Options

Any option accepted by Oban.start_link/1 is acceptable, including the repo-specific options listed in shared options.

Examples

Start a basic supervised instance without any queues or plugins:

name = start_supervised_oban!(repo: MyApp.Repo)
Oban.insert(name, MyWorker.new())

Start the supervisor with a single queue and polling every 10ms:

start_supervised_oban!(repo: MyApp.Repo, stage_interval: 10, queues: [alpha: 10])