Oban.Pro.Relay (Oban Pro v1.5.0-rc.8)
The Relay
extension lets you insert and await the results of jobs locally or remotely, across
any number of nodes, so you can seamlessly distribute jobs and await the results synchronously.
Think of Relay
as persistent, distributed tasks.
Relay
uses PubSub for to transmit results. That means it will work without Erlang distribution
or clustering, but it does require Oban to have functional PubSub. It doesn't matter which node
executes a job, the result will still be broadcast back.
Results are encoded using term_to_binary/2
and decoded using binary_to_term/2
using the
:safe
option to prevent the creation of new atoms or function references. If you're returning
results with atoms you must be sure that those atoms are defined locally, where the await/2
or await_many/2
function is called.
Usage
Use async/1
to insert a job for asynchronous execution:
relay =
%{id: 123}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
After inserting a job and constructing a relay, use await/1
to await the job's execution and
return the result:
{:ok, result} =
%{id: 123}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
|> Oban.Pro.Relay.await()
By default, await/1
will timeout after 5 seconds and return an {:error, :timeout}
tuple. The job
itself may continue to run, only the local process stops waiting on it. Pass an explicit timeout
to wait longer:
{:ok, result} = Oban.Pro.Relay.await(relay, :timer.seconds(30))
Any successful result such as :ok
, {:ok, value}
, or {:cancel, reason}
is passed back as
the await result. When the executed job returns an {:error, reason}
tuple, raises an
exception, or crashes, the result comes through as an error tuple.
Use await_many/1
when you need the results of multiple async relays:
relayed =
1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Oban.Pro.Relay.async/1)
|> Oban.Pro.Relay.await_many()
#=> [{:ok, 2}, {:ok, 4}, {:ok, 6}]
Testing with Relay
Calls to Relay.await/2
will block until the job runs. That will cause tests to hang when
testing in :manual
mode until jobs are drained. Unit tests that wrap Relay
processing can
switch to :inline
mode so that jobs run immediately.
For example, assuming Oban
is configured to run in :manual
testing mode:
defmodule MyWorker do
use Oban.Pro.Worker
alias Oban.Pro.Relay
@impl Oban.Pro.Worker
def process(%{args: %{"sub" => sub}}) do
results =
sub
|> Enum.map(&MyOtherWorker.new(%{id: &1})
|> Enum.map(&Relay.async/1)
|> Relay.await_many()
{:ok, results}
end
end
test "running multiple sub workers from perform_job/2" do
Oban.Testing.with_testing_mode(:inline, fn ->
assert {:ok, _} = perform_job(MyWorker, %{subs: [1, 2, 3]})
end)
end
Usage with Chunks
Relay
is intended for use with a single job and isn't suited to awaiting results from
Oban.Pro.Workers.Chunk
jobs. That's because only one of the chunk's jobs (the leader) will
relay results back. Awaiting any other job in the chunk will time out without returning a proper
result.
Summary
Functions
Insert a job for asynchronous execution.
Await a relay's execution and return the result.
Await replies from multiple relays and return the results.
Types
await_result()
@type await_result() :: :ok | {:ok, term()} | {:cancel, term()} | {:discard, term()} | {:error, :result_too_large | :timeout | Exception.t()} | {:snooze, integer()}
@type t() :: %Oban.Pro.Relay{ job: Oban.Job.t(), name: term(), pid: pid(), ref: Ecto.UUID.t() }
Functions
async(name \\ Oban, changeset)
@spec async(Oban.name(), Oban.Job.changeset()) :: t() | {:error, Oban.Job.changeset()}
Insert a job for asynchronous execution.
The returned map contains the caller's pid and a unique ref that is used to await the results.
Examples
The single arity version takes a job changeset and inserts it:
relay =
%{id: 123}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
When the Oban instance has a custom name, or an app has multiple Oban instances, you can use the two arity version to select an instance:
changeset = MyApp.Worker.new(%{id: 123})
Oban.Pro.Relay.async(MyOban, changeset)
await(map, timeout \\ 5000)
@spec await(t(), timeout()) :: await_result()
Await a relay's execution and return the result.
Any successful result such as :ok
, {:ok, value}
, or {:cancel, reason}
is passed back as
the await result. When the executed job returns an {:error, reason}
tuple, raises an
exception, or crashes, the result comes back as an error tuple with the exception.
By default, await/1
will timeout after 5 seconds and return {:error, :timeout}
. The job
itself may continue to run, only the local process stops waiting on it.
Result Size Limits
When using the default
Oban.Notifiers.Postgres
notifier for PubSub, any value larger than 8kb (compressed) can't be broadcast due to a PostgresNOTIFY
limitation. Instead, awaiting will return an{:error, :result_too_large}
tuple. TheOban.Notifiers.PG
notifier doesn't have any such size limitation, but it requires Distributed Erlang.
Examples
Await a job:
{:ok, result} =
%{id: 123}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
|> Oban.Pro.Relay.await()
Increase the wait time with a timeout value:
%{id: 456}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
|> Oban.Pro.Relay.await(:timer.seconds(30))
await_many(relays, timeout \\ 5000)
@spec await_many([t()], timeout()) :: [await_result()]
Await replies from multiple relays and return the results.
It returns a list of the results in the same order as the relays supplied as the first argument.
Unlike Task.await_many
or Task.yield_many
, await_many/2
may return partial results when
the timeout is reached. When a job hasn't finished executing the value will be {:error, :timeout}
Examples
Await multiple jobs without any errors or timeouts:
relayed =
1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Oban.Pro.Relay.async(&1))
|> Oban.Pro.Relay.await_many()
#=> [{:ok, 2}, {:ok, 4}, {:ok, 6}]
Await multiple jobs with an error timeout:
relayed =
[1, 2, 300_000_000]
|> Enum.map(&SlowWorker.new(%{int: &1}))
|> Enum.map(&Oban.Pro.Relay.async(&1))
|> Oban.Pro.Relay.await_many(100)
#=> [{:ok, 2}, {:ok, 4}, {:error, :timeout}]