Oban.Pro.Worker behaviour (Oban Pro v1.5.0-rc.8)

Oban.Pro.Worker is a replacement for Oban.Worker with expanded capabilities.

Major features:

  • 🏗️ Structured Jobs — types for job args with validation, casting, defaults, enums and more.
  • 📼 Recorded Jobs — store a job's return output for retrieval later by other jobs, in a console, or in the Web dashboard.
  • 🔗 Chained Jobs — link jobs together to ensure they run in a strict sequential order regardless of scheduling or retries.
  • 🔐 Encrypted Jobs — store job args with encryption at rest so sensitive data can't be seen from the database or Web dashboard.
  • 🪦 Job Deadlines — preemptively cancel jobs that shouldn't run after a period of time has elapsed.
  • 🧑‍🤝‍🧑 Worker Aliases — aliasing allows jobs enqueued with the original worker name to continue without exceptions using the new worker code.
  • 🪝 Worker Hooks — callbacks triggered around job execution, defined as callback functions on the worker, or in a separate module for reuse.

Usage

Using Oban.Pro.Worker is identical to using Oban.Worker, with a few additional options. All of the basic options such as queue, priority, and unique are still available along with more advanced options.

To create a basic Pro worker point use at Oban.Pro.Worker and define a process/1 callback:

def MyApp.Worker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(%Job{} = job) do
    # Do stuff with the job
  end
end

If you have existing workers that you'd like to convert you only need to change the use definition and replace perform/1 with process/1.

Without any of the advanced Pro features there isn't any difference between the basic and pro workers.

Structured Jobs

Structured workers help you catch invalid data within your jobs by validating args on insert and casting args before execution. They also automatically generate structs for compile-time checks and friendly dot access.

Defining a Structured Worker

Structured workers use args_schema/1 to define which fields are allowed, required, and their expected types. Another benefit, aside from validation, is that args passed to process/1 are converted into a struct named after the worker module. Here's an example that demonstrates defining a worker with several field types and embedded data structures:

defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker

  args_schema do
    field :id, :id, required: true
    field :name, :string, required: true
    field :mode, :enum, values: ~w(enabled disabled paused)a, default: :enabled
    field :xtra, :term

    embeds_one :data, required: true do
      field :office_id, :uuid, required: true
      field :has_notes, :boolean, default: false
      field :addons, {:array, :string}
    end

    embeds_many :addresses do
      field :street, :string
      field :city, :string
      field :country, :string
    end
  end

  @impl Oban.Pro.Worker
  def process(%Job{args: %__MODULE__{id: id, name: name, mode: mode, data: data}}) do
    %{office_id: office_id, notes: notes} = data

    # Use the matched, cast values
  end
end

The example's schema declares five top level keys, :id, :name, :mode, :data, and :addresses. Of those, only :id, :name, and the :office_id subkey are marked required. The :mode field is an enum that validates values and casts to an atom. The embedded :data field declares a nested map with its own type coercion and validation, including a custom Ecto.UUID type. Finally, :addresses specifies an embedded list of maps.

Job args are validated on new/1 and errors bubble up to prevent insertion:

StructuredWorker.new(%{id: "not-an-id", mode: "unknown"}).valid?
# => false (invalid id, invalid mode, missing name)

StructuredWorker.new(%{id: "123", mode: "enabled"}).valid?
# => false (missing name)

StructuredWorker.new(%{id: "123", name: "NewBiz", mode: "enabled"}).valid?
# => true

This shows how args, stored as JSON, are cast before passing to process/1:

# {"id":123,"name":"NewBiz","mode":"enabled","data":{"parent_id":456}}

%MyApp.StructuredWorker{
  id: 123,
  name: "NewBiz",
  mode: :enabled,
  data: %{parent_id:456}
}

Structured Types and Casting

Type casting and validation are handled by changesets. All types supported in Ecto schemas are allowed, e.g. :id, :integer, :string, :float, or :map. See the Ecto documentation for a complete list of Ecto types and their Elixir counterparts.

Structured Options

