Oban.Pro.Decorator (Oban Pro v1.6.5)
The Decorator extension converts functions into Oban jobs with a simple annotation.
Decorated functions, such as those in contexts or other non-worker modules, can be executed as fully fledged background jobs with retries, scheduling, and the other guarantees you'd expect from Oban jobs.
Usage
To get started, use the Decorator module, then annotate functions with the @job attribute.
Here's a simple example that uses @job true to decorate a function without any options:
defmodule MyApp.Business do
  use Oban.Pro.Decorator

  @job true
  def weekly_report(tps_id, opts) do
    ...
  end
end
The @job attribute also accepts all standard Oban.Worker options, e.g. max_attempts, priority, and queue. This example swaps out @job true for a list of options:
@job [max_attempts: 3, priority: 1, queue: :reports, recorded: true]
def weekly_report(tps_id, opts) do
  ...
end
Now you can build and insert a job by calling insert_weekly_report/2:
{:ok, job} = MyApp.Business.insert_weekly_report(123, pdf: true, rtf: true)
Notice that the second argument is a keyword list of options, which isn't normally allowed in job args because it's not JSON serializable.
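A keyword list is a list of two-element tuples, and JSON has no tuple type, which is why it can't be stored as plain job args. The sketch below round-trips the same arguments through Erlang's external term format, which preserves any term. The ArgsSketch module and its functions are hypothetical, and whether Oban Pro uses this exact encoding is an assumption made only for illustration.

```elixir
defmodule ArgsSketch do
  # Hypothetical helper: encodes any term as a Base64 string, which is
  # safe to store inside a JSON document. Assumed for illustration only.
  def dump(args), do: args |> :erlang.term_to_binary() |> Base.encode64()

  # Restores the original term from the stored string.
  def load(string), do: string |> Base.decode64!() |> :erlang.binary_to_term()
end

args = [123, [pdf: true, rtf: true]]

# A keyword list is sugar for a list of {key, value} tuples.
same_shape = [pdf: true, rtf: true] == [{:pdf, true}, {:rtf, true}]

# The round trip returns the same term, tuples included.
restored = args |> ArgsSketch.dump() |> ArgsSketch.load()

IO.inspect(same_shape, label: "keyword list is tuples")
IO.inspect(restored == args, label: "round trip preserved args")
```

Only use binary_to_term on data your own application wrote, because decoding untrusted binaries can create atoms and other unwanted terms.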
Generated Functions
Functions decorated with @job generate three variants of the original function:
new_* - builds an Oban.Job changeset ready to be inserted for execution
insert_* - builds and inserts an Oban.Job using Oban.insert/2
relay_* - inserts a job, awaits execution, then returns the job's results
See the comp_opts/0 typespec for the subset of job options that are supported at compile
time. Additional options are available at runtime, as described in the Runtime
Options section below.
Using New
The new_ variant will build an Oban.Job changeset that's ready for insertion, but not
persisted to the database. This is identical to the output from calling Oban.Worker.new/2 on
a standard worker.
changeset = Business.new_weekly_report(123)
The returned changeset is perfect for bulk inserts via Oban.insert_all/1:
[123, 456, 789]
|> Enum.map(&Business.new_weekly_report/1)
|> Oban.insert_all()
It can also be used to compose batches or workflows:
alias Oban.Pro.Workflow
Workflow.new()
|> Workflow.add(:rep_1, Business.new_weekly_report(1))
|> Workflow.add(:rep_2, Business.new_weekly_report(2))
|> Workflow.add(:rep_3, Business.new_weekly_report(3))
|> Workflow.add(:fin, Business.new_finish_up(), deps: ~w(rep_1 rep_2 rep_3)a)
|> Oban.insert_all()
Using Insert
The insert_ variant builds a changeset and immediately calls Oban.insert/3 to enqueue it.
The result is a success tuple containing the job, or a changeset with errors.
{:ok, job} = Business.insert_weekly_report(123)
By default, jobs are inserted using the standard Oban instance. For applications that run multiple Oban instances, or use a non-standard name, you can override the instance with the :oban option:
{:ok, job} = Business.insert_weekly_report(123, oban: SomeOban)
Using Relay
The relay_ variant builds a changeset, inserts it, then leverages Oban.Pro.Relay to await
execution and return a result:
{:ok, result} = Business.relay_weekly_report(123)
The default timeout is a brief 5000ms, which doesn't account for scheduling or queueing time. Provide an alternate timeout to wait longer:
case Business.relay_weekly_report(123, timeout: 30_000) do
  {:ok, result} -> IO.inspect(result, label: "RESULT")
  {:error, reason} -> IO.inspect(reason, label: "ERROR")
end
Note that the timeout option only controls how long the local process will block while awaiting a result. The job will keep executing regardless of the timeout period.
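The difference between "the caller stopped waiting" and "the work stopped" can be seen with a plain Elixir Task. This is only a local analogy built on Task.async/1 and Task.yield/2 from the standard library; it makes no claim about how Oban.Pro.Relay is implemented, and the sleep durations are arbitrary.

```elixir
# Start work that takes longer than the caller is willing to wait.
task =
  Task.async(fn ->
    Process.sleep(200)
    {:ok, :report_built}
  end)

# Block for at most 50ms. Task.yield/2 returns nil on timeout and leaves
# the task running; only the waiting stops, not the work.
first_attempt = Task.yield(task, 50)

# Waiting again with a longer timeout picks up the finished result.
second_attempt = Task.yield(task, 1_000)

IO.inspect(first_attempt, label: "after 50ms")
IO.inspect(second_attempt, label: "after waiting longer")
```

In the same spirit, a relay call that times out tells you nothing about whether the job succeeded, so avoid treating a timeout as a job failure.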
Considerations and Caveats
Decorated functions are a convenient way to run functions in the background, and suitable in many situations. However, there are circumstances where they're unsuitable and you should exercise care:
Advanced worker functionality such as custom backoffs, hooks, structured args, or callbacks requires a dedicated worker module and isn't suitable for decoration.
Args of any type are safely serialized, but dumping large amounts of data may cause performance problems because it must be serialized, stored, and deserialized.
Changing function signatures while jobs are in-flight can cause jobs to fail, just like changing the shape of args passed to a process/1 callback.
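To get a feel for the serialization cost mentioned above, compare the encoded size of a bare id with that of a large, fully loaded structure. The payload shapes are hypothetical, and :erlang.term_to_binary/1 is used only as a stand-in for whatever encoding is applied to decorated args.

```elixir
# Hypothetical payloads: a bare id versus a large preloaded collection.
small = 123
large = for i <- 1..10_000, do: %{id: i, name: "user-#{i}", tags: ["a", "b"]}

# Measure how many bytes each argument occupies once encoded.
small_size = small |> :erlang.term_to_binary() |> byte_size()
large_size = large |> :erlang.term_to_binary() |> byte_size()

IO.puts("id only: #{small_size} bytes")
IO.puts("10,000 maps: #{large_size} bytes")
```

When the data already lives in the database, passing an id and reloading the record inside the function keeps the stored job small.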
Runtime Options
Each decorated function has an additional generated clause that accepts job options, e.g.
new_weekly_report/1 also has a new_weekly_report/2 variant.
All compile time options can be overridden at runtime. For example, to override the queue and
max_attempts:
Business.insert_weekly_report(123, queue: "default", max_attempts: 10)
In addition to compile time options, runtime options accepted by Oban.Job.new/2 (other than worker) are also allowed. This makes it possible to schedule decorated jobs:
Business.insert_weekly_report(123, schedule_in: {1, :minute})
See full_opts/0 for the complete typespec of runtime options.
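The precedence described in this section, where runtime options win over compile time options, behaves like a keyword merge. The following is a minimal sketch using Keyword.merge/2 from the standard library; how Oban Pro combines the two lists internally is an assumption, and the option values are taken from the examples above.

```elixir
# Options as they might appear in a @job annotation (compile time).
compile_opts = [max_attempts: 3, priority: 1, queue: :reports]

# Options passed when calling the generated function (runtime).
runtime_opts = [queue: "default", max_attempts: 10, schedule_in: {1, :minute}]

# Keyword.merge/2 keeps keys from the first list unless the second list
# also defines them, so runtime values take precedence.
merged = Keyword.merge(compile_opts, runtime_opts)

IO.inspect(merged, label: "effective options")
```

Options that are set only at compile time, such as priority here, carry through unchanged.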
Patterns and Guards
Each generated variant retains pattern matches and guards from the original function. That allows expressive, defensive data validation before a job executes.
For example, use a guard to ensure the id argument is an integer:
@job true
def notify_user(id) when is_integer(id), do: ...
Or, pattern match on a map with a user_id key and ensure the id is an integer:
@job true
def notify_user(%{user_id: id}) when is_integer(id), do: ...
Complex Types
Any Elixir term may be passed as an argument, not just JSON encodable values. That enables passing native data-types such as tuples, keywords, or structs that can't easily be used in regular jobs.
@job true
def add_discount(%User{id: _}, amount: 10_000), do: ...
Avoid Non-Portable Types
Pass non-portable data types such as pids, references, and ports with caution. There's no guarantee that a job will run on the same node, or that those specific values will still be available when it does.
Furthermore, be careful passing anonymous functions because they are closures over the local
environment.
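The following sketch shows why a pid is a poor job argument even though it serializes without complaint. It uses only standard library calls, with :erlang.term_to_binary/1 standing in for the args encoding, which is an assumption for illustration.

```elixir
# A pid only identifies a process while that process is alive.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)

# Wait until the short-lived process has exited.
receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# The pid still encodes and decodes without error...
restored = pid |> :erlang.term_to_binary() |> :erlang.binary_to_term()

# ...but the process it pointed to is gone by the time it is used.
still_alive = Process.alive?(restored)

IO.inspect(restored == pid, label: "same pid after round trip")
IO.inspect(still_alive, label: "process still alive")
```

A safer pattern is to pass a stable identifier, such as a registered name or a database id, and look the resource up when the job runs.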
Return Values
Decorated jobs respect the same standard return types as Oban.Pro.Worker.process/1. That
means you can return an {:error, reason} tuple to signify an error, or {:cancel, reason} to
quietly cancel a job. However, because decorated functions weren't necessarily designed to be
executed in a job, unexpected returns are considered a success.
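The rules above can be summarized as a small classifier. ReturnSketch is a hypothetical module written only to illustrate the mapping; it is not Oban Pro's implementation. The {:snooze, period} clause is included because it is one of the standard Oban.Worker return values.

```elixir
defmodule ReturnSketch do
  # Hypothetical classifier for the return value of a decorated function.
  def classify(:ok), do: :success
  def classify({:ok, _value}), do: :success
  def classify({:error, _reason}), do: :error
  def classify({:cancel, _reason}), do: :cancel
  def classify({:snooze, _period}), do: :snooze
  # Anything else, such as nil or a struct, is treated as a success.
  def classify(_other), do: :success
end

IO.inspect(ReturnSketch.classify({:error, :not_found}), label: "error tuple")
IO.inspect(ReturnSketch.classify(%{id: 1}), label: "plain map")
IO.inspect(ReturnSketch.classify(nil), label: "nil")
```

Note the consequence of the catch-all clause: a function that signals failure by returning nil or false is recorded as a successful job.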
While there's no harm in returning nil, a struct, or some other non-standard value, it's best to return an explicitly supported type such as :ok or {:ok, value}.
@job true
def update_account(user_id, params) do
  user = Repo.get(User, user_id)
  do_update(user, params)
- user
+ {:ok, user}
end
Unique and Replace
Unique and replace options are available for decorated jobs. However, they have purposeful limitations for compatibility with the decorated worker:
unique - only supports the states, period, and timestamp options. The fields and keys options aren't supported.
replace - expects the newer, per-state syntax, and it doesn't support replacing the worker or args.
Both options can be defined at compile time in the @job annotation:
@job [unique: [period: :infinity], replace: [scheduled: [:scheduled_at]]]
Or as runtime options:
Business.send_notice(user, schedule_in: 60, replace: [scheduled: [:scheduled_at]])
Limiting Decoration
To avoid generating unnecessary functions, you can disable generation of the new, insert, or relay functions by passing flags to use:
use Oban.Pro.Decorator, new: false
use Oban.Pro.Decorator, insert: false
use Oban.Pro.Decorator, relay: false
Filtering options can be combined to restrict generation to one variant. For example, to only generate insert_* functions:
use Oban.Pro.Decorator, new: false, relay: false
Testing Decorated Jobs
Testing decorated jobs is tricky because they're always enqueued with Oban.Pro.Decorated as the worker. The assert helpers in Oban.Pro.Testing have a :decorated option specifically to make testing decorated jobs more convenient.
Pass a captured function with the original arity to the decorated option:
assert_enqueued decorated: &Business.weekly_report/2
Use a list to assert on args (not a map, as you would for a standard job):
assert_enqueued args: [123, pdf: true], decorated: &Business.weekly_report/2
Accessing the Decorated Job
The Decorator module provides a convenient way to access the currently executing job through
the current_job/0 function. This can be useful when you need job context information during
execution.
In any module that uses Oban.Pro.Decorator, you can call the current_job/0 function to
retrieve the current job. This is particularly useful when you need to access job metadata as
part of a workflow. For example:
defmodule MyApp.Doubler do
  use Oban.Pro.Decorator

  alias Oban.Pro.Workflow

  def invoke do
    Workflow.new()
    |> Workflow.add(:dbl_1, new_double(1))
    |> Workflow.add(:dbl_2, new_double(2))
    |> Workflow.add(:dbl_3, new_double(3))
    |> Workflow.add(:print, new_print(), deps: [:dbl_1, :dbl_2, :dbl_3])
    |> Oban.insert_all()
  end

  @job recorded: true
  def double(value), do: value * 2

  @job true
  def print do
    current_job()
    |> Workflow.all_recorded(only_deps: true)
    |> IO.inspect()
  end
end
If current_job/0 is called outside of a job context (not during job execution), it returns nil.
Summary
Types
comp_opts() - Options allowed in @job annotations at compile time.
full_opts() - Options allowed at runtime when calling generated new_, insert_, or relay_ functions.
Types
@type comp_opts() :: [
  max_attempts: pos_integer(),
  oban: GenServer.name(),
  priority: 0..9,
  queue: atom() | binary(),
  recorded: boolean() | keyword(),
  replace: [Oban.Job.replace_by_state_option()],
  tags: Oban.Job.tags(),
  unique:
    true
    | [
        period: timeout(),
        states: atom() | [Oban.Job.unique_state()],
        timestamp: :inserted_at | :scheduled_at
      ]
]
Options allowed in @job annotations at compile time.
@type full_opts() :: [
  max_attempts: pos_integer(),
  meta: map(),
  oban: GenServer.name(),
  priority: 0..9,
  queue: atom() | binary(),
  recorded: boolean() | keyword(),
  replace: [Oban.Job.replace_by_state_option()],
  scheduled_at: DateTime.t(),
  schedule_in: Oban.Job.schedule_in_option(),
  tags: Oban.Job.tags(),
  timeout: timeout(),
  unique:
    true
    | [
        period: timeout(),
        states: atom() | [Oban.Job.unique_state()],
        timestamp: :inserted_at | :scheduled_at
      ]
]
Options allowed at runtime when calling generated new_, insert_, or relay_ functions.