Oban.Pro.Worker behaviour (Oban Pro v1.4.3)
The Oban.Pro.Worker is a replacement for Oban.Worker with expanded capabilities such as encryption, enforced structure, output recording, and execution hooks.
In addition, because Batch, Chunk, and Workflow workers are based on the Pro worker, you can use all of the advanced options there as well. The one exception is that recording doesn't function with the Chunk worker.
Usage
Using Oban.Pro.Worker is identical to using Oban.Worker, with a few additional options. All of the basic options such as queue, priority, and unique are still available along with more advanced options.
To create a basic Pro worker, use Oban.Pro.Worker and define a process/1 callback:
defmodule MyApp.Worker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(%Job{} = job) do
    # Do stuff with the job
    :ok
  end
end
If you have existing workers that you'd like to convert, you only need to change the use definition and replace perform/1 with process/1.
Without any of the advanced Pro features, there's no difference between basic and Pro workers.
Structured Jobs
Structured workers help you catch invalid data within your jobs by validating args on insert and casting args before execution. They also automatically generate structs for compile-time checks and friendly dot access.
Defining a Structured Worker
Structured workers use args_schema/1 to define which fields are allowed, which are required, and their expected types. Another benefit, aside from validation, is that args passed to process/1 are converted into a struct named after the worker module. Here's an example that demonstrates defining a worker with several field types and embedded data structures:
defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker

  args_schema do
    field :id, :id, required: true
    field :name, :string, required: true
    field :mode, :enum, values: ~w(enabled disabled paused)a, default: :enabled
    field :xtra, :term

    embeds_one :data, required: true do
      field :office_id, :uuid, required: true
      field :has_notes, :boolean, default: false
      field :addons, {:array, :string}
    end

    embeds_many :addresses do
      field :street, :string
      field :city, :string
      field :country, :string
    end
  end

  @impl Oban.Pro.Worker
  def process(%Job{args: %__MODULE__{id: id, name: name, mode: mode, data: data}}) do
    %{office_id: office_id, has_notes: has_notes} = data

    # Use the matched, cast values
    :ok
  end
end
The example's schema declares six top-level keys: :id, :name, :mode, :xtra, :data, and :addresses. Of those, only :id, :name, :data, and the :office_id subkey are marked required.
The :mode field is an enum that validates values and casts to an atom, while :xtra stores an arbitrary Elixir term. The embedded :data field declares a nested map with its own type coercion and validation, including a custom Ecto.UUID type. Finally, :addresses specifies an embedded list of maps.
Job args are validated on new/1, and errors bubble up to prevent insertion:
StructuredWorker.new(%{id: "not-an-id", mode: "unknown"}).valid?
# => false (invalid id, invalid mode, missing name and data)

StructuredWorker.new(%{id: "123", mode: "enabled"}).valid?
# => false (missing name and data)

StructuredWorker.new(%{id: "123", name: "NewBiz", mode: "enabled", data: %{office_id: Ecto.UUID.generate()}}).valid?
# => true
This shows how args, stored as JSON, are cast before being passed to process/1:
# {"id":123,"name":"NewBiz","mode":"enabled","data":{"office_id":"9c7d5f2e-4b1a-4e3c-8d6f-2a1b3c4d5e6f"}}
%MyApp.StructuredWorker{
  id: 123,
  name: "NewBiz",
  mode: :enabled,
  data: %{office_id: "9c7d5f2e-4b1a-4e3c-8d6f-2a1b3c4d5e6f", has_notes: false}
}
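As a rough illustration of just the struct conversion step, the sketch below turns string-keyed, JSON-decoded args into a struct with atom keys and drops unknown keys. StructCastSketch and from_args/1 are hypothetical names; real structured workers also cast values (such as "enabled" to :enabled) and validate them through Ecto changesets, which this sketch deliberately skips.

```elixir
defmodule StructCastSketch do
  # Hypothetical module: it only mimics the "args become a struct named after
  # the worker" step. Type casting and validation are intentionally omitted.
  defstruct [:id, :name, mode: :enabled]

  @doc "Builds the struct from string-keyed args, ignoring keys it doesn't define."
  def from_args(args) when is_map(args) do
    attrs =
      for field <- Map.keys(%__MODULE__{}) -- [:__struct__],
          Map.has_key?(args, Atom.to_string(field)),
          into: %{} do
        {field, Map.fetch!(args, Atom.to_string(field))}
      end

    # Fields missing from args keep their struct defaults (mode: :enabled)
    struct(__MODULE__, attrs)
  end
end
```

Looking up only the struct's own field names, rather than converting every incoming key with String.to_atom/1, keeps untrusted input from creating new atoms.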
Structured Types and Casting
Type casting and validation are handled by changesets. All types supported in Ecto schemas are allowed, e.g. :id, :integer, :string, :float, or :map. See the Ecto documentation for a complete list of Ecto types and their Elixir counterparts.
Structured Options
Structured fields support a few common options.
- :default sets the default value for a field. The default value is calculated at compile time, so don't use expressions like DateTime.utc_now/0, as they'd be the same for all records.
- :required validates that a value is provided for the field. Values that are nil or an empty string aren't considered valid.
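The :required rule can be approximated in plain Elixir. This is a sketch assuming that an absent key counts the same as nil; RequiredSketch and missing/2 are hypothetical names, not part of Oban Pro.

```elixir
defmodule RequiredSketch do
  # Hypothetical helper approximating the :required option: a key counts as
  # missing when it is absent, nil, or an empty string.

  @doc "Returns the required keys whose values are missing from args."
  def missing(args, required_keys) when is_map(args) and is_list(required_keys) do
    Enum.filter(required_keys, fn key -> Map.get(args, key) in [nil, ""] end)
  end
end
```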
Structured Extensions
Structured workers support some convenient extensions beyond Ecto's standard type casting.
- :enum maps atoms to strings based on a list of predefined atoms passed like values: ~w(foo bar baz)a. It both validates that values are included in the list and casts them to atoms.
- :term safely encodes any Elixir term as a string for storage, then decodes it back to the original term on load. This is similar to :any, but works with terms like tuples or pids that can't usually be serialized. For safety, terms are decoded with the :safe option to prevent decoding data that may be used to attack the runtime.
- :uuid is an intention-revealing alias for binary_id.
- {:array, *} is a list of one or more values of any type, including :enum and :uuid.
- embeds_one/2,3 declares a nested map with an explicit set of fields.
- embeds_many/2,3 declares a list of nested maps with an explicit set of fields.
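To make the :enum and :term descriptions concrete, here is a standard-library sketch of similar behaviour. ExtensionSketch and its functions are hypothetical, and storing terms as Base64-encoded Erlang external term format is an assumption, not Oban Pro's documented encoding. The key detail is the :safe option to :erlang.binary_to_term/2, which makes decoding fail instead of creating new atoms or external funs.

```elixir
defmodule ExtensionSketch do
  # Hypothetical helpers approximating the :enum and :term extensions with
  # standard-library calls. They are not Oban Pro's implementation.

  @doc "Casts an atom or string to one of the allowed atoms, or returns :error."
  def cast_enum(value, values) when is_atom(value) do
    if value in values, do: {:ok, value}, else: :error
  end

  def cast_enum(value, values) when is_binary(value) do
    # Compare against the allowed atoms instead of calling String.to_atom/1,
    # so arbitrary input can't create new atoms.
    case Enum.find(values, fn atom -> Atom.to_string(atom) == value end) do
      nil -> :error
      atom -> {:ok, atom}
    end
  end

  def cast_enum(_value, _values), do: :error

  @doc "Encodes any term (tuples, pids, ...) as a Base64 string for JSON storage."
  def dump_term(term) do
    term |> :erlang.term_to_binary() |> Base.encode64()
  end

  @doc "Decodes a stored term with :safe, returning :error for bad input."
  def load_term(encoded) when is_binary(encoded) do
    with {:ok, binary} <- Base.decode64(encoded) do
      try do
        {:ok, :erlang.binary_to_term(binary, [:safe])}
      rescue
        ArgumentError -> :error
      end
    end
  end
end
```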
Defining Typespecs for Structured Workers
Typespecs aren't generated automatically. If desired, you must define a structured worker's type manually:
defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker

  @type t :: %__MODULE__{id: integer(), active: boolean()}
  # args_schema and process/1 follow as usual
end
Recorded Jobs
Sometimes the output of a job is just as important as any side effects. When that's the case, you can use the recorded option to stash a job's output back into the job itself. Results are compressed and safely encoded for retrieval later, either manually, in a batch callback, or in a downstream workflow job.
Defining a Recorded Worker
defmodule MyApp.RecordedWorker do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: args}) do
    # Do your typical work here, then return {:ok, value} to record value
  end
end
If your process function returns an {:ok, value} tuple, the value is recorded. Any other return, such as a plain :ok, an error, or a snooze, is ignored.
The example above uses recorded: true to opt into recording with the defaults. That means an output limit of 32kb after compression and encoding; anything larger than the configured limit will return an error tuple. If you expect larger results (and you want them stored in the database), you can override the limit. For example, to set the limit to 64kb instead:
use Oban.Pro.Worker, recorded: [limit: 64_000]
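The limit applies to the encoded result, not the raw term. The sketch below approximates "compressed and safely encoded" with compressed Erlang term format plus Base64, and mirrors the {:error, :missing} convention of fetch_recorded/1. RecordSketch is hypothetical, the encoding is an assumption rather than Oban Pro's actual format, and the 32_000 byte default is likewise assumed from the 32kb figure above.

```elixir
defmodule RecordSketch do
  # Hypothetical approximation of recording: compress, encode, then compare
  # the encoded size against a byte limit. Oban Pro's format may differ.

  # Assumed to correspond to the documented 32kb default
  @default_limit 32_000

  def encode(value, limit \\ @default_limit) do
    encoded =
      value
      |> :erlang.term_to_binary(compressed: 9)
      |> Base.encode64()

    if byte_size(encoded) <= limit do
      {:ok, encoded}
    else
      {:error, :output_too_large}
    end
  end

  # Nothing stored yet, analogous to fetch_recorded/1 returning {:error, :missing}
  def decode(nil), do: {:error, :missing}

  def decode(encoded) when is_binary(encoded) do
    {:ok, encoded |> Base.decode64!() |> :erlang.binary_to_term([:safe])}
  end
end
```

Incompressible output, such as random bytes, grows by roughly a third when Base64 encoded, so it hits the limit sooner than its raw size suggests.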
Retrieving Results
The fetch_recorded/1 function is your ticket to extracting recorded results. If a job has run and recorded a value, it will return an {:ok, result} tuple:
job = MyApp.Repo.get(Oban.Job, job_id)

case MyApp.RecordedWorker.fetch_recorded(job) do
  {:ok, result} ->
    # Use the result
    result

  {:error, :missing} ->
    # Nothing recorded yet
    nil
end
Encrypted Jobs
Some applications have strong regulations around the storage of personal information. For example, medical records, financial details, social security numbers, or other data that should never leak. The encrypted option lets you store all job data at rest with encryption so sensitive data can't be seen.
Defining an Encrypted Worker
Encryption is handled transparently as jobs are inserted and executed. All you need to do is flag the worker as encrypted and configure it to fetch a secret key:
defmodule MyApp.SensitiveWorker do
  use Oban.Pro.Worker, encrypted: [key: {module, fun, args}]

  @impl true
  def process(%Job{args: args}) do
    # Args are decrypted, use them as you normally would
  end
end
Now job args are encrypted before insertion into the database and decrypted when the job runs.
Generating Encryption Keys
Encryption requires a 32-byte, Base64-encoded key. You can generate one with the :crypto and Base modules:
key = 32 |> :crypto.strong_rand_bytes() |> Base.encode64()
The result will look something like this: "w7xGJClzEh1pbWuq6zsZfKfwdINu2VIkgCe3IO0hpsA=".
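Since a wrong-sized key is easy to produce by accident (for example, by encoding a passphrase), it can help to check a key's shape before using it. KeySketch and valid_key?/1 are hypothetical helpers, not Oban Pro APIs; they only verify that the string is Base64 and decodes to exactly 32 bytes.

```elixir
defmodule KeySketch do
  # Hypothetical helpers: generate a key as described above and check that a
  # candidate decodes to exactly 32 raw bytes.

  def generate do
    32 |> :crypto.strong_rand_bytes() |> Base.encode64()
  end

  def valid_key?(encoded) when is_binary(encoded) do
    case Base.decode64(encoded) do
      {:ok, raw} -> byte_size(raw) == 32
      :error -> false
    end
  end

  def valid_key?(_other), do: false
end
```

A 32-byte key always encodes to a 44 character Base64 string ending in a single "=".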
While it's possible to use the generated key in your worker directly, that defeats the purpose of encrypting sensitive data because anybody with access to the codebase can read the encryption key. That's why it is highly recommended that you use an MFA to retrieve the key dynamically at runtime. For example, here's how you could pull the key from the Application environment:
use Oban.Pro.Worker, encrypted: [key: {Application, :fetch_env!, [:my_app, :enc_key]}]
Encryption Implementation Details
- Erlang's crypto module is used with the aes_256_ctr cipher for encryption.
- Encoding and decoding stacktraces are pruned to prevent leaking the private key or initialization vector.
- Only args are encrypted; meta is kept as plaintext. You can use that to your advantage for uniqueness, but be careful not to put anything sensitive in meta.
- Error messages and stacktraces aren't encrypted and are stored as plaintext. Be careful not to expose sensitive data when raising errors.
- Args are encrypted at rest as well as in Oban Web. You won't be able to view or search encrypted args in the Web dashboard.
- Uniqueness works for encrypted jobs, but not for arguments, because the same args are encrypted differently every time. Favor meta over args to enforce uniqueness for encrypted jobs.
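The uniqueness caveat follows from how CTR mode is normally used: each encryption takes a fresh random initialization vector, so identical plaintext yields different ciphertext. The sketch below shows an AES-256-CTR round trip with :crypto.crypto_one_time/5. CipherSketch is hypothetical, and prefixing the 16-byte IV to the ciphertext is an assumption for illustration, not Oban Pro's storage format.

```elixir
defmodule CipherSketch do
  # Minimal AES-256-CTR round trip. Assumes a random 16-byte IV per
  # encryption, stored in front of the ciphertext (illustrative layout only).

  def encrypt(plaintext, key) when byte_size(key) == 32 do
    iv = :crypto.strong_rand_bytes(16)
    ciphertext = :crypto.crypto_one_time(:aes_256_ctr, key, iv, plaintext, true)
    iv <> ciphertext
  end

  def decrypt(<<iv::binary-size(16), ciphertext::binary>>, key) when byte_size(key) == 32 do
    :crypto.crypto_one_time(:aes_256_ctr, key, iv, ciphertext, false)
  end
end
```

Encrypting the same args twice produces two different binaries, so a uniqueness check comparing stored args can't detect duplicates, while plaintext meta still can.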
Job Deadlines
Jobs that shouldn't run after some period of time can be marked with a deadline. After the deadline has passed, the job will be pre-emptively cancelled on its next run or, optionally, while it is running.
defmodule DeadlinedWorker do
  use Oban.Pro.Worker, deadline: {1, :hour}

  @impl true
  def process(%Job{args: args}) do
    # If this doesn't run within an hour, it's cancelled
  end
end
Deadlines may be set at runtime as well, provided the worker was already configured with a deadline option.
# Specify the deadline in seconds
DeadlinedWorker.new(args, deadline: 60)
# Specify the deadline in minutes, using the tuple syntax
DeadlinedWorker.new(args, deadline: {30, :minutes})
# Specify the deadline in days
DeadlinedWorker.new(args, deadline: {1, :day})
In every case, the deadline is relative and computed at runtime. That also allows the deadline to consider scheduling: a job scheduled to run 1 hour from now with a 1 hour deadline will expire 2 hours in the future. Note that deadlines only account for scheduling on insert, not on retry or after a snooze.
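The scheduling rule above amounts to adding the deadline duration to the scheduled time rather than the insertion time. DeadlineSketch is hypothetical, and the exact set of accepted units (singular and plural second, minute, hour, day) is an assumption based on the examples shown.

```elixir
defmodule DeadlineSketch do
  # Hypothetical helpers: normalize a deadline to seconds, then measure it
  # from the job's scheduled time. The unit list is an assumption.
  @unit_seconds %{
    second: 1,
    seconds: 1,
    minute: 60,
    minutes: 60,
    hour: 3_600,
    hours: 3_600,
    day: 86_400,
    days: 86_400
  }

  def to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  def to_seconds({amount, unit}) when is_integer(amount) and amount > 0 do
    amount * Map.fetch!(@unit_seconds, unit)
  end

  def deadline_at(%DateTime{} = scheduled_at, duration) do
    DateTime.add(scheduled_at, to_seconds(duration), :second)
  end
end
```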
Applying Deadlines to Running Jobs
A job that has already started may be cancelled at runtime if it won't complete before the deadline. Consider a slower job that takes 5 minutes to execute and has a deadline 10 minutes in the future. If the job starts 9 minutes from now it wouldn't finish until 4 minutes past the deadline.
With the force flag set, the job will cancel itself if it exceeds the deadline:
defmodule FirmDeadlinedWorker do
  use Oban.Pro.Worker, deadline: [in: {10, :minutes}, force: true]
  ...
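In the scenario above, a 5 minute job starting at minute 9 would finish at minute 14, 4 minutes past a minute-10 deadline. As an analogy for force, the sketch below uses plain OTP tasks: wait for the work only until the deadline, then shut it down. ForceSketch is hypothetical and is not how Oban Pro implements forced deadlines.

```elixir
defmodule ForceSketch do
  # Analogy for the force flag: run the work in a task, wait no longer than
  # the remaining time, and kill it if it hasn't finished by then.

  def run_until(fun, deadline_ms) when is_function(fun, 0) and is_integer(deadline_ms) do
    task = Task.async(fun)

    # Task.shutdown/2 still returns {:ok, result} if the task finished
    # in the instant between the timeout and the shutdown.
    case Task.yield(task, deadline_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, reason}
      nil -> {:cancel, :deadline_exceeded}
    end
  end
end
```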
Worker Hooks
Worker hooks are called after a job finishes executing. They can be defined as callback functions on the worker, or in a separate module for reuse across workers.
Hooks are called synchronously, from within the job's process, with safety applied. Any exceptions or crashes are caught and logged; they won't cause the job to fail or the queue to crash.
Hooks do not modify the job or execution results. Think of them as a convenient alternative to globally attached telemetry handlers. They are purely for side-effects such as cleanup, logging, recording metrics, broadcasting notifications, updating other records, error notifications, etc.
Defining Hooks
There are three mechanisms for defining and attaching an after_process/3 hook:
- Implicitly—hooks are defined directly on the worker and they only run for that worker
- Explicitly—hooks are listed when defining a worker and they run anywhere they are listed
- Globally—hooks are executed for all Pro workers
It's possible to combine each type of hook on a single worker. When multiple hooks are stacked they're executed in the order: implicit, explicit, and then global.
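The ordering and safety rules can be sketched with a small runner. HookRunnerSketch and the Sketch* example modules are hypothetical, not Oban Pro internals: the implicit hook is the worker module itself when it exports after_process/3, followed by explicit and then global hooks, and any raise, throw, or exit is caught and logged.

```elixir
defmodule HookRunnerSketch do
  # Hypothetical runner illustrating hook order (implicit, explicit, global)
  # and the rule that a failing hook is logged instead of failing the job.
  require Logger

  def ordered_hooks(worker, explicit, global) do
    implicit =
      if Code.ensure_loaded?(worker) and function_exported?(worker, :after_process, 3),
        do: [worker],
        else: []

    implicit ++ explicit ++ global
  end

  def run(hooks, state, job, result) do
    # Return values are collected only to make the sketch easy to inspect;
    # real hooks are purely for side effects.
    Enum.map(hooks, fn hook ->
      try do
        hook.after_process(state, job, result)
      catch
        kind, reason ->
          Logger.error("hook #{inspect(hook)} failed: " <> Exception.format(kind, reason, __STACKTRACE__))
          {:hook_failed, hook}
      end
    end)
  end
end

# Example hooks for trying the runner out
defmodule SketchWorker do
  def after_process(state, _job, _result), do: {:implicit, state}
end

defmodule SketchCrashingHook do
  def after_process(_state, _job, _result), do: raise("boom")
end

defmodule SketchGlobalHook do
  def after_process(state, _job, _result), do: {:global, state}
end
```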
An after_process/3 hook is called with an execution state corresponding to the result from process/1, the job, and the result itself. The state is one of:
- :complete, when process/1 returns :ok or {:ok, result}
- :cancel, when process/1 returns {:cancel, reason}
- :discard, when a job errors and exhausts retries, or returns {:discard, reason}
- :error, when a job crashes, raises an exception, or returns {:error, value}
- :snooze, when a job returns {:snooze, seconds}
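The mapping above can be restated as a function. HookStateSketch is hypothetical: the exhausted? flag stands in for "the job has used all of its attempts", and a crash or raised exception is modelled as an {:error, value} result for simplicity.

```elixir
defmodule HookStateSketch do
  # Hypothetical restatement of the documented result-to-state mapping.
  # exhausted? stands in for "no retries remain"; crashes are modelled
  # as {:error, value} here.

  def hook_state(result, exhausted? \\ false)

  def hook_state(:ok, _exhausted?), do: :complete
  def hook_state({:ok, _value}, _exhausted?), do: :complete
  def hook_state({:cancel, _reason}, _exhausted?), do: :cancel
  def hook_state({:discard, _reason}, _exhausted?), do: :discard
  def hook_state({:snooze, seconds}, _exhausted?) when is_integer(seconds), do: :snooze
  def hook_state({:error, _value}, true), do: :discard
  def hook_state({:error, _value}, false), do: :error
end
```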
First, here's how to define a single implicit local hook on the worker using after_process/3:
defmodule MyApp.HookWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
    :ok
  end

  @impl Oban.Pro.Worker
  def after_process(state, %Job{} = job, _result) do
    MyApp.Notifier.broadcast("oban-jobs", {state, %{id: job.id}})
    :ok
  end
end
Any module that exports after_process/3 can be used as a hook. For example, here we'll define a shared error notification hook:
defmodule MyApp.ErrorHook do
  def after_process(state, job, _result) when state in [:discard, :error] do
    error = job.unsaved_error
    extra = Map.take(job, [:attempt, :id, :args, :max_attempts, :meta, :queue, :worker])
    tags = %{oban_worker: job.worker, oban_queue: job.queue, oban_state: job.state}

    Sentry.capture_exception(error.reason, stacktrace: error.stacktrace, tags: tags, extra: extra)

    :ok
  end

  # Catch-all for the remaining states, with the same arity as the clause above
  def after_process(_state, _job, _result), do: :ok
end
defmodule MyApp.HookWorker do
  use Oban.Pro.Worker, hooks: [MyApp.ErrorHook]

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end
end
The same module can be attached globally, for all Oban.Pro.Worker modules, using attach_hook/1:
:ok = Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)
Attaching hooks in your application's start/2 function is an easy way to ensure hooks are registered before your application starts processing jobs.
def start(_type, _args) do
  :ok = Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)

  children = [
    ...
Summary
Types
encrypted(): Options to enable and configure encrypted mode.
hook_state(): All possible hook states.
recorded(): Options to enable and configure recorded mode.
Callbacks
after_process/2,3: Called after a job finishes processing regardless of status (complete, failure, etc).
fetch_recorded/1: Extract the results of a previously executed job.
process/1: Called when executing a job.
Functions
args_schema/1: Define an args schema struct with field definitions and optional embedded structs.
attach_hook/1: Register a worker hook to be run after any Pro worker executes.
detach_hook/1: Unregister a worker hook.
Types
encrypted()
@type encrypted() :: [{:key, mfa()}]
Options to enable and configure encrypted mode.
hook_state()
@type hook_state() :: :cancel | :complete | :discard | :error | :snooze
All possible hook states.
recorded()
@type recorded() :: true | [to: atom(), limit: pos_integer(), safe_decode: boolean()]
Options to enable and configure recorded mode.
Callbacks
@callback after_process(hook_state(), job :: Oban.Job.t()) :: :ok
@callback after_process(hook_state(), job :: Oban.Job.t(), result :: Oban.Worker.result()) :: :ok
Called after a job finishes processing regardless of status (complete, failure, etc).
See the shared "Worker Hooks" section for more details.
@callback fetch_recorded(job :: Oban.Job.t()) :: {:ok, term()} | {:error, :missing}
Extract the results of a previously executed job.
If a job has run and recorded a value, it will return an {:ok, result} tuple. Otherwise, you'll get {:error, :missing}.
process(job)
@callback process(job :: Oban.Job.t() | [Oban.Job.t()]) :: Oban.Worker.result()
Called when executing a job.
The process/1 callback behaves identically to Oban.Worker.perform/1, except that it may have pre-processing and post-processing applied.
Functions
args_schema/1
Define an args schema struct with field definitions and optional embedded structs.
The schema is used to validate args before insertion or execution. See the Structured Jobs section for more details.
Example
Define an args schema for a worker:
defmodule MyApp.Worker do
  use Oban.Pro.Worker

  args_schema do
    field :id, :id, required: true
    field :name, :string, required: true
    field :mode, :enum, values: ~w(on off paused)a
    field :safe, :boolean, default: false

    embeds_one :address, required: true do
      field :street, :string
      field :number, :integer
      field :city, :string
    end
  end

  ...
attach_hook/1
Register a worker hook to be run after any Pro worker executes.
The module must export an after_process function matching one of the after_process/2 or after_process/3 callbacks, as in the example below.
Example
Attach a hook handler globally:
defmodule MyApp.Hook do
  def after_process(_state, %Oban.Job{} = job) do
    # Do something with the job
    :ok
  end
end
:ok = Oban.Pro.Worker.attach_hook(MyApp.Hook)
@spec detach_hook(module()) :: :ok
Unregister a worker hook.
Example
Detach a previously registered global hook:
:ok = Oban.Pro.Worker.detach_hook(MyApp.Hook)