Oban.Pro.Workers.Workflow behaviour (Oban Pro v1.4.8)

Workflow workers compose together with arbitrary dependencies between jobs, allowing sequential, fan-out, and fan-in execution workflows. Workflows are fault tolerant, can be homogeneous or heterogeneous, and scale horizontally across all available nodes.

Workflow jobs aren't executed until all upstream dependencies have completed. This includes waiting on retries, scheduled jobs, or snoozing.

Installation

Before running a Workflow in production, you should run a migration to add an optimized index for workflow queries. Without the optimization, workflow queries may be very slow:

defmodule MyApp.Repo.Migrations.AddWorkflowIndex do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.Workflow
end

Also, be sure that you're running the DynamicLifeline to rescue stuck workflows when upstream dependencies are deleted unexpectedly.

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicLifeline],
  ...

Usage

Workflows are ideal for linking jobs together into a directed acyclic graph, a DAG. Dependency resolution guarantees that jobs execute in the prescribed order, even if one of the jobs fails and needs to retry. Any job that defines a dependency will wait for each upstream dependency to complete before it starts.

As a trivial example, we'll define an EchoWorker that only inspects the args, and then use it in a workflow to show how jobs execute in order. First, here's the worker:

defmodule MyApp.EchoWorker do
  use Oban.Pro.Workers.Workflow, queue: :default

  @impl true
  def process(%{args: args}) do
    IO.inspect(args)

    :ok
  end
end

Now, we'll use new/1 to initialize a workflow, and add/4 to add named jobs with dependencies to the workflow:

alias MyApp.EchoWorker
alias Oban.Pro.Workers.Workflow

Workflow.new()
|> Workflow.add(:a, EchoWorker.new(%{id: 1}))
|> Workflow.add(:b, EchoWorker.new(%{id: 2}), deps: [:a])
|> Workflow.add(:c, EchoWorker.new(%{id: 3}), deps: [:b])
|> Workflow.add(:d, EchoWorker.new(%{id: 4}), deps: [:b])
|> Workflow.add(:e, EchoWorker.new(%{id: 5}), deps: [:c, :d])
|> Oban.insert_all()

When the workflow executes, it will print out each job's args in the prescribed order. However, because steps c and d each depend on b, they may execute in parallel.

Visually, the workflow's jobs compose like this:

A -> B -> (C, D) -> E
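
To make the ordering concrete, here is a minimal sketch in plain Elixir that groups the jobs from the example into "waves" based only on their deps. The WorkflowSketch module and its waves/1 function are hypothetical and exist only for illustration. This is not how Oban Pro resolves dependencies, which happens against jobs stored in the database.

```elixir
defmodule WorkflowSketch do
  # Hypothetical helper, not part of Oban Pro. Takes a map of name => deps and
  # returns a list of waves, where each job's deps all appear in earlier waves.
  def waves(deps) when is_map(deps), do: do_waves(deps, MapSet.new(), [])

  defp do_waves(pending, _done, acc) when map_size(pending) == 0, do: Enum.reverse(acc)

  defp do_waves(pending, done, acc) do
    # A job is ready once every one of its deps has already been placed.
    ready =
      pending
      |> Enum.filter(fn {_name, deps} -> Enum.all?(deps, &MapSet.member?(done, &1)) end)
      |> Enum.map(fn {name, _deps} -> name end)
      |> Enum.sort()

    if ready == [], do: raise(ArgumentError, "cycle or missing dependency")

    do_waves(Map.drop(pending, ready), MapSet.union(done, MapSet.new(ready)), [ready | acc])
  end
end

deps = %{a: [], b: [:a], c: [:b], d: [:b], e: [:c, :d]}
IO.inspect(WorkflowSketch.waves(deps))
# [[:a], [:b], [:c, :d], [:e]]
```

The third wave contains both c and d, which is why those two steps may run in parallel.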

Dynamic Workflows

Many workflows aren't static—the number of jobs and their interdependencies aren't known beforehand.

The following worker accepts a count and generates a workflow that fans out and back in twice, using a variable number of dependencies. The key is using Enum.reduce to accumulate a workflow with interpolated names, i.e. "a_0", "a_1", etc.

defmodule MyApp.Dynamic do
  use Oban.Pro.Workers.Workflow

  @impl true
  def process(%{meta: %{"name" => name}}) do
    IO.puts(name)

    :ok
  end

  def insert_workflow(count) when is_integer(count) do
    range = Range.new(0, count)
    a_deps = Enum.map(range, &"a_#{&1}")
    b_deps = Enum.map(range, &"b_#{&1}")

    Workflow.new()
    |> Workflow.add(:a, new(%{}), [])
    |> fan_out(:a, range)
    |> Workflow.add(:b, new(%{}), deps: a_deps)
    |> fan_out(:b, range)
    |> Workflow.add(:c, new(%{}), deps: b_deps)
    |> Oban.insert_all()
  end

  defp fan_out(workflow, base, range) do
    Enum.reduce(range, workflow, fn key, acc ->
      Workflow.add(acc, "#{base}_#{key}", new(%{}), deps: [base])
    end)
  end
end

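Calling MyApp.Dynamic.insert_workflow(3) generates a workflow that fans out from a, back in to b, out again, and finally back in to c. Note that Range.new(0, count) is inclusive on both ends, so a count of 3 produces four jobs per fan-out (a_0 through a_3 and b_0 through b_3):

A -> (A0, A1, A2, A3) -> B -> (B0, B1, B2, B3) -> C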
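
When building dynamic workflows it can help to preview the names and deps as plain data before inserting anything. The sketch below mirrors the structure of insert_workflow/1 without touching Oban. The DynamicPlan module and its private add/3 helper are hypothetical stand-ins that only collect {name, deps} tuples.

```elixir
defmodule DynamicPlan do
  # Mirrors the shape of MyApp.Dynamic.insert_workflow/1 using plain data.
  def build(count) when is_integer(count) do
    range = Range.new(0, count)

    []
    |> add(:a, [])
    |> fan_out(:a, range)
    |> add(:b, Enum.map(range, &"a_#{&1}"))
    |> fan_out(:b, range)
    |> add(:c, Enum.map(range, &"b_#{&1}"))
  end

  # Accumulates one job per element of the range, each depending on the base job.
  defp fan_out(plan, base, range) do
    Enum.reduce(range, plan, fn key, acc -> add(acc, "#{base}_#{key}", [base]) end)
  end

  # Hypothetical stand-in for Workflow.add/4: appends a {name, deps} tuple.
  defp add(plan, name, deps) do
    plan ++ [{to_string(name), Enum.map(deps, &to_string/1)}]
  end
end

IO.inspect(DynamicPlan.build(1))
# [
#   {"a", []}, {"a_0", ["a"]}, {"a_1", ["a"]},
#   {"b", ["a_0", "a_1"]}, {"b_0", ["b"]}, {"b_1", ["b"]},
#   {"c", ["b_0", "b_1"]}
# ]
```

Because the range is inclusive, a count of n yields n + 1 jobs in each fan-out, for 2 * (n + 1) + 3 jobs in total.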

Using Upstream Results

Directed dependencies between jobs, paired with the recorded option, allow a workflow's downstream jobs to fetch the output of upstream jobs.

To demonstrate, let's make a workflow that combines all_jobs/2 and Oban.Pro.Worker.fetch_recorded/1 to simulate a multi-step API interaction.

The first worker simulates fetching an authentication token using an api_key:

defmodule MyApp.WorkerA do
  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%Job{args: %{"api_key" => api_key}}) do
    token =
      api_key
      |> String.graphemes()
      |> Enum.shuffle()
      |> to_string()

    {:ok, token}
  end
end

The second worker fetches the token from the first job by calling all_jobs/2 with the names option to restrict results to the job named "a", which we'll set while building the workflow later.

defmodule MyApp.WorkerB do
  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%Job{args: %{"url" => url}} = job) do
    [token_job] = Workflow.all_jobs(job, names: ["a"])

    {:ok, token} = fetch_recorded(token_job)

    {:ok, {token, url}}
  end
end

Then the final worker uses all_jobs/2 with the only_deps option to fetch the results from all upstream jobs, then it prints out everything that was fetched.

defmodule MyApp.WorkerC do
  use Oban.Pro.Workers.Workflow

  @impl true
  def process(job) do
    job
    |> Workflow.all_jobs(only_deps: true)
    |> Enum.map(&fetch_recorded/1)
    |> IO.inspect()

    :ok
  end
end

The final step is to build a workflow that composes all of the jobs together with names, args, and deps:

alias MyApp.{WorkerA, WorkerB, WorkerC}

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{api_key: "23kl239bjljlk309af"}))
|> Workflow.add(:b, WorkerB.new(%{url: "elixir-lang.org"}), deps: [:a])
|> Workflow.add(:c, WorkerB.new(%{url: "www.erlang.org"}), deps: [:a])
|> Workflow.add(:d, WorkerB.new(%{url: "getoban.pro"}), deps: [:a])
|> Workflow.add(:e, WorkerC.new(%{}), deps: [:b, :c, :d])
|> Oban.insert_all()

The resulting workflow has this shape:

A -> (B, C, D) -> E

When the workflow runs, the final step, e, prints out something like the following:

{"93l2jlj3kl90baf2k3", "elixir-lang.org"}
{"93l2jlj3kl90baf2k3", "www.erlang.org"}
{"93l2jlj3kl90baf2k3", "getoban.pro"}
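
WorkerB matches the return of fetch_recorded/1 against an {:ok, token} tuple, so mapping fetch_recorded/1 over several jobs presumably yields a list of such tuples. The following sketch shows one way to unwrap them in plain Elixir. The data is hard-coded stand-in data rather than real recorded jobs, and the {:error, :missing} entry is a hypothetical failure shape used only to show the filtering.

```elixir
# Stand-in data shaped like the result of mapping fetch_recorded/1 over jobs.
results = [
  {:ok, {"93l2jlj3kl90baf2k3", "elixir-lang.org"}},
  {:ok, {"93l2jlj3kl90baf2k3", "www.erlang.org"}},
  # Hypothetical failure shape, included only for illustration.
  {:error, :missing}
]

# The generator pattern skips every element that doesn't match {:ok, value}.
values = for {:ok, value} <- results, do: value

Enum.each(values, fn {token, url} -> IO.puts("#{url} fetched with #{token}") end)
```

Whether skipping failed results is appropriate depends on the workflow; a stricter worker could pattern match on every result and fail instead.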

Customizing Workflows

Workflows use conservative defaults for safe, and relatively quick, dependency resolution. You can customize the safety checks by providing a few top-level options:

  • ignore_cancelled — regard cancelled dependencies as completed rather than cancelling remaining jobs in the workflow. Defaults to false.

  • ignore_discarded — regard discarded dependencies as completed rather than cancelling remaining jobs in the workflow. Defaults to false.

  • ignore_deleted — regard deleted (typically pruned) dependencies as completed rather than cancelling remaining jobs in the workflow. Defaults to false.

  • workflow_name — an optional name to describe the purpose of the workflow, beyond the individual jobs in it.

The following example creates a workflow with all of the available options:

alias Oban.Pro.Workers.Workflow

workflow = Workflow.new(
  ignore_cancelled: true,
  ignore_deleted: true,
  ignore_discarded: true,
  workflow_name: "special_purpose"
)

Options may also be applied to individual workflow jobs. For example, configure a single job to ignore cancelled dependencies, another to ignore discarded, and another to ignore deleted:

Workflow.new()
|> Workflow.add(:a, MyWorkflow.new(%{}))
|> Workflow.add(:b, MyWorkflow.new(%{}, deps: [:a], ignore_cancelled: true))
|> Workflow.add(:c, MyWorkflow.new(%{}, deps: [:b], ignore_discarded: true))
|> Workflow.add(:d, MyWorkflow.new(%{}, deps: [:c], ignore_deleted: true))

Dependency resolution relies on jobs lingering in the database after execution. If your system prunes job dependencies then the workflow may never complete. Set ignore_deleted: true on your workflows to override this behaviour.
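
As a rough mental model of the ignore_* options, the sketch below decides what should happen to a downstream job given the state of one upstream dependency. It is an illustrative model only: the DepSketch module, its resolve/2 function, and the :run, :wait, and {:cancel, reason} return values are all hypothetical, and Oban Pro's real resolution logic is not shown in this document.

```elixir
defmodule DepSketch do
  # Illustrative model only, not Oban Pro's implementation.
  @defaults [ignore_cancelled: false, ignore_discarded: false, ignore_deleted: false]

  # dep_state is a job state string, or :deleted when the upstream job is gone.
  def resolve(dep_state, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)

    case dep_state do
      "completed" -> :run
      "cancelled" -> if opts[:ignore_cancelled], do: :run, else: {:cancel, :cancelled}
      "discarded" -> if opts[:ignore_discarded], do: :run, else: {:cancel, :discarded}
      :deleted -> if opts[:ignore_deleted], do: :run, else: {:cancel, :deleted}
      # Anything else (scheduled, retryable, executing, ...) is still in flight.
      _other -> :wait
    end
  end
end

IO.inspect(DepSketch.resolve("completed"))
# :run
IO.inspect(DepSketch.resolve("discarded"))
# {:cancel, :discarded}
IO.inspect(DepSketch.resolve("discarded", ignore_discarded: true))
# :run
IO.inspect(DepSketch.resolve("retryable"))
# :wait
```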

Handling Cancellations

Workflow jobs are automatically cancelled when their upstream dependencies are cancelled, discarded, or deleted (unless specifically overridden using the ignore_* options as described earlier). Those workflow jobs are cancelled before they execute, which means standard Oban.Pro.Worker.after_process/3 hooks won't be called. Instead, there's an optional after_cancelled/2 callback specifically for workflows.

Here's a trivial after_cancelled hook that logs a warning when a workflow job is cancelled:

defmodule MyApp.Workflow do
  use Oban.Pro.Workers.Workflow

  require Logger

  @impl true
  def after_cancelled(reason, job) do
    # Logger.warning/1 replaces the deprecated Logger.warn/1 and returns :ok
    Logger.warning("Workflow job #{job.id} cancelled because a dependency was #{reason}")
  end
end

Appending Workflow Jobs

Sometimes not all jobs are known when the workflow is created. In that case, you can add more jobs with optional dependency checking using append/2. An appended workflow starts with one or more existing jobs, reuses the original workflow id, and optionally builds a set of dependencies for checking.

In this example we disable deps checking with check_deps: false:

defmodule MyApp.WorkflowWorker do
  use Oban.Pro.Workers.Workflow

  @impl true
  def process(%Job{} = job) do
    jobs =
      job
      |> Workflow.append(check_deps: false)
      |> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
      |> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
      |> Workflow.add(:f, WorkerF.new(%{}), deps: [:c])
      |> Oban.insert_all()

    {:ok, jobs}
  end
end

The new jobs specify deps on preexisting jobs named :a, :b, and :c, but there isn't any guarantee those jobs actually exist. That could lead to an incomplete workflow where the new jobs may never complete.

To be safe and check jobs while appending, we'll fetch all of the previous jobs and feed them in:

defmodule MyApp.WorkflowWorker do
  use Oban.Pro.Workers.Workflow

  @impl true
  def process(%Job{} = job) do
    {:ok, jobs} =
      MyApp.Repo.transaction(fn ->
        job
        |> Workflow.stream_jobs()
        |> Enum.to_list()
      end)

    jobs
    |> Workflow.append()
    |> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
    |> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
    |> Workflow.add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

    :ok
  end
end

Now there isn't any risk of an incomplete workflow from missing dependencies, at the expense of loading some extraneous jobs.
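
The idea behind dependency checking can be sketched in plain Elixir: given the names already present in a workflow and the jobs about to be appended, report any dep that doesn't refer to a known name. The AppendSketch module and missing_deps/2 are hypothetical; how Oban Pro itself reports a missing dependency isn't described here.

```elixir
defmodule AppendSketch do
  # Hypothetical check, not Oban Pro's code. Returns deps that name no known job.
  # new_jobs is a keyword list of name => deps for the jobs being appended.
  def missing_deps(existing_names, new_jobs) do
    # Names may be atoms or strings, so normalize everything to strings.
    known = MapSet.new(existing_names ++ Keyword.keys(new_jobs), &to_string/1)

    for {_name, deps} <- new_jobs, dep <- deps, to_string(dep) not in known, uniq: true do
      dep
    end
  end
end

existing = ["a", "b"]
new_jobs = [d: [:a], e: [:b], f: [:c]]

IO.inspect(AppendSketch.missing_deps(existing, new_jobs))
# [:c]
```

Here the job named f depends on c, which was never added, so an unchecked append would leave f waiting on a dependency that doesn't exist.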

Fetching Workflow Jobs

Workflow jobs are tied together through meta attributes. The all_jobs/2 function uses those attributes to load other jobs in a workflow. This is particularly useful from a worker's process/1 function. For example, to fetch all of the jobs in a workflow:

defmodule MyApp.Workflow do
  use Oban.Pro.Workers.Workflow

  @impl Workflow
  def process(%Job{} = job) do
    job
    |> Workflow.all_jobs()
    |> do_things_with_jobs()

    :ok
  end
end

It's also possible to scope fetching to only dependencies of the current job with only_deps:

deps = Workflow.all_jobs(job, only_deps: true)

Or, only fetch a single explicit dependency by name with names:

[dep_job] = Workflow.all_jobs(job, names: [:a])

For large workflows it may be inefficient to load all jobs in memory at once. In that case, you can use the stream_jobs/2 function to fetch jobs lazily. For example, to stream all of the completed jobs in a workflow:

defmodule MyApp.Workflow do
  use Oban.Pro.Workers.Workflow

  @impl Workflow
  def process(%Job{} = job) do
    {:ok, workflow_jobs} =
      MyApp.Repo.transaction(fn ->
        job
        |> Workflow.stream_jobs()
        |> Stream.filter(& &1.state == "completed")
        |> Enum.to_list()
      end)

    do_things_with_jobs(workflow_jobs)

    :ok
  end
end

Streaming is provided by Ecto's Repo.stream, and it must take place within a transaction. Using a stream lets you control the number of jobs loaded from the database, minimizing memory usage for large workflows.
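
The laziness that makes streaming attractive can be seen without a database. In this sketch a Stream.map over a range stands in for the job stream, and a counter records how many stand-in jobs are actually produced. The maps are simplified placeholders for jobs, not real Oban.Job structs.

```elixir
# Counts how many stand-in "jobs" the source stream actually produces.
loaded = :counters.new(1, [])

# Stand-in for a database-backed stream; each element is built only on demand.
jobs =
  Stream.map(1..1_000, fn id ->
    :counters.add(loaded, 1, 1)
    %{id: id, state: if(rem(id, 2) == 0, do: "completed", else: "available")}
  end)

first_completed =
  jobs
  |> Stream.filter(&(&1.state == "completed"))
  |> Enum.take(3)

IO.inspect(Enum.map(first_completed, & &1.id))
# [2, 4, 6]
IO.inspect(:counters.get(loaded, 1))
# 6
```

Only six of the thousand elements are ever built. With a real Repo.stream the granularity is coarser, because Ecto fetches rows in chunks (see the :max_rows option), but the principle is the same.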

Generating Workflow IDs

By default workflow_id is a time-ordered random UUIDv7. This is more than sufficient to ensure that workflows are unique for any period of time. However, if you require more control you can override workflow_id generation at the worker level, or pass a value directly to the new_workflow/1 function.

To override the workflow_id for a particular workflow you override the gen_id/0 callback:

defmodule MyApp.Workflow do
  use Oban.Pro.Workers.Workflow

  # Generate a 32 character random string (24 random bytes, Base64 encoded) instead
  @impl true
  def gen_id do
    24
    |> :crypto.strong_rand_bytes()
    |> Base.encode64()
  end
  ...
end

The gen_id/0 callback works for random/non-deterministic id generation. If you'd prefer to use a deterministic id instead you can pass the workflow_id in as an option to new_workflow/1:

MyApp.Workflow.new_workflow(workflow_id: "custom-id")

Using this technique you can verify the workflow_id in tests or append to the workflow manually after it was originally created.
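
The two approaches can be compared in plain Elixir. The first expression is the same one used in the gen_id/0 override; the second builds a deterministic id from a domain value. The "archive-account-" prefix and the account_id variable are hypothetical examples, not names used by Oban Pro.

```elixir
# Random id: 24 random bytes encode to 32 Base64 characters (4 per 3 bytes).
random_id = 24 |> :crypto.strong_rand_bytes() |> Base.encode64()
IO.inspect(String.length(random_id))
# 32

# Deterministic id: derived from a domain value, so tests and later appends
# can reconstruct it. The prefix and account_id are hypothetical.
account_id = 123
deterministic_id = "archive-account-#{account_id}"
IO.inspect(deterministic_id)
# "archive-account-123"
```

A deterministic id is only as unique as the value it is derived from, so it suits cases where at most one such workflow should exist per value.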

Visualizing Workflows

Workflows are a type of Directed Acyclic Graph, also known as a DAG. That means we can describe a workflow as a graph of jobs and dependencies, where execution flows between jobs. By converting the workflow into DOT notation, a standard graph description language, we can render visualizations!

Dot generation relies on libgraph, which is an optional dependency. You'll need to specify it as a dependency before generating dot output:

def deps do
  [{:libgraph, "~> 0.7"}]
end

Once you've installed libgraph, we can use to_dot/1 to convert a workflow. As with new_workflow and add, all workflow workers define a to_dot/1 function that takes a workflow and returns a dot formatted string. For example, calling to_dot/1 with an account archiving workflow (built here by an archive_account_workflow/1 function that isn't shown):

FinalReceipt.to_dot(archive_account_workflow(123))

Generates the following dot output, where each vertex is a combination of the job's name in the workflow and its worker module:

strict digraph {
    "delete (MyApp.DeleteAccount)"
    "backup_1 (MyApp.BackupPost)"
    "backup_2 (MyApp.BackupPost)"
    "backup_3 (MyApp.BackupPost)"
    "receipt (MyApp.FinalReceipt)"
    "email_1 (MyApp.EmailSubscriber)"
    "email_2 (MyApp.EmailSubscriber)"
    "backup_1 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "backup_2 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "backup_3 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "backup_1 (MyApp.BackupPost)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "backup_2 (MyApp.BackupPost)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "backup_3 (MyApp.BackupPost)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "email_1 (MyApp.EmailSubscriber)" [weight=1]
    "receipt (MyApp.FinalReceipt)" -> "email_2 (MyApp.EmailSubscriber)" [weight=1]
    "email_1 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
    "email_2 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
}

Now we can take that dot output and render it using a tool like graphviz. The following example function accepts a workflow and renders it out as an SVG:

defmodule WorkflowRenderer do
  alias Oban.Pro.Workers.Workflow

  def render(workflow) do
    dot_path = "workflow.dot"
    svg_path = "workflow.svg"

    File.write!(dot_path, Workflow.to_dot(workflow))

    System.cmd("dot", ["-T", "svg", "-o", svg_path, dot_path])
  end
end

With graphviz installed, that will generate an SVG of the workflow:

[Figure: rendered workflow graph with the nodes delete (MyApp.DeleteAccount), backup_1, backup_2, and backup_3 (MyApp.BackupPost), receipt (MyApp.FinalReceipt), and email_1 and email_2 (MyApp.EmailSubscriber)]

Looking at the visualized graph we can clearly see how the workflow starts with a single receipt job, fans out to multiple email and backup jobs, and finally fans in to the delete job, exactly as we planned!
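
To show what DOT notation encodes, here is a minimal hand-rolled generator in plain Elixir that works from a map of name => deps. It does not use libgraph and is not Oban Pro's to_dot/1. The DotSketch module is hypothetical, it omits the worker names and weights seen in the real output, and it draws each edge from a dependency to the job that depends on it, which is an assumption that may not match the direction Oban Pro uses.

```elixir
defmodule DotSketch do
  # Hypothetical generator, not Oban Pro's to_dot/1. Emits one edge per dep,
  # pointing from the upstream job to the downstream job.
  def to_dot(deps) when is_map(deps) do
    edges =
      for {name, upstream} <- Enum.sort(deps), dep <- upstream do
        ~s(    "#{dep}" -> "#{name}")
      end

    Enum.join(["strict digraph {"] ++ edges ++ ["}"], "\n")
  end
end

IO.puts(DotSketch.to_dot(%{a: [], b: [:a], c: [:a], d: [:b, :c]}))
# strict digraph {
#     "a" -> "b"
#     "a" -> "c"
#     "b" -> "d"
#     "c" -> "d"
# }
```

Jobs without any edges are left out of this simplified output; a fuller version would also list each vertex on its own line, as the generated output does.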

Summary

Callbacks

after_cancelled(cancel_reason, job): Called after a workflow job is cancelled due to upstream jobs being cancelled, deleted, or discarded.

gen_id(): Generate a unique string to identify the workflow.

new_workflow(opts): Instantiate a new workflow struct with a unique workflow id.

to_dot(flow): Delegates to to_dot/1.

Functions

add(workflow, name, changeset, opts \\ []): Add a named job to the workflow along with optional dependencies.

all_jobs(job, opts \\ []): Get all jobs for a workflow, optionally filtered by upstream deps.

append(jobs, opts \\ []): Instantiate a new workflow from an existing workflow job or jobs.

gen_id(): Generates a UUIDv7 based workflow id.

new(opts \\ []): Instantiate a new workflow struct with a unique workflow id.

stream_jobs(job, opts): Stream all jobs for a workflow.

to_dot(workflow): Converts the given workflow to DOT format, which can then be converted to a number of other formats via Graphviz, e.g. dot -Tpng out.dot > out.png.

Types

add_option()

@type add_option() :: {:deps, [name()]}

append_option()

@type append_option() :: new_option() | {:check_deps, boolean()}

cancel_reason()

@type cancel_reason() :: :deleted | :discarded | :cancelled

chan()

@type chan() :: Oban.Job.changeset()

fetch_option()

@type fetch_option() ::
  {:log, Logger.level()}
  | {:names, [name()]}
  | {:only_deps, boolean()}
  | {:timeout, timeout()}

name()

@type name() :: atom() | String.t()

new_option()

@type new_option() ::
  {:ignore_cancelled, boolean()}
  | {:ignore_deleted, boolean()}
  | {:ignore_discarded, boolean()}
  | {:workflow_id, String.t()}
  | {:workflow_name, String.t()}

t()

@type t() :: %Oban.Pro.Workers.Workflow{
  changesets: [chan()],
  check_deps: boolean(),
  id: String.t(),
  names: MapSet.t(),
  opts: map()
}

Callbacks

add(flow, name, changeset, opts)

@callback add(flow :: t(), name :: name(), changeset :: chan(), opts :: [add_option()]) ::
  t()

Delegates to add/4.

after_cancelled(cancel_reason, job)

(optional)
@callback after_cancelled(cancel_reason(), job :: Oban.Job.t()) :: :ok

Called after a workflow job is cancelled due to upstream jobs being cancelled, deleted, or discarded.

This callback is only called when a job is cancelled because of an upstream dependency. It is never called after normal job execution. For that, use Oban.Pro.Worker.after_process/3.

all_workflow_jobs(job, list)

@callback all_workflow_jobs(job :: Oban.Job.t(), [fetch_option()]) :: [Oban.Job.t()]

Delegates to all_jobs/2.

append_workflow(jobs, list)

@callback append_workflow(jobs :: Oban.Job.t() | [Oban.Job.t()], [append_option()]) :: t()

Delegates to append/2.

gen_id()

@callback gen_id() :: String.t()

Generate a unique string to identify the workflow.

Defaults to a 128-bit UUIDv7.

Examples

Generate a workflow id using random bytes instead of a UUID:

@impl Workflow
def gen_id do
  24
  |> :crypto.strong_rand_bytes()
  |> Base.encode64()
end

new_workflow(opts)

@callback new_workflow(opts :: [new_option()]) :: t()

Instantiate a new workflow struct with a unique workflow id.

Delegates to new/1 and uses the module's gen_id/0 to generate the workflow id.

stream_workflow_jobs(job, list)

@callback stream_workflow_jobs(job :: Oban.Job.t(), [fetch_option()]) :: Enum.t()

Delegates to stream_jobs/2.

to_dot(flow)

@callback to_dot(flow :: t()) :: String.t()

Delegates to to_dot/1.

Functions

add(workflow, name, changeset, opts \\ [])

@spec add(flow :: t(), name :: name(), changeset :: chan(), opts :: [add_option()]) ::
  t()

Add a named job to the workflow along with optional dependencies.

Examples

Add jobs to a workflow with dependencies:

Workflow.new()
|> Workflow.add(:a, MyApp.WorkerA.new(%{id: id}))
|> Workflow.add(:b, MyApp.WorkerB.new(%{id: id}), deps: [:a])
|> Workflow.add(:c, MyApp.WorkerC.new(%{id: id}), deps: [:a])
|> Workflow.add(:d, MyApp.WorkerC.new(%{id: id}), deps: [:b, :c])

all_jobs(job, opts \\ [])

@spec all_jobs(Oban.Job.t(), [fetch_option()]) :: [Oban.Job.t()]

Get all jobs for a workflow, optionally filtered by upstream deps.

Examples

Retrieve all workflow jobs:

@impl Workflow
def process(%Job{} = job) do
  job
  |> Workflow.all_jobs()
  |> do_things_with_jobs()

  :ok
end

Retrieve only the current job's deps:

workflow_jobs = Workflow.all_jobs(job, only_deps: true)

Retrieve an explicit list of dependencies:

[job_a, job_b] = Workflow.all_jobs(job, names: [:a, :b])

append(jobs, opts \\ [])

@spec append(jobs :: Oban.Job.t() | [Oban.Job.t()], [append_option()]) :: t()

Instantiate a new workflow from an existing workflow job or jobs.

Examples

Append to a workflow seeded with all other jobs in the workflow:

jobs
|> Workflow.append()
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()

Append to a workflow from a single job and bypass checking deps:

job
|> Workflow.append(check_deps: false)
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()

gen_id()

@spec gen_id() :: String.t()

Generates a UUIDv7 based workflow id.

Examples

iex> Workflow.gen_id()
"018e5d3b-1bb6-7f60-9c12-d6ed50cfff59"

new(opts \\ [])

@spec new(opts :: [new_option()]) :: t()

Instantiate a new workflow struct with a unique workflow id.

Examples

Create a standard workflow without any options:

Workflow.new()

Create a workflow with a custom name:

Workflow.new(workflow_name: "logistics")

Create a workflow with a static id and some options:

Workflow.new(workflow_id: "workflow-id", ignore_cancelled: true, ignore_discarded: true)

stream_jobs(job, opts)

@spec stream_jobs(Oban.Job.t(), [fetch_option()]) :: Enum.t()

Stream all jobs for a workflow.

Examples

Stream with filtering to only preserve completed jobs:

@impl true
def process(%Job{} = job) do
  {:ok, workflow_jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> Workflow.stream_jobs()
      |> Stream.filter(& &1.state == "completed")
      |> Enum.to_list()
    end)

  do_things_with_jobs(workflow_jobs)

  :ok
end

to_dot(workflow)

@spec to_dot(flow :: t()) :: String.t()

Converts the given workflow to DOT format, which can then be converted to a number of other formats via Graphviz, e.g. dot -Tpng out.dot > out.png.

The default implementation relies on libgraph.

Examples

Generate a DOT graph format from a workflow:

Workflow.to_dot(workflow)