Oban.Pro.Batch behaviour (Oban Pro v1.5.0-rc.6)
Batches link the execution of many jobs as a group and run optional callbacks after jobs are processed. This allows your application to coordinate the execution of any number of jobs in parallel.
Usage
Batches are composed of one or more Pro workers that are linked with a shared batch_id and optional callbacks. As a simple example, let's define a worker that delivers daily emails:
defmodule MyApp.EmailBatch do
  use Oban.Pro.Worker, queue: :mailers

  @behaviour Oban.Pro.Batch

  # Logger macros must be required before use
  require Logger

  @impl Oban.Pro.Worker
  def process(%Job{args: %{"email" => email}}) do
    MyApp.Mailer.daily_update(email)
  end

  @impl Oban.Pro.Batch
  def batch_completed(_job) do
    Logger.info("BATCH COMPLETE")

    :ok
  end
end
Now, create a batch with new/1 by passing a list of job changesets:
emails
|> Enum.map(&MyApp.EmailBatch.new(%{email: &1}))
|> Batch.new()
|> Oban.insert_all()
After all jobs in the batch are completed, the batch_completed/1 callback will be triggered.
Handler Callbacks
After jobs in the batch are processed, a callback job may be inserted. There are six optional batch handler callbacks that a worker may define:
callback | enqueued after |
---|---|
batch_attempted/1 | all jobs executed at least once |
batch_cancelled/1 | the first job is cancelled |
batch_completed/1 | all jobs in the batch are completed |
batch_discarded/1 | the first job is discarded |
batch_exhausted/1 | all jobs are completed, cancelled, or discarded |
batch_retryable/1 | the first job is retryable |
Each callback runs in a separate, isolated job, so it may be retried or discarded like any other job. The callback function receives an Oban.Job struct with the batch_id in meta and should return :ok (or another valid Worker.result()).
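The trigger conditions in the table above can be read as predicates over the states of the jobs in a batch. The following is a minimal sketch of that reading in plain Elixir, not Oban Pro's implementation: the BatchStateModel module, the plain maps standing in for jobs, and the rule that a job counts as attempted once its attempt counter is above zero are all assumptions made for illustration.

```elixir
defmodule BatchStateModel do
  @moduledoc """
  Hypothetical model of the handler trigger table. Not part of Oban Pro.
  """

  # States the table treats as finished for batch_exhausted/1.
  @finished ~w(completed cancelled discarded)

  # Returns the callbacks whose condition holds for a non-empty list of jobs.
  # Each job is a plain map with :state (string) and :attempt (integer) keys.
  def triggered([_ | _] = jobs) do
    rules = [
      # Assumption: "executed at least once" means attempt > 0.
      batch_attempted: Enum.all?(jobs, &(&1.attempt > 0)),
      batch_cancelled: Enum.any?(jobs, &(&1.state == "cancelled")),
      batch_completed: Enum.all?(jobs, &(&1.state == "completed")),
      batch_discarded: Enum.any?(jobs, &(&1.state == "discarded")),
      batch_exhausted: Enum.all?(jobs, &(&1.state in @finished)),
      batch_retryable: Enum.any?(jobs, &(&1.state == "retryable"))
    ]

    # Keep only the callbacks whose condition evaluated to true.
    for {callback, true} <- rules, do: callback
  end
end
```

Under this model, a batch with one completed and one discarded job satisfies the attempted, discarded, and exhausted conditions, but not the completed one.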
Here we'll implement each of the optional handler callbacks and have them print out the batch status along with the batch_id:
defmodule MyApp.BatchWorker do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Batch

  @impl Oban.Pro.Worker
  def process(_job), do: :ok

  @impl Oban.Pro.Batch
  def batch_attempted(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:attempted, batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_cancelled(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:cancelled, batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_completed(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:completed, batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_discarded(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:discarded, batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_exhausted(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:exhausted, batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_retryable(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:retryable, batch_id})
    :ok
  end
end
Hybrid Batches
Batches may also be built from a variety of workers, though you must provide an explicit worker for callbacks:
mail_jobs = Enum.map(mail_args, &MyApp.MailWorker.new/1)
push_jobs = Enum.map(push_args, &MyApp.PushWorker.new/1)
[callback_worker: MyApp.CallbackWorker]
|> Batch.new()
|> Batch.add(mail_jobs)
|> Batch.add(push_jobs)
Without an explicit callback_worker, any worker in the batch may be used for callbacks, which makes callback handling unpredictable.
Customizing Batches
Batches accept a variety of options for grouping and callback customization.
Batch ID
By default a batch_id is generated as a time-ordered random UUIDv7. UUIDs are more than sufficient to ensure that batches are unique between workers and nodes for any period. However, if you require control, you can override batch_id generation at the worker level or pass a value directly to the new/2 function.
Batch.new(batch_id: "custom-batch-id")
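When ids need to be predictable, one option is to derive them from stable inputs. This is a minimal sketch that assumes a single batch per label per day; the MyApp.BatchIds module and its for_day/2 function are hypothetical helpers, not part of Oban Pro.

```elixir
defmodule MyApp.BatchIds do
  # Hypothetical helper: joins a label and an ISO 8601 date into one id.
  # Assumes at most one batch per label per day, so ids stay unique.
  def for_day(label, %Date{} = date) when is_binary(label) do
    label <> "-" <> Date.to_iso8601(date)
  end
end
```

The result can then be passed as the batch_id option, e.g. Batch.new(batch_id: MyApp.BatchIds.for_day("nightly-etl", Date.utc_today())).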
Batch Name
Batches accept an optional name to describe the purpose of the batch, beyond the generated id or individual jobs in it. While the batch_id must be unique, the batch_name need not be, so it can be used as a general purpose label.
Batch.new(batch_name: "nightly-etl")
Callback Workers
For some batches, notably those with heterogeneous jobs, it's necessary to specify a different worker for callbacks. That is easily accomplished by passing the :callback_worker option to new/2:
Batch.new(callback_worker: MyCallbackWorker)
The callback worker must be an Oban.Pro.Worker that defines one or more of the batch callback handlers.
Callback Options
By default, callback jobs have an empty args map and inherit other options from batch jobs. With callback_opts, you can set standard job options for batch callback jobs (only args, max_attempts, meta, priority, queue, and tags are allowed).
For example, here we're passing a webhook URL as args:
Batch.new(callback_opts: [args: %{webhook: "https://web.hook"}])
Here, we change the callback queue and knock the priority down:
Batch.new(callback_opts: [queue: :callbacks, priority: 9])
Be careful to minimize callback_opts as they are stored in each batch job's meta.
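Because only a fixed set of keys is allowed, it can help to check options before building the batch. The following is a minimal sketch; MyApp.CallbackOpts and validate/1 are hypothetical, and how Oban Pro itself reports unsupported keys is not covered here.

```elixir
defmodule MyApp.CallbackOpts do
  # The keys listed above as allowed in callback_opts.
  @allowed [:args, :max_attempts, :meta, :priority, :queue, :tags]

  # Hypothetical helper: returns {:ok, opts} when every key is allowed,
  # otherwise an error tuple naming the unsupported keys.
  def validate(opts) when is_list(opts) do
    case Keyword.split(opts, @allowed) do
      {valid, []} -> {:ok, valid}
      {_valid, invalid} -> {:error, {:unsupported_keys, Keyword.keys(invalid)}}
    end
  end
end
```

A validated list can then be passed along unchanged, e.g. Batch.new(callback_opts: opts).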
Fetching Batch Jobs
To pull more context from batch jobs, it's possible to load all jobs from the batch with all_jobs/2 and stream_jobs/2. Both functions take a single batch job and return a list or stream of all non-callback jobs in the batch, which you can then operate on with Enum or Stream functions.
As an example, imagine you have a batch that ran for a few thousand accounts and you'd like to notify admins that the batch is complete.
defmodule MyApp.BatchWorker do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Batch

  @impl Oban.Pro.Batch
  def batch_completed(%Job{} = job) do
    # all_jobs/2 returns a plain list of jobs, not an {:ok, _} tuple
    account_ids =
      job
      |> Oban.Pro.Batch.all_jobs()
      |> Enum.map(& &1.args["account_id"])

    account_ids
    |> MyApp.Accounts.all()
    |> MyApp.Mailer.notify_admins_about_batch()

    :ok
  end
end
Fetching a thousand jobs may be alright, but for larger batches you don't want to load that much data into memory. Instead, you can use stream_jobs/2 to iterate through them lazily:
{:ok, account_ids} =
  MyApp.Repo.transaction(fn ->
    job
    |> Batch.stream_jobs()
    |> Stream.map(& &1.args["account_id"])
    |> Enum.to_list()
  end)
Streaming is provided by Ecto's Repo.stream, and it must take place within a transaction. While it may be overkill for small batches, for batches with tens or hundreds of thousands of jobs, it will prevent massive memory spikes or the database grinding to a halt.
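Collecting a stream with Enum.to_list/1 still builds the full result list in memory. When results can be handled incrementally, chunking keeps only one slice around at a time. This is a minimal sketch, assuming each job exposes args as a map; MyApp.ChunkedIds and each_chunk/3 are hypothetical names.

```elixir
defmodule MyApp.ChunkedIds do
  # Hypothetical helper: lazily extracts account ids from an enumerable of
  # jobs and passes them to `fun` in lists of at most `size` ids.
  def each_chunk(jobs, size, fun) when is_integer(size) and size > 0 do
    jobs
    |> Stream.map(& &1.args["account_id"])
    |> Stream.chunk_every(size)
    |> Enum.each(fun)
  end
end
```

Inside a batch callback, the first argument would be the stream from Batch.stream_jobs(job), with the whole call wrapped in a Repo transaction as the streaming requirement demands.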
Summary
Callbacks
batch_attempted(job)
Called after all jobs in the batch were attempted at least once.
batch_cancelled(job)
Called after any jobs in the batch have a cancelled state.
batch_completed(job)
Called after all jobs in the batch have a completed state.
batch_discarded(job)
Called after any jobs in the batch have a discarded state.
batch_exhausted(job)
Called after all jobs in the batch have either a completed or discarded state.
batch_retryable(job)
Called after any jobs in the batch have a retryable state.
Functions
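add(batch, changeset)
Add one or more jobs to a batch.
all_jobs(name \\ Oban, job_or_batch_id, opts \\ [])
Get all non-callback jobs from a batch.
append(job, opts \\ [])
Append to a batch from an existing batch job.
cancel_jobs(oban_name \\ Oban, job_or_batch_id)
Cancel all non-callback jobs in a batch.
from_workflow(workflow, opts \\ [])
Create a batch from a workflow.
new(changesets, opts)
Build a new batch from a list or stream of job changesets and some options.
stream_jobs(name \\ Oban, job_or_batch_id, opts \\ [])
Stream all non-callback jobs from a batch.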
Types
batch_opts()
@type batch_opts() :: [ batch_id: String.t(), batch_name: String.t(), callback_opts: callback_opts(), callback_worker: module() ]
callback_opts()
@type callback_opts() :: [ args: Oban.Job.args(), max_attempts: pos_integer(), meta: map(), priority: 0..9, queue: atom() | String.t(), tags: Oban.Job.tags() ]
changeset()
@type changeset() :: Oban.Job.changeset()
repo_opts()
@type repo_opts() :: [{:timeout, timeout()}]
t()
@type t() :: %Oban.Pro.Batch{changesets: Enumerable.t(changeset()), opts: map()}
Callbacks
@callback batch_attempted(job :: Oban.Job.t()) :: Oban.Worker.result()
Called after all jobs in the batch were attempted at least once.
If a batch_attempted/1 function is defined it is executed by an isolated callback job.
Example
Print when all jobs were attempted:
def batch_attempted(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Attempted")

  :ok
end
@callback batch_cancelled(job :: Oban.Job.t()) :: Oban.Worker.result()
Called after any jobs in the batch have a cancelled state.
If a batch_cancelled/1 function is defined it is executed by an isolated callback job.
Example
Print when any jobs are cancelled:
def batch_cancelled(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Cancelled")

  :ok
end
@callback batch_completed(job :: Oban.Job.t()) :: Oban.Worker.result()
Called after all jobs in the batch have a completed state.
If a batch_completed/1 function is defined it is executed by an isolated callback job.
Example
Print when all jobs are completed:
def batch_completed(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Completed")

  :ok
end
@callback batch_discarded(job :: Oban.Job.t()) :: Oban.Worker.result()
Called after any jobs in the batch have a discarded state.
If a batch_discarded/1 function is defined it is executed by an isolated callback job.
Example
Print when any jobs are discarded:
def batch_discarded(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Discarded")

  :ok
end
@callback batch_exhausted(job :: Oban.Job.t()) :: Oban.Worker.result()
Called after all jobs in the batch have either a completed or discarded state.
If a batch_exhausted/1 function is defined it is executed by an isolated callback job.
Example
Print when all jobs are completed or discarded:
def batch_exhausted(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Exhausted")

  :ok
end
@callback batch_retryable(job :: Oban.Job.t()) :: Oban.Worker.result()
Called after any jobs in the batch have a retryable state.
If a batch_retryable/1 function is defined it is executed by an isolated callback job.
Example
Print when any jobs are retryable:
def batch_retryable(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Retryable")

  :ok
end
Functions
add(batch, changeset)
@spec add(t(), Enumerable.t(changeset())) :: t()
Add one or more jobs to a batch.
Examples
Add jobs to an existing batch:
Batch.add(batch, Enum.map(args, &MyWorker.new/1))
Add jobs to a batch one at a time:
Batch.new()
|> Batch.add(MyWorker.new(%{}))
|> Batch.add(MyWorker.new(%{}))
|> Batch.add(MyWorker.new(%{}))
all_jobs(name \\ Oban, job_or_batch_id, opts \\ [])
@spec all_jobs(Oban.name(), Oban.Job.t() | String.t(), [repo_opts()]) :: [ Oban.Job.t() ]
Get all non-callback jobs from a batch.
Examples
Fetch results from all jobs from within a batch_completed/1 callback:
def batch_completed(%Job{} = job) do
  results =
    job
    |> Batch.all_jobs()
    |> Enum.map(&fetch_recorded/1)

  ...
end
Get all batch jobs from anywhere using the batch_id:
Batch.all_jobs("some-uuid-1234-5678")
Get all jobs from anywhere with a custom Oban instance name:
Batch.all_jobs(MyApp.Oban, "some-uuid-1234-5678")
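Once loaded, the jobs are ordinary structs, so standard Enum functions apply. As a small sketch, the following tallies jobs by state; MyApp.BatchSummary is a hypothetical module, and plain maps stand in for Oban.Job structs so the example runs without a database.

```elixir
defmodule MyApp.BatchSummary do
  # Hypothetical helper: counts jobs per state, e.g. for a report written
  # from a batch callback. Works on any enumerable of maps or structs that
  # have a :state key.
  def by_state(jobs) do
    Enum.frequencies_by(jobs, & &1.state)
  end
end
```

In a real callback, the argument would be the list returned by Batch.all_jobs(job).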
append(job, opts \\ [])
@spec append(Oban.Job.t(), batch_opts()) :: t()
Append to a batch from an existing batch job.
The batch_id and any other batch options are propagated to the newly created batch.
Appending and Callbacks
Batch callback jobs are only inserted once. Appending to a batch where callbacks were already triggered, e.g. batch_completed, won't re-trigger the callback.
Examples
Build an empty batch from an existing batch job:
job
|> Batch.append()
|> Batch.add(MyWorker.new(%{}))
|> Batch.add(MyWorker.new(%{}))
|> Oban.insert_all()
Build a batch from an existing job with overridden options:
Batch.append(job, callback_worker: MyApp.OtherWorker)
cancel_jobs(oban_name \\ Oban, job_or_batch_id)
@spec cancel_jobs(Oban.name(), Oban.Job.t() | String.t()) :: {:ok, non_neg_integer()}
Cancel all non-callback jobs in a batch.
Examples
Cancel jobs with the job in a process/1 function:
def process(job) do
  if should_stop_processing?(job.args) do
    Batch.cancel_jobs(job)
  else
    ...
  end
end
Cancel jobs from anywhere using the batch_id:
Batch.cancel_jobs("some-uuid-1234-5678")
Cancel jobs from anywhere with a custom Oban instance name:
Batch.cancel_jobs(MyApp.Oban, "some-uuid-1234-5678")
from_workflow(workflow, opts \\ [])
@spec from_workflow(Oban.Pro.Workflow.t(), batch_opts()) :: t()
Create a batch from a workflow.
All jobs in the workflow are augmented to also be part of a batch.
Examples
Create a batch from a workflow without any extra options:
Workflow.new()
|> Workflow.add(:step_1, MyWorker.new(%{id: 123}))
|> Workflow.add(:step_2, MyWorker.new(%{id: 345}), deps: [:step_1])
|> Workflow.add(:step_3, MyWorker.new(%{id: 456}), deps: [:step_2])
|> Batch.from_workflow()
|> Oban.insert_all()
Create a batch with callback options:
Batch.from_workflow(workflow, callback_opts: [priority: 3], callback_worker: MyApp.Worker)
new(changesets, opts)
@spec new(Enumerable.t(changeset()), batch_opts()) :: t()
Build a new batch from a list or stream of job changesets and some options.
Examples
Build an empty batch without any jobs or options:
Batch.new()
Build a batch from a list of changesets:
1..3
|> Enum.map(fn id -> MyWorker.new(%{id: id}) end)
|> Batch.new()
Build a batch from a stream:
stream_of_args
|> Stream.map(&MyWorker.new/1)
|> Batch.new()
Build a batch with callback options:
Batch.new(list_of_jobs, batch_id: "abc-123", callback_worker: MyApp.CallbackWorker)
stream_jobs(name \\ Oban, job_or_batch_id, opts \\ [])
@spec stream_jobs(Oban.name(), Oban.Job.t() | String.t(), [repo_opts()]) :: Enum.t()
Stream all non-callback jobs from a batch.
Examples
Stream all batch jobs from within a batch_completed/1 callback:
def batch_completed(%Job{} = job) do
  {:ok, account_ids} =
    MyApp.Repo.transaction(fn ->
      job
      |> Batch.stream_jobs()
      |> Enum.map(& &1.args["account_id"])
    end)

  # use_account_ids
end
Stream all batch jobs from anywhere using the batch_id:
MyApp.Repo.transaction(fn ->
  "some-uuid-1234-5678"
  |> Batch.stream_jobs()
  |> Enum.map(& &1.args["account_id"])
end)
Stream all batch jobs using a custom Oban instance name:
MyApp.Repo.transaction(fn ->
  MyApp.Oban
  |> Batch.stream_jobs("some-uuid-1234-5678")
  |> Enum.map(& &1.args["account_id"])
end)