Testing Pro Workers
This guide builds on Testing Workers, which you should read to acquaint
yourself with the basics of unit testing workers. With Oban.Pro.Testing
helpers, testing Pro workers works the same as testing basic workers, with a
few powerful additions. To see those additions in action, let's step through how
to test each of the Pro-specific workers.
Testing Batches
Batches link the execution of multiple jobs and run optional callbacks after
those jobs are processed. To test a batch exhaustively, you can exercise
process/1 with Oban.Pro.Testing.perform_job/2,3 and use
Oban.Pro.Testing.perform_callback/2,3 for any callback handlers.
To demonstrate, we'll define a basic batch worker with a handler for when the batch completes successfully:
defmodule MyApp.MyBatch do
  use Oban.Pro.Workers.Batch

  @impl true
  def process(%{args: %{"email" => email}}) do
    if MyApp.valid_email?(email) do
      MyApp.deliver_welcome(email)
    else
      {:error, :invalid_email}
    end
  end

  @impl true
  def handle_completed(%{args: %{"admin_email" => email}, meta: %{"batch_id" => bid}}) do
    MyApp.batch_complete(bid, email)

    :ok
  end
end
Testing the worker's process/1 function is straightforward with perform_job/2:
test "delivering welcome emails to valid addresses" do
  assert :ok = perform_job(MyBatch, %{email: "[email protected]"})
  assert {:error, _} = perform_job(MyBatch, %{email: "fake-email"})
end
Similarly, there is a helper for testing callback functions. The helper produces
a batch callback job and verifies that the callback function is exported. Here we
are verifying the handle_completed/1 callback:
test "notifying admins that a batch completed" do
  assert :ok = perform_callback(MyBatch, :completed, %{admin_email: "[email protected]"})
end
Integration Testing Batches
Oban inserts callback jobs automatically based on the results of each job in the
batch. For example, if every job is completed, then there will be a
handle_completed/1 callback job. The Oban.Pro.Testing.run_batch/2 helper handles
inserting and executing all jobs in a batch, including any appropriate callbacks.
test "running all jobs in a batch and the callbacks" do
  batch =
    ["[email protected]", "[email protected]", "[email protected]"]
    |> Enum.map(&MyBatch.new(%{email: &1}))
    |> MyBatch.new_batch(batch_callback_args: %{admin_email: "[email protected]"})

  assert %{completed: 4} = run_batch(batch)
end
When your application code inserts a batch on its own, outside the context of
your test, you can't call run_batch/1,2. In that case, you can use
Oban.Pro.Testing.drain_jobs/1 instead to execute the jobs and the callback:
test "draining batches inserted by application code" do
  :ok = MyApp.welcome_recent_users()

  assert %{completed: 4} = drain_jobs(queue: :all)
end
Testing Chunks
Chunk workers process jobs in groups based on size or a timeout. They are an
outlier amongst other workers because the process/1 callback receives a list
of jobs rather than a single job. That difference prevents chunks from working
with perform_job/2, and instead you can use the
Oban.Pro.Testing.perform_chunk/3 helper.
To demonstrate testing chunks we'll define a worker that checks a batch of password hashes against a pwned database and notifies admins when a significant ratio of hashes are compromised.
defmodule MyApp.MyChunk do
  use Oban.Pro.Workers.Chunk, size: 100, timeout: :timer.seconds(30)

  @impl true
  def process([_ | _] = jobs) do
    pwned_count = Enum.count(jobs, fn %{args: args} -> MyApp.was_pwned?(args["hash"]) end)
    pwned_ratio = pwned_count / length(jobs)

    if pwned_ratio > 0.1, do: MyApp.deliver_pwned_alert()

    {:ok, pwned_count}
  end
end
Now, exercise process/1 with perform_chunk/3:
test "calculating the ratio of pwned password hashes" do
  clear_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:clear)}
  pwned_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:pwned)}

  assert {:ok, 3} = perform_chunk(MyChunk, clear_args ++ pwned_args)
end
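To make the arithmetic behind that test explicit, here is a minimal sketch of the ratio check in plain Elixir. The PwnedRatioSketch module and its summarize/1 function are hypothetical stand-ins, not part of Oban or MyApp. Booleans take the place of MyApp.was_pwned?/1 results, and the 0.1 threshold mirrors the worker above.

```elixir
defmodule PwnedRatioSketch do
  # Hypothetical stand-in for the ratio check inside MyApp.MyChunk.process/1.
  @threshold 0.1

  # Takes a non-empty list of booleans, where `true` marks a compromised hash.
  # Returns {pwned_count, ratio, alert?}.
  def summarize([_ | _] = flags) do
    pwned_count = Enum.count(flags, & &1)
    ratio = pwned_count / length(flags)

    {pwned_count, ratio, ratio > @threshold}
  end
end

# Three clear and three pwned hashes, as in the test above.
flags = List.duplicate(false, 3) ++ List.duplicate(true, 3)

{3, 0.5, true} = PwnedRatioSketch.summarize(flags)
```

With three of six hashes compromised the ratio is 0.5, well above the threshold, so under these assumptions the worker would also deliver an alert. The unit test above only asserts on the returned count.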
Unit testing chunks is convenient for checking edge cases, but it lacks the depth and reality of real chunked execution. For that, we need integration testing.
Integration Testing Chunks
During normal execution chunk size is limited to the configured size. In our
example above, the size is set to 100, which means that a chunk may process
up to 100 jobs at once. To verify our chunking we'll insert and execute the
chunk jobs with Oban.Pro.Testing.run_chunk/2:
test "running up to 100 jobs at a time" do
  jobs = Enum.map(1..150, fn _ -> MyChunk.new(%{hash: MyApp.gen_hash(:clear)}) end)

  assert %{completed: 2} = run_chunk(jobs)

  # Integration tests are about side effects; assert no alert email was delivered
  refute email_delivered()
end
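As a rough illustration of the size limit, the sketch below splits 150 placeholder items into groups of at most 100 with Enum.chunk_every/2. This is only the arithmetic, assuming the first group fills to the configured size. It is not how Oban groups jobs internally, and plain integers stand in for real jobs.

```elixir
# Plain integers stand in for jobs; 100 mirrors the `size: 100` option above.
chunk_sizes =
  1..150
  |> Enum.chunk_every(100)
  |> Enum.map(&length/1)

# One full group of 100 and a remainder of 50.
[100, 50] = chunk_sizes
```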
As with the other run_* functions, if you need to execute jobs that were
inserted within application code, use drain_jobs instead:
test "draining chunks inserted by application code" do
  # Prepare the database with less than 100 recent users
  :ok = MyApp.check_pwned_signups()

  assert %{completed: 1} = drain_jobs(queue: :chunked)
end
Testing Workflows
Workflow jobs compose together with arbitrary dependencies that affect if and
when jobs are executed. Individual jobs in a workflow are easily tested with
perform_job/3, but the real challenge is testing the interplay between jobs as
they execute.
For this demonstration we'll build a workflow that applies various natural language processing to a text submission.
defmodule MyApp.MyWorkflow do
  use Oban.Pro.Workers.Workflow, recorded: true

  # Analysis jobs carry both "text" and "mode" args
  @impl true
  def process(%{args: %{"text" => text, "mode" => mode}}) do
    analysis_fun =
      case mode do
        "complexity" -> :complexity_analysis
        "sentiment" -> :sentiment_analysis
        "syntax" -> :syntax_analysis
      end

    apply(MyApp, analysis_fun, [text])
  end

  # The final job has no "mode" and combines the recorded results of its deps
  def process(job) do
    expressiveness =
      job
      |> all_workflow_jobs(only_deps: true)
      |> Enum.map(fn dep -> {dep.args["mode"], fetch_recorded(dep)} end)
      |> MyApp.expressiveness()

    {:ok, expressiveness}
  end
end
Our workflow is defined as a single worker with multiple process/1 clauses.
Typically, workflows are composed of multiple workers, but there isn't any
practical difference.
Each of the clauses can be exercised with perform_job/3:
test "analyzing text sentiment" do
  assert {:ok, :positive} = perform_job(MyWorkflow, %{text: text(), mode: :sentiment})
end
Dependencies between jobs are what define a workflow, and to test those dependencies we need integration tests.
Integration Testing Workflows
Downstream workflow jobs only run when their upstream dependencies have
completed successfully. To verify ordered execution between dependencies we'll
insert and execute jobs using Oban.Pro.Testing.run_workflow/2.
test "running through a complete NLP analysis workflow" do
  text = text_sample()

  workflow =
    MyWorkflow.new_workflow()
    |> MyWorkflow.add(:com, MyWorkflow.new(%{text: text, mode: :complexity}))
    |> MyWorkflow.add(:sen, MyWorkflow.new(%{text: text, mode: :sentiment}))
    |> MyWorkflow.add(:syn, MyWorkflow.new(%{text: text, mode: :syntax}))
    |> MyWorkflow.add(:exp, MyWorkflow.new(%{}), deps: [:com, :sen, :syn])

  # Using with_summary: false gives us a list of executed jobs, but be careful,
  # the exact execution order may differ from the insertion order.
  assert [_com, _sen, _syn, exp_job] = run_workflow(workflow, with_summary: false)

  assert {:ok, 0.8} = MyWorkflow.fetch_recorded(exp_job)
end
The test executes all upstream jobs and then uses the results to compute a score
in the final downstream job. Because we want to verify the result of the final
job, we use with_summary: false to give us the completed jobs rather than a
count summary.
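The dependency rule that the test relies on can be sketched in plain Elixir. The DepsSketch module and its runnable/2 function are hypothetical and only model the idea that a job becomes runnable once all of its deps have completed. Oban tracks this in the database, not with an in-memory map like the one here.

```elixir
defmodule DepsSketch do
  # Hypothetical helper: returns the names of jobs that have not completed yet
  # and whose dependencies are all in the `completed` list.
  def runnable(deps_by_name, completed) do
    for {name, deps} <- deps_by_name,
        name not in completed,
        Enum.all?(deps, &(&1 in completed)),
        do: name
  end
end

# Mirrors the names and deps used in the workflow test above.
deps = %{com: [], sen: [], syn: [], exp: [:com, :sen, :syn]}

# Nothing has run yet, so only the three upstream jobs are runnable.
[:com, :sen, :syn] = Enum.sort(DepsSketch.runnable(deps, []))

# :exp stays blocked until every upstream job has completed.
[:syn] = DepsSketch.runnable(deps, [:com, :sen])
[:exp] = DepsSketch.runnable(deps, [:com, :sen, :syn])
```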
📓 The with_summary option is available to all run and drain functions.
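When a test needs one of the upstream jobs from a with_summary: false result, looking it up by args avoids depending on execution order. The sketch below uses plain maps as stand-ins for the returned jobs and assumes args are stored with string keys, as in the worker's process/1 clauses. The list contents are made up for illustration.

```elixir
# Plain maps stand in for the jobs returned with `with_summary: false`.
# The order here is deliberately different from the insertion order.
jobs = [
  %{args: %{"mode" => "syntax"}},
  %{args: %{"mode" => "complexity"}},
  %{args: %{"mode" => "sentiment"}},
  %{args: %{}}
]

# Find a job by its "mode" arg rather than by its position in the list.
sentiment_job = Enum.find(jobs, fn job -> job.args["mode"] == "sentiment" end)

%{args: %{"mode" => "sentiment"}} = sentiment_job
```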
Finally, you can use drain_jobs directly when your application code inserts
the workflow:
test "draining workflows inserted by application code" do
  :ok = MyApp.analyze_text(text_sample())

  assert %{completed: 4} = drain_jobs(queue: :all)
end