Oban.Pro.Workers.Chain (Oban Pro v1.4.3)
Chain workers link jobs together to ensure they run in a strict sequential order. Downstream
jobs won't execute until the upstream job is completed, cancelled, or discarded. Behaviour
in the event of cancellation or discards is customizable to allow for uninterrupted processing,
holding for outside intervention, or cascading cancellation.
Jobs in a chain only run after the previous job finishes. Snoozing or retrying an upstream job never lets a downstream job jump ahead.
Usage
Chains are appropriate in situations where jobs are used to synchronize internal state with outside state via events. For example, imagine a system that relies on webhooks from a payment processor to track account balance.
defmodule MyApp.WebhookWorker do
  use Oban.Pro.Workers.Chain, queue: :webhooks, by: :worker

  @impl true
  def process(%Job{args: %{"account_id" => account_id, "event" => event}}) do
    account_id
    |> MyApp.Account.find()
    |> MyApp.Account.handle_webhook_event(event)
  end
end

It's essential that jobs for an account are processed in order, while jobs from separate
accounts can run concurrently. Modify the :by option to partition by worker and :account_id:
defmodule MyApp.WebhookWorker do
  use Oban.Pro.Workers.Chain, queue: :webhooks, by: [:worker, args: :account_id]

  ...

Now webhooks for each account are guaranteed to run in order regardless of queue concurrency or errors. See Chain Partitioning for more partitioning examples.
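As a rough illustration of how such a chain might be fed, the sketch below enqueues a few webhook jobs. It assumes an application with Oban already configured and running under the default name, and the event names are hypothetical placeholders rather than values from any particular payment processor.

```elixir
# Assumes Oban is started in the application's supervision tree and that
# MyApp.WebhookWorker is the chain worker defined above.
# The event names below are hypothetical.
events = [
  %{account_id: 1, event: "charge.created"},
  %{account_id: 1, event: "charge.refunded"},
  %{account_id: 2, event: "charge.created"}
]

for args <- events do
  # new/1 builds a job changeset; Oban.insert/1 persists it.
  {:ok, _job} =
    args
    |> MyApp.WebhookWorker.new()
    |> Oban.insert()
end
```

With `by: [:worker, args: :account_id]`, the two jobs for account 1 would be expected to run in insertion order, while the job for account 2 belongs to a separate chain and may run concurrently.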
Chain Partitioning
By default, chains are sequenced by worker, which means any job with the same worker forms a
chain. This approach may not always be suitable. For instance, you may want to link workers
based on a field like :account_id instead of just the worker. In such cases, you can use the
:by option to customize how chains are partitioned.
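To make the idea of partitioning concrete, here is a minimal, pure-Elixir sketch of how a `:by` value could be turned into a grouping key. The `ChainKeySketch` module and its `partition_key/2` function are hypothetical and exist only for illustration; they are not how Oban Pro computes chains internally. Jobs are modelled as plain maps with string-keyed `args` and `meta`.

```elixir
defmodule ChainKeySketch do
  # Hypothetical illustration only: NOT Oban Pro's implementation.
  # Builds a comparable key from a job-like map and a :by option.
  def partition_key(job, by) do
    by
    |> List.wrap()
    |> Enum.map(&part(job, &1))
  end

  defp part(job, :worker), do: {:worker, job.worker}
  defp part(job, {:args, keys}), do: {:args, take(job.args, keys)}
  defp part(job, {:meta, keys}), do: {:meta, take(job.meta, keys)}

  # Accepts a single key or a list of keys and looks them up as strings,
  # since this sketch assumes args and meta are string-keyed maps.
  defp take(map, keys) do
    for key <- List.wrap(keys), do: {key, Map.get(map, to_string(key))}
  end
end

job_a = %{worker: "MyApp.WebhookWorker", args: %{"account_id" => 1}, meta: %{}}
job_b = %{worker: "MyApp.WebhookWorker", args: %{"account_id" => 2}, meta: %{}}

# Same key when partitioned by worker alone, so both jobs share a chain.
same_by_worker? =
  ChainKeySketch.partition_key(job_a, :worker) ==
    ChainKeySketch.partition_key(job_b, :worker)

# Different keys once :account_id is included, so the jobs form separate chains.
same_by_account? =
  ChainKeySketch.partition_key(job_a, [:worker, args: :account_id]) ==
    ChainKeySketch.partition_key(job_b, [:worker, args: :account_id])

IO.inspect({same_by_worker?, same_by_account?})
```

In this model, two jobs belong to the same chain exactly when their keys are equal.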
Here are a few examples of using :by to achieve fine-grained control over chain partitioning:
# Explicitly chain by :worker
use Oban.Pro.Workers.Chain, by: :worker
# Chain by a single args key without considering the worker
use Oban.Pro.Workers.Chain, by: [args: :account_id]
# Chain by multiple args keys without considering the worker
use Oban.Pro.Workers.Chain, by: [args: [:account_id, :cohort]]
# Chain by worker and a single args key
use Oban.Pro.Workers.Chain, by: [:worker, args: :account_id]
# Chain by worker and multiple args keys
use Oban.Pro.Workers.Chain, by: [:worker, args: [:account_id, :cohort]]
# Chain by a single meta key
use Oban.Pro.Workers.Chain, by: [meta: :source_id]

Handling Cancelled/Discarded
The way a chain behaves when jobs are cancelled or discarded is customizable with the
:on_discarded and :on_cancelled options.
There are three strategies for handling upstream discards and cancellations:
* :ignore - keep processing jobs in the chain as if upstream cancelled or discarded jobs completed successfully. This is the default behaviour.
* :hold - stop processing any jobs in the chain until the cancelled or discarded job is completed or eventually deleted.
* :halt - cancel jobs in the chain when an upstream job is cancelled or discarded. Cancelling cascades until jobs are retried via manual intervention, e.g. with the Web dashboard or Oban.retry_all_jobs/1.

Because halting cancels a job, you must use on_discarded: :halt along with on_cancelled: :halt to fully stop a chain.
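The three strategies can be modelled as a small decision function. The sketch below is a simplified, hypothetical model (the `ChainActionSketch` module, its return values, and the use of atoms for job states are all assumptions made for illustration), not Oban Pro's actual logic. It also shows why halting on discards alone doesn't fully stop a chain.

```elixir
defmodule ChainActionSketch do
  # Hypothetical model of the documented strategies: NOT Oban Pro's code.
  # Both options default to :ignore, as described above.
  @defaults [on_cancelled: :ignore, on_discarded: :ignore]

  # Returns what a downstream job would do given its upstream job's state.
  def downstream_action(upstream_state, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)

    case upstream_state do
      :completed -> :run
      :cancelled -> apply_strategy(opts[:on_cancelled])
      :discarded -> apply_strategy(opts[:on_discarded])
      _unfinished -> :wait
    end
  end

  defp apply_strategy(:ignore), do: :run
  defp apply_strategy(:hold), do: :snooze
  defp apply_strategy(:halt), do: :cancel
end

opts = [on_discarded: :halt]

# The job directly after a discarded job is cancelled...
IO.inspect(ChainActionSketch.downstream_action(:discarded, opts))

# ...but the job after that sees a cancelled upstream job, and with the
# default on_cancelled: :ignore it runs anyway.
IO.inspect(ChainActionSketch.downstream_action(:cancelled, opts))
```

In this model, adding `on_cancelled: :halt` makes the second call return `:cancel` as well, so the cancellation keeps cascading down the chain.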
Here's an example of a chain that halts on discarded and holds on cancelled:
use Oban.Pro.Workers.Chain, on_discarded: :halt, on_cancelled: :hold

Holding a job snoozes it briefly while it awaits manual intervention. The default snooze period
is 60s, and you can customize it through the :hold_snooze option. Here we're bumping snooze up
to 600s (10 minutes):
use Oban.Pro.Workers.Chain, on_cancelled: :hold, hold_snooze: 600

Customizing Chains
Chains use conservative defaults for safe, and relatively quick, dependency resolution. You can customize waiting times and retry intensity by providing a few top-level options:
* :wait_sleep - the number of milliseconds to wait between checks. This value, combined with :wait_retry, dictates how long a job waits before snoozing. Defaults to 1000ms.
* :wait_retry - the number of times to retry a check between sleeping. This value, combined with :wait_sleep, dictates how long a job waits before snoozing. Defaults to 10.
* :wait_snooze - the number of seconds a job will snooze after awaiting executing upstream dependencies. If upstream dependencies are scheduled or retryable then the job snoozes until the latest dependency's scheduled_at timestamp. Defaults to 5.
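As a back-of-the-envelope aid, the sketch below estimates how long a job might wait on an executing upstream job before it snoozes. It assumes the wait is roughly the product of :wait_sleep and :wait_retry, which is an interpretation of "combined" above rather than a documented formula, and it assumes :wait_sleep is an integer. The `ChainWaitSketch` module is hypothetical.

```elixir
defmodule ChainWaitSketch do
  # Hypothetical helper: NOT part of Oban Pro.
  # Defaults mirror the documented option defaults.
  @defaults [wait_sleep: 1_000, wait_retry: 10, wait_snooze: 5]

  # Approximate milliseconds spent checking before the job snoozes,
  # assuming one sleep of :wait_sleep per retry.
  def max_wait_ms(opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    opts[:wait_sleep] * opts[:wait_retry]
  end
end

# With the defaults: 1_000 ms * 10 retries
IO.inspect(ChainWaitSketch.max_wait_ms())
# => 10000

# Checking more often, for a shorter total time
IO.inspect(ChainWaitSketch.max_wait_ms(wait_sleep: 500, wait_retry: 4))
# => 2000
```

Under that assumption, a worker declared with `use Oban.Pro.Workers.Chain, wait_sleep: 500, wait_retry: 4, wait_snooze: 10` would check for about two seconds and then snooze for ten seconds before checking again.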
Summary
Types
chain_by()
@type chain_by() ::
  :worker
  | {:args, key_or_keys()}
  | {:meta, key_or_keys()}
  | [:worker | {:args, key_or_keys()} | {:meta, key_or_keys()}]
key_or_keys()
on_action()
@type on_action() :: :halt | :hold | :ignore
options()
@type options() :: [
  by: chain_by(),
  hold_snooze: pos_integer(),
  on_cancelled: on_action(),
  on_discarded: on_action(),
  wait_retry: pos_integer(),
  wait_sleep: timeout(),
  wait_snooze: pos_integer()
]