Oban.Pro.Relay (Oban Pro v1.5.0-rc.9)

The Relay extension lets you insert and await the results of jobs locally or remotely, across any number of nodes, so you can seamlessly distribute jobs and await the results synchronously.

Think of Relay as persistent, distributed tasks.

Relay uses PubSub for to transmit results. That means it will work without Erlang distribution or clustering, but it does require Oban to have functional PubSub. It doesn't matter which node executes a job, the result will still be broadcast back.

Results are encoded using term_to_binary/2 and decoded using binary_to_term/2 using the :safe option to prevent the creation of new atoms or function references. If you're returning results with atoms you must be sure that those atoms are defined locally, where the await/2 or await_many/2 function is called.

Usage

Use async/1 to insert a job for asynchronous execution:

relay =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Oban.Pro.Relay.async()

After inserting a job and constructing a relay, use await/1 to await the job's execution and return the result:

{:ok, result} =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Oban.Pro.Relay.async()
  |> Oban.Pro.Relay.await()

By default, await/1 will timeout after 5 seconds and return an {:error, :timeout} tuple. The job itself may continue to run, only the local process stops waiting on it. Pass an explicit timeout to wait longer:

{:ok, result} = Oban.Pro.Relay.await(relay, :timer.seconds(30))

Any successful result such as :ok, {:ok, value}, or {:cancel, reason} is passed back as the await result. When the executed job returns an {:error, reason} tuple, raises an exception, or crashes, the result comes through as an error tuple.

Use await_many/1 when you need the results of multiple async relays:

relayed =
  1..3
  |> Enum.map(&DoubleWorker.new(%{int: &1}))
  |> Enum.map(&Oban.Pro.Relay.async/1)
  |> Oban.Pro.Relay.await_many()

#=> [{:ok, 2}, {:ok, 4}, {:ok, 6}]

Testing with Relay

Calls to Relay.await/2 will block until the job runs. That will cause tests to hang when testing in :manual mode until jobs are drained. Unit tests that wrap Relay processing can switch to :inline mode so that jobs run immediately.

For example, assuming Oban is configured to run in :manual testing mode:

defmodule MyWorker do
  use Oban.Pro.Worker

  alias Oban.Pro.Relay

  @impl Oban.Pro.Worker
  def process(%{args: %{"sub" => sub}}) do
    results =
      sub
      |> Enum.map(&MyOtherWorker.new(%{id: &1})
      |> Enum.map(&Relay.async/1)
      |> Relay.await_many()

    {:ok, results}
  end
end

test "running multiple sub workers from perform_job/2" do
  Oban.Testing.with_testing_mode(:inline, fn ->
    assert {:ok, _} = perform_job(MyWorker, %{subs: [1, 2, 3]})
  end)
end

Usage with Chunks

Relay is intended for use with a single job and isn't suited to awaiting results from Oban.Pro.Workers.Chunk jobs. That's because only one of the chunk's jobs (the leader) will relay results back. Awaiting any other job in the chunk will time out without returning a proper result.

Summary

Functions

Insert a job for asynchronous execution.

Await a relay's execution and return the result.

Await replies from multiple relays and return the results.

Types

Link to this type

await_result()

@type await_result() ::
  :ok
  | {:ok, term()}
  | {:cancel, term()}
  | {:discard, term()}
  | {:error, :result_too_large | :timeout | Exception.t()}
  | {:snooze, integer()}
@type t() :: %Oban.Pro.Relay{
  job: Oban.Job.t(),
  name: term(),
  pid: pid(),
  ref: Ecto.UUID.t()
}

Functions

Link to this function

async(name \\ Oban, changeset)

@spec async(Oban.name(), Oban.Job.changeset()) :: t() | {:error, Oban.Job.changeset()}

Insert a job for asynchronous execution.

The returned map contains the caller's pid and a unique ref that is used to await the results.

Examples

The single arity version takes a job changeset and inserts it:

relay =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Oban.Pro.Relay.async()

When the Oban instance has a custom name, or an app has multiple Oban instances, you can use the two arity version to select an instance:

changeset = MyApp.Worker.new(%{id: 123})
Oban.Pro.Relay.async(MyOban, changeset)
Link to this function

await(map, timeout \\ 5000)

@spec await(t(), timeout()) :: await_result()

Await a relay's execution and return the result.

Any successful result such as :ok, {:ok, value}, or {:cancel, reason} is passed back as the await result. When the executed job returns an {:error, reason} tuple, raises an exception, or crashes, the result comes back as an error tuple with the exception.

By default, await/1 will timeout after 5 seconds and return {:error, :timeout}. The job itself may continue to run, only the local process stops waiting on it.

Result Size Limits

When using the default Oban.Notifiers.Postgres notifier for PubSub, any value larger than 8kb (compressed) can't be broadcast due to a Postgres NOTIFY limitation. Instead, awaiting will return an {:error, :result_too_large} tuple. The Oban.Notifiers.PG notifier doesn't have any such size limitation, but it requires Distributed Erlang.

Examples

Await a job:

{:ok, result} =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Oban.Pro.Relay.async()
  |> Oban.Pro.Relay.await()

Increase the wait time with a timeout value:

%{id: 456}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
|> Oban.Pro.Relay.await(:timer.seconds(30))
Link to this function

await_many(relays, timeout \\ 5000)

@spec await_many([t()], timeout()) :: [await_result()]

Await replies from multiple relays and return the results.

It returns a list of the results in the same order as the relays supplied as the first argument.

Unlike Task.await_many or Task.yield_many, await_many/2 may return partial results when the timeout is reached. When a job hasn't finished executing the value will be {:error, :timeout}

Examples

Await multiple jobs without any errors or timeouts:

relayed =
  1..3
  |> Enum.map(&DoubleWorker.new(%{int: &1}))
  |> Enum.map(&Oban.Pro.Relay.async(&1))
  |> Oban.Pro.Relay.await_many()

#=> [{:ok, 2}, {:ok, 4}, {:ok, 6}]

Await multiple jobs with an error timeout:

relayed =
  [1, 2, 300_000_000]
  |> Enum.map(&SlowWorker.new(%{int: &1}))
  |> Enum.map(&Oban.Pro.Relay.async(&1))
  |> Oban.Pro.Relay.await_many(100)

#=> [{:ok, 2}, {:ok, 4}, {:error, :timeout}]