Structured fields support a few common options.

  • :default — Sets the default value for a field. The default value is calculated at compilation time, so don't use expressions like DateTime.utc_now/0 as they'd be the same for all records.

  • :required — Validates that a value is provided for the field. Values that are nil or an empty string aren't considered valid.

Structured Extensions

Structured workers support some convenient extensions beyond Ecto's standard type casting.

  • :enum — Maps atoms to strings based on a list of predefiend atoms passed like values: ~w(foo bar baz)a. Both validates that values are included in the list and casts them to an atom.

  • :term — Safely encodes any Elixir term as a string for storage, then decodes it back to the original term on load. This is similar to :any, but works with terms like tuples or pids that can't usually be serialied. For safety, terms are encoded with the :safe option to prevent decoding data that may be used to attack the runtime.

  • :uuid — an intention revealing alias for binary_id

  • {:array, *} — A list of one or more values of any type, including :enum and :uuid

  • embeds_one/2,3 — Declares a nested map with an explicit set of fields

  • embeds_many/2,3 — Delcares a list of nested maps with an explicit set of fields

Defining Typespecs for Structured Workers

Typespecs aren't generated automatically. If desired, you must to define a sctuctured worker's type manually:

defmodule MyApp.StructuredWorker do
  use Oban.Pro.Worker

  @type t :: %__MODULE__{id: integer(), active: boolean()}

Recorded Jobs

Sometimes the output of a job is just as important as any side effects. When that's the case, you can use the recorded option to stash a job's output back into the job itself. Results are compressed and safely encoded for retrieval later, either manually, in a batch callback, or a in downstream workflow job.

Defining a Recorded Worker

defmodule MyApp.RecordedWorker do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: args}) do
    # Do your typical work here.
  end
end

If your process function returns an {:ok, value} tuple, it is recorded. Any other value, i.e. an plain :ok, error, or snooze, is ignored.

The example above uses recorded: true to opt into recording with the defaults. That means an output limit of 32kb after compression and encoding—anything larger than the configured limit will return an error tuple. If you expect larger results (and you want them stored in the database) you can override the limit. For example, to set the limit to 64kb instead:

use Oban.Pro.Worker, recorded: [limit: 64_000]

Retrieving Results

The fetch_recorded/1 function is your ticket to extracting recorded results. If a job has ran and recorded a value, it will return an {:ok, result} tuple:

job = MyApp.Repo.get(Oban.Job, job_id)

case MyApp.RecordedWorker.fetch_recorded(job) do
  {:ok, result} ->
    # Use the result

  {:error, :missing} ->
    # Nothing recorded yet
end

Chained Jobs

Chains link jobs together to ensure they run in a strict sequential order. Downstream jobs in the chain won't execute until the upstream job is completed, cancelled, or discarded. Behaviour in the event of cancellation or discards is customizable to allow for uninterrupted processing or holding for outside intervention

Jobs in a chain only run after the previous job completes successfully, regardless of scheduling, snoozing, or retries.

Defining a Chain

Chains are appropriate in situations where jobs are used to synchronize internal state with outside state via events. For example, imagine a system that relies on webhooks from a payment processor to track account balance:

defmodule MyApp.WebhookWorker do
  use Oban.Pro.Worker, queue: :webhooks, chain: [by: :worker]

  @impl true
  def process(%Job{args: %{"account_id" => account_id, "event" => event}}) do
    account_id
    |> MyApp.Account.find()
    |> MyApp.Account.handle_webhook_event(event)
  end
end

Now imagine that it's essential that jobs for an account are processed in order, while jobs from separate accounts can run concurrently. Modify the :by option to partition by worker and :account_id:

defmodule MyApp.WebhookWorker do
  use Oban.Pro.Worker, queue: :webhooks, chain: [by: [args: :account_id]]

  ...

Webhooks for each account are guaranteed to run in order regardless of queue concurrency or errors. The following section shows more partitioning examples.

Chain Partitioning

By default, chains are sequenced by worker, which means any job with the same worker forms a chain. This approach may not always be suitable. For instance, you may want to link workers based on a field like :account_id instead of just the worker. In such cases, you can use the :by option to customize how chains are partitioned.

Here are a few examples of using :by to achieve fine-grained control over chain partitioning:

# Chain by :worker
use Oban.Pro.Worker, chain: [by: :worker]

# Chain by a single args key without considering the worker
use Oban.Pro.Worker, chain: [by: [args: :account_id]]

# Chain by multiple args keys without considering the worker
use Oban.Pro.Worker, chain: [by: [args: [:account_id, :cohort]]]

# Chain by worker and a single args key
use Oban.Pro.Worker, chain: [by: [:worker, args: :account_id]]

# Chain by worker and multiple args key
use Oban.Pro.Worker, chain: [by: [:worker, args: [:account_id, :cohort]]]

# Chain by a single meta key
use Oban.Pro.Worker, chain: [by: [meta: :source_id]]

Handling Cancelled/Discarded

The way a chain behaves when jobs are cancelled or discarded is customizable with the :on_discarded and :on_cancelled options.

There are three strategies for handling upstream discards and cancellations:

  • :ignore — keep processing jobs in the chain as if upstream cancelled or discarded jobs completed successfully. This is the default behaviour.

  • :hold — stop processing any jobs in the chain until the cancelled or discarded job is completed or eventually deleted.

Here's an example of a chain that holds on discarded or cancelled:

use Oban.Pro.Worker, chain: [on_discarded: :hold, on_cancelled: :hold]

Chains and Workflows

Using chained jobs from a workflow isn't allowed. Chains and workflows share an "on hold" scheduling mechanism which prevents predictable ordering.

Encrypted Jobs

Some applications have strong regulations around the storage of personal information. For example, medical records, financial details, social security numbers, or other data that should never leak. The encrypted option lets you store all job data at rest with encryption so sensitive data can't be seen.

Defining an Encrypted Worker

Encryption is handled transparently as jobs are inserted and executed. All you need to do is flag the worker as encrypted and configure it to fetch a secret key:

defmodule MyApp.SensitiveWorker do
  use Oban.Pro.Worker, encrypted: [key: {module, fun, args}]

  @impl true
  def process(%Job{args: args}) do
    # Args are decrypted, use them as you normally would
  end
end

Now job args are encrypted before insertion into the database and decrypted when the job runs.

Generating Encryption Keys

Encryption requires a 32 byte, Base 64 encoded key. You can generate one with the :crypto and Base modules:

key = 32 |> :crypto.strong_rand_bytes() |> Base.encode64()

The result will look something like this "w7xGJClzEh1pbWuq6zsZfKfwdINu2VIkgCe3IO0hpsA=".

While it's possible to use the generated key in your worker directly, that defeats the purpose of encrypting sensitive data because anybody with access to the codebase can read the encryption key. That's why it is highly recommended that you use an MFA to retrieve the key dynamically at runtime. For example, here's how you could pull the key from the Application environment:

use Oban.Pro.Worker, encrypted: [key: {Application, :fetch_key!, [:enc_key]}]

Encryption Implementation Details

  • Erlang's crypto module is used with the aes_256_ctr cipher for encryption.

  • Encoding and decoding stacktraces are pruned to prevent leaking the private key or initialization vector.

  • Only args are encrypted, meta is kept as plaintext. You can use that to your advantage for uniqueness, but be careful not to put anything sensitive in meta.

  • Error messages and stacktraces aren't encrypted and are stored as plaintext. Be careful not to expose sensitive data when raising errors.

  • Args are encrypted at rest as well as in Oban Web. You won't be able to view or search encrypted args in the Web dashboard.

  • Uniqueness works for encrypted jobs, but not for arguments because the same args are encrypted differently every time. Favor meta over args to enforce uniqueness for encrypted jobs.

Job Deadlines

Jobs that shouldn't run after some period of time can be marked with a deadline. After the deadline has passed the job will be pre-emptively cancelled on its next run, or optionally during its next run if desired.

defmodule DeadlinedWorker do
  use Oban.Pro.Worker, deadline: {1, :hour}

  @impl true
  def process(%Job{args: args}) do
    # If this doesn't run within an hour, it's cancelled
  end
end

Deadlines may be set at runtime as well, provided the worker was already configured with a deadline option.

# Specify the deadline in seconds
DeadlinedWorker.new(args, deadline: 60)

# Specify the deadline in minutes, using the tuple syntax
DeadlinedWorker.new(args, deadline: {30, :minutes})

# Specify the deadline in days
DeadlinedWorker.new(args, deadline: {1, :day})

In either case, the deadline is always relative and computed at runtime. That also allows the deadline to consider scheduling—a job scheduled to run 1 hour from now with a 1 hour deadline will expire 2 hours in the future. Note that deadlines only account for scheduling on insert, not on retry or after a snooze.

Applying Deadlines to Running Jobs

A job that has already started may be cancelled at runtime if it won't complete before the deadline. Consider a slower job that takes 5 minutes to execute and has a deadline 10 minutes in the future. If the job starts 9 minutes from now it wouldn't finish until 4 minutes past the deadline.

By setting the force flag, the job will cancel itself if it exceeds the deadline:

defmodule FirmDeadlinedWorker do
  use Oban.Pro.Worker, deadline: [in: {10, :minutes}, force: true]

  ...

Worker Aliases

Worker aliases solve a perennial production issue—how to rename workers without breaking existing jobs. Aliasing allows jobs enqueued with the original worker name to continue executing without exceptions using the new worker code.

As an example, imagine that a UserPurge worker must expand to purge more than user data. The UserPurge name is no longer accurate, and you want to rename it to DataPurge. To rename the worker and add an alias for the original name:

-defmodule MyApp.UserPurge do
+defmodule MyApp.DataPurge do
-  use Oban.Pro.Worker
+  use Oban.Pro.Worker, aliases: [MyApp.UserPurge]

   @impl Oban.Pro.Worker
   def process(job) do
     # purge data, not just users
   end
 end

Now any existing MyApp.UserPurge jobs can safely run, and they'll delegate to the new DataPurge code.

Workers may also have multiple aliases to account for multiple renames or merging workers:

use Oban.Pro.Worker, aliases: [MyApp.WorkerA, MyApp.WorkerB]

Worker Hooks

Worker hooks are called before and after a job finishes executing. They can be defined as callback functions on the worker, or in a separate module for reuse across workers.

Hooks are called synchronously, from within the job's process.

Defining Hooks

There are three mechanisms for defining and attaching before_new/2, before_process/1, and after_process/3 hooks:

  1. Implicitly—hooks are defined directly on the worker and they only run for that worker
  2. Explicitly—hooks are listed when defining a worker and they run anywhere they are listed
  3. Globally—hooks are executed for all Pro workers

It's possible to combine each type of hook on a single worker. When multiple hooks are stacked they're executed in the order: implicit, explicit, and then global.

After Process

After hooks do not modify the job or execution results. Think of them as a convenient alternative to globally attached telemetry handlers. They are purely for side-effects such as cleanup, logging, recording metrics, broadcasting notifications, updating other records, error notifications, etc.

Any exceptions or crashes are caught and logged, they won't cause the job to fail or the queue to crash.

An after_process/3 hook is called with the job and an execution state corresponding to the result from process/1:

  • complete—when process/1 returns :ok or {:ok, result}
  • cancel—when process/1 returns {:cancel, reason}
  • discard—when a job errors and exhausts retries, or returns {:discard, reason}
  • error—when a job crashes, raises an exception, or returns {:error, value}
  • snooze—when a job returns {:snooze, seconds}

First, here's how to define a single implicit local hook on the worker using after_process/3:

defmodule MyApp.HookWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end

  @impl Oban.Pro.Worker
  def after_process(state, %Job{} = job, _result) do
    MyApp.Notifier.broadcast("oban-jobs", {state, %{id: job.id}})
  end
end

Any module that exports after_process/3 can be used as a hook. For example, here we'll define a shared error notification hook:

defmodule MyApp.ErrorHook do
  def after_process(state, job, _result) when state in [:discard, :error] do
    error = job.unsaved_error
    extra = Map.take(job, [:attempt, :id, :args, :max_attempts, :meta, :queue, :worker])
    tags = %{oban_worker: job.worker, oban_queue: job.queue, oban_state: job.state}

    Sentry.capture_exception(error.reason, stacktrace: error.stacktrace, tags: tags, extra: extra)
  end

  def after_process(_state, _job), do: :ok
end

defmodule MyApp.HookWorker do
  use Oban.Pro.Worker, hooks: [MyApp.ErrorHook]

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end
end

The same module can be attached globally, for all Oban.Pro.Worker modules, using attach_hook/1:

:ok = Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)

Attaching hooks in your application's start/2 function is an easy way to ensure hooks are registered before your application starts processing jobs.

def start(_type, _args) do
  :ok = Oban.Pro.Worker.attach_hook(MyApp.ErrorHook)

  children = [
    ...

Before New

The before_new/2 hook can augment or override the args and opts used to construct new jobs. It's akin to overriding a worker's new/2 callback, but the logic can be shared between workers or operate on a global scale.

Returning {:ok, args, opts} will continue building a job changeset using the provided args and opts, while an {:error, reason} tuple will mark the changeset as invalid with a custom error.

The hook is most useful for augmenting jobs from a central point or injecting custom meta, i.e. to add span ids for tracing. Here we define a global hook that injects a span_id into the meta:

defmodule MyApp.SpanHook do
  def before_new(args, opts) do
    opts =
      opts
      |> Keyword.put_new(:meta, %{})
      |> put_in([:meta, :span_id], MyApp.Telemetry.gen_span_id())

    {:ok, args, opts}
  end
end

Before Process

Before process hooks may modify the job, or prevent calling process/1 entirely depending on the return value. Returning an {:ok, job} tuple will continue processing the job, while :error or :cancel tuples will short circuit and record a standard error or cancel the job, respectively.

The before_process/1 callback is executed within the job's process, after other internal processing such as encryption and args structuring are applied.

It's most useful in situations where the worker needs to perform generic logic such as adding logger metadata, setting up a dynamic repo, etc. For example, let's define a global hook to set user_id in the logger metadata for every job that contains that field:

defmodule MyApp.UserMetadataHook do
  def before_process(job) do
    with %{"user_id" => user_id} <- job.args do
      Logger.metadata(user_id: user_id)
    end

    {:ok, job}
  end
end

Then attach the hook globally, as demonstrated earlier:

:ok = Oban.Pro.Worker.attach_hook(MyApp.UserMetadataHook)

Summary

Types

Options to enable and configure alias mode.

Options to enable and configure chain mode.

Configuration that controls how chains are linked together.

Options to enable and configure deadline mode.

Options to enable and configure encrypted mode.

All possible states for the after_process/3 hook.

Options to enable and configure recorded mode.

Callbacks

Called after a job finishes processing regardless of status (complete, failure, etc). The result is available for any job that returns a value, not just recorded jobs.

Called before new/2 when constructing a job changeset.

Called before process/1 when executing a job.

Extract the results of a previously executed job.

Called when executing a job.

Functions

Define an args schema struct with field definitions and optional embedded structs.

Register a worker hook to be ran after any Pro worker executes.

Unregister a worker hook.

Types

@type aliases() :: [module()]

Options to enable and configure alias mode.

@type args() :: Oban.Job.args()
@type chain() :: [
  by: chain_by(),
  on_cancelled: :ignore | :hold,
  on_discarded: :ignore | :hold
]

Options to enable and configure chain mode.

@type chain_by() ::
  :worker
  | {:args, atom() | [atom()]}
  | {:meta, atom() | [atom()]}
  | [:worker | {:args, atom() | [atom()]} | {:meta, atom() | [atom()]}]

Configuration that controls how chains are linked together.

@type deadline() :: [in: timeout(), force: boolean()]

Options to enable and configure deadline mode.

@type encrypted() :: [{:key, mfa()}]

Options to enable and configure encrypted mode.

Link to this type

hook_state()

@type hook_state() :: :cancel | :complete | :discard | :error | :snooze

All possible states for the after_process/3 hook.

@type opts() :: [Oban.Job.option()]
@type recorded() :: true | [to: atom(), limit: pos_integer(), safe_decode: boolean()]

Options to enable and configure recorded mode.

Callbacks

Link to this callback

after_process(hook_state, job)

(optional)
This callback is deprecated. Use after_process/3 instead.
@callback after_process(hook_state(), job :: Oban.Job.t()) :: :ok
Link to this callback

after_process(hook_state, job, result)

(optional)
@callback after_process(
  hook_state(),
  job :: Oban.Job.t(),
  result :: nil | Oban.Worker.result()
) :: :ok

Called after a job finishes processing regardless of status (complete, failure, etc). The result is available for any job that returns a value, not just recorded jobs.

Since crashes and exceptions don't return anything, result will always by nil.

See the shared "Worker Hooks" section for more details.

Link to this callback

before_new(args, opts)

(optional)
@callback before_new(args :: Oban.Job.args(), opts :: Keyword.t()) ::
  {:ok, Oban.Job.args(), Keyword.t()} | {:error, any()}

Called before new/2 when constructing a job changeset.

The args and opts returned in a {:ok, args, opts} tuple will be used to construct the job changeset. Returning an {:error, reason} tuple will add the reason as a custom error for the changeset and make it invalid.

This callback is intended for global or explicit hooks. For workers that override the new/2 callback, the before_new/2 hook will only be called if super(args, opts) is used and only after new/2, not before.

Link to this callback

before_process(job)

(optional)
@callback before_process(job :: Oban.Job.t()) ::
  {:ok, Oban.Job.t()} | {:error, reason :: term()} | {:cancel, reason :: term()}

Called before process/1 when executing a job.

The callback is executed within the job's process, after other internal processing such as encryption and args structuring are applied.

Returning an {:ok, job} tuple will continue processing the job, while :error or :cancel tuples will short circuit and record a standard error or cancel the job, respectively.

Link to this callback

fetch_recorded(job)

(optional)
@callback fetch_recorded(job :: Oban.Job.t()) :: {:ok, term()} | {:error, :missing}

Extract the results of a previously executed job.

If a job has ran and recorded a value, it will return an {:ok, result} tuple. Otherwise, you'll get {:error, :missing}.

@callback process(job :: Oban.Job.t() | [Oban.Job.t()]) :: Oban.Worker.result()

Called when executing a job.

The process/1 callback behaves identically to Oban.Worker.perform/1, except that it may have pre-processing and post-processing applied.

Functions

Link to this macro

args_schema(list)

(since 0.14.0) (macro)

Define an args schema struct with field definitions and optional embedded structs.

The schema is used to validate args before insertion or execution. See the Structured Workers section for more details.

Example

Define an args schema for a worker:

defmodule MyApp.Worker do
  use Oban.Pro.Worker

  args_schema do
    field :id, :id, required: true
    field :name, :string, required: true
    field :mode, :enum, values: ~w(on off paused)a
    field :safe, :boolean, default: false

    embeds_one :address, required: true do
      field :street, :string
      field :number, :integer
      field :city, :string
    end
  end

  ...
Link to this function

attach_hook(module)

(since 0.12.0)
@spec attach_hook(module()) :: :ok | {:error, term()}

Register a worker hook to be ran after any Pro worker executes.

The module must define a function that matches the hook. For example, a module that handles an :on_complete hook must define an on_complete/1 function.

Example

Attach a hook handler globally:

defmodule MyApp.Hook do
  def after_process(_state, %Oban.Job{} = job) do
    # Do something with the job

    :ok
  end
end

:ok = Oban.Pro.Worker.attach_hook(MyApp.Hook)
Link to this function

detach_hook(module)

(since 0.12.0)
@spec detach_hook(module()) :: :ok

Unregister a worker hook.

Example

Detach a previously registered global hook:

:ok = Oban.Pro.Worker.detach_hook(MyApp.Hook